专注app软件定制开发【一起学Rust | 进阶篇 | Fang库】后台任务处理库——Fang

文章目录


前言

Fang专注app软件定制开发是一个的后台任务处理库,采用Postgres DB专注app软件定制开发作为任务队列。同时支持Asynk和Blocking任务。Asynk任务采用的是tokio的特性,Worker工作在tokio下。Blocking任务使用的是std::thread,Worker专注app软件定制开发工作在一个单独的线程。


一、Fang安装

1. 添加依赖

添加Fang到你的Cargo.toml文件中

注意 Fang仅支持rust 1.62+版本

仅使用Blocking

[dependencies]fang = { version = "0.7" , features = ["blocking"], default-features = false }
  • 1
  • 2

仅使用Asynk

[dependencies]fang = { version = "0.7" , features = ["asynk"], default-features = false }
  • 1
  • 2

同时使用Blocking和Asynk

fang = { version = "0.7" }
  • 1

2. 专注app软件定制开发创建数据库

专注app软件定制开发这里需要使用Diesel CLI来完成数据库的迁移,将在后面的文章中介绍

在你的Postgres DB中创建fang_tasks表,然后运行以下脚本

CREATE EXTENSION IF NOT EXISTS "uuid-ossp";CREATE TYPE fang_task_state AS ENUM ('new', 'in_progress', 'failed', 'finished');CREATE TABLE fang_tasks (     id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),     metadata jsonb NOT NULL,     error_message TEXT,     state fang_task_state default 'new' NOT NULL,     task_type VARCHAR default 'common' NOT NULL,     created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),     updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW());CREATE INDEX fang_tasks_state_index ON fang_tasks(state);CREATE INDEX fang_tasks_type_index ON fang_tasks(task_type);CREATE INDEX fang_tasks_created_at_index ON fang_tasks(created_at);CREATE INDEX fang_tasks_metadata_index ON fang_tasks(metadata);CREATE TABLE fang_periodic_tasks (  id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),  metadata jsonb NOT NULL,  period_in_seconds INTEGER NOT NULL,  scheduled_at TIMESTAMP WITH TIME ZONE,  created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),  updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW());CREATE INDEX fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at);CREATE INDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

这些文件可以在源码目录migrations中找到,github搜Fang,然后进入下载源码。

二、使用

1.定义一个任务

Blocking任务

每个要被Fang执行的任务都必须实现fang::Runnable特质,特质实现#[typetag::serde]使之具有任务的属性。

use fang::Error;use fang::Runnable;use fang::typetag;use fang::PgConnection;use fang::serde::{Deserialize, Serialize};#[derive(Serialize, Deserialize)]#[serde(crate = "fang::serde")]struct MyTask {    pub number: u16,}#[typetag::serde]impl Runnable for MyTask {    fn run(&self, _connection: &PgConnection) -> Result<(), Error> {        println!("the number is {}", self.number);        Ok(())    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

run函数的第二个参数是PgConnection,你可以重复使用它来操作任务,例如在当前作业执行期间添加一个新任务,或者,如果你要复用,可以在自己的查询中重新使用它。如果你不需要它,就忽略它。

Asynk任务

每个要被Fang执行的任务都必须实现fang::AsyncRunnable特质

注意 不要实现两个同名的AsyncRunnable,这会导致typetag失败

use fang::AsyncRunnable;use fang::asynk::async_queue::AsyncQueueable;use fang::serde::{Deserialize, Serialize};use fang::async_trait;#[derive(Serialize, Deserialize)]#[serde(crate = "fang::serde")]struct AsyncTask {  pub number: u16,}#[typetag::serde]#[async_trait]impl AsyncRunnable for AsyncTask {    async fn run(&self, _queueable: &mut dyn AsyncQueueable) -> Result<(), Error> {        Ok(())    }    // this func is optional to impl    // Default task-type it is common    fn task_type(&self) -> String {        "my-task-type".to_string()    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

2.任务队列

Blocking任务

需要使用Queue::enqueue_task来入队列

use fang::Queue;...Queue::enqueue_task(&MyTask { number: 10 }).unwrap();
  • 1
  • 2
  • 3
  • 4
  • 5

上面的示例在每次调用时都会创建一个新的 postgres 连接

重用相同的 postgres 连接来将多个任务排入队列

let queue = Queue::new();for id in &unsynced_feed_ids {    queue.push_task(&SyncFeedMyTask { feed_id: *id }).unwrap();}
  • 1
  • 2
  • 3
  • 4
  • 5

或者使用PgConnection结构体

Queue::push_task_query(pg_connection, &new_task).unwrap();
  • 1

Asynk任务

使用AsyncQueueable::insert_task来入队,可以根据你自己后端来进行操作,默认为Postgres

use fang::asynk::async_queue::AsyncQueue;use fang::NoTls;use fang::AsyncRunnable;// 创建异步队列let max_pool_size: u32 = 2;let mut queue = AsyncQueue::builder()    // Postgres 数据库 url    .uri("postgres://postgres:postgres@localhost/fang")    // 允许的最大连接数控i昂    .max_pool_size(max_pool_size)    // 如果希望任务中的唯一性,则为false    .duplicated_tasks(true)    .build();// 要进行操作之前,总是要先连接queue.connect(NoTls).await.unwrap();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

举个简单例子我们用的是NoTls,如果你有特殊需求,如果出于某种原因你想加密 postgres 流量。

let task = AsyncTask { 8 };let task_returned = queue  .insert_task(&task as &dyn AsyncRunnable)  .await  .unwrap();
  • 1
  • 2
  • 3
  • 4
  • 5

3. 启动Worker

Blocking任务

每个Worker都在一个单独的线程中运行。如果panic,会重新启动。
使用WorkerPool来启动Worker,WorkerPool::new接收一个整型参数,Worker的数量

use fang::WorkerPool;WorkerPool::new(10).start();
  • 1
  • 2
  • 3

使用shutdown停止线程

use fang::WorkerPool;worker_pool = WorkerPool::new(10).start().unwrap;worker_pool.shutdown()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

Asynk任务

每个Worker都在一个单独的 tokio 任务中运行。如果panic,会重新启动。
使用AsyncWorkerPool来启动Worker

use fang::asynk::async_worker_pool::AsyncWorkerPool;// 必须创建一个队列// 插入一些任务let mut pool: AsyncWorkerPool<AsyncQueue<NoTls>> = AsyncWorkerPool::builder()        .number_of_workers(max_pool_size)        .queue(queue.clone())        .build();pool.start().await;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

4. 配置

Blocking任务

在创建Blocking任务任务的时候,默认只能传入Worker数量参数,如果想要进行自定义配置,需要使用WorkerPool.new_with_params来创建,它接受两个参数——工人数量和WorkerParams结构体。

WorkerParams的定义是这样的

pub struct WorkerParams {    pub retention_mode: Option<RetentionMode>,    pub sleep_params: Option<SleepParams>,    pub task_type: Option<String>,}pub enum RetentionMode {    KeepAll,    RemoveAll,    RemoveFinished,}pub struct SleepParams {    pub sleep_period: u64,    pub max_sleep_period: u64,    pub min_sleep_period: u64,    pub sleep_step: u64,}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

Asynk任务

使用AsyncWorkerPool的builder方法即可。需要链式调用,创建一个AsyncWorkerPool,然后调用.queue(…),.sleep_params(…)(可选),.retention_mode(…)(可选),.number_of_workers(…)配置,最后调用.build()构建对象。

5. 配置Worker类型

可以指定Worker类型,来指定指定类型Worker执行指定类型的任务

Blocking任务

Runnable特质中添加方法

...#[typetag::serde]impl Runnable for MyTask {    fn run(&self) -> Result<(), Error> {        println!("the number is {}", self.number);        Ok(())    }    fn task_type(&self) -> String {        "number".to_string()    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

设置task_type

let mut worker_params = WorkerParams::new();worker_params.set_task_type("number".to_string());WorkerPool::new_with_params(10, worker_params).start();
  • 1
  • 2
  • 3
  • 4

没有设置task_type的Worker可以执行任何任务

Asynk任务

功能与Blocking任务相同。使用AsyncWorker的builer来设置

6. 配置保留模式

默认情况下,所有成功完成的任务都会从数据库中删除,失败的任务不会。可以使用三种保留模式:

pub enum RetentionMode {    KeepAll,        \\ 不删除任务    RemoveAll,      \\ 删除所有任务    RemoveFinished, \\ 默认值,完成就删除}
  • 1
  • 2
  • 3
  • 4
  • 5

Blocking任务

使用set_retention_mode设置保留模式

let mut worker_params = WorkerParams::new();worker_params.set_retention_mode(RetentionMode::RemoveAll);WorkerPool::new_with_params(10, worker_params).start();
  • 1
  • 2
  • 3
  • 4

Asynk任务

使用AsyncWorker的builder。

7. 配置睡眠值

Blocking任务

使用 useSleepParams来配置睡眠值:

pub struct SleepParams {    pub sleep_period: u64,     \\ 默认值 5    pub max_sleep_period: u64, \\ 默认值 15    pub min_sleep_period: u64, \\ 默认值 5    pub sleep_step: u64,       \\ 默认值 5}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

如果数据库中没有任务,则Worker会休眠sleep_period,并且每次该值都会增加sleep_step,直到达到max_sleep_period. min_sleep_period是sleep_period的初始值。所有值都以秒为单位。

使用set_sleep_params来设置

let sleep_params = SleepParams {    sleep_period: 2,    max_sleep_period: 6,    min_sleep_period: 2,    sleep_step: 1,};let mut worker_params = WorkerParams::new();worker_params.set_sleep_params(sleep_params);WorkerPool::new_with_params(10, worker_params).start();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

Asynk任务

使用AsyncWorker的builder。

8. 定时任务

如果你从头到尾看的本文,那么什么也不需要做,否则你需要创建fang_periodic_tasks表,就在本文安装那个部分。

Blocking任务

use fang::Scheduler;use fang::Queue;let queue = Queue::new();queue     .push_periodic_task(&SyncMyTask::default(), 120)     .unwrap();queue     .push_periodic_task(&DeliverMyTask::default(), 60)     .unwrap();Scheduler::start(10, 5);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

在上面的示例中,push_periodic_task用于将指定的任务保存到表fang_periodic_tasks中,该表将fang_tasks每隔指定的秒数排队(保存到表中)。

Scheduler::start(10, 5)启动调度程序。它接受两个参数:

  • 数据库检查周期(以秒为单位)
  • 可接受的错误限制(以秒为单位)

Asynk任务

use fang::asynk::async_scheduler::Scheduler;use fang::asynk::async_queue::AsyncQueueable;use fang::asynk::async_queue::AsyncQueue;// 在此之前构建一个Async队列let schedule_in_future = Utc::now() + OtherDuration::seconds(5);let _periodic_task = queue.insert_periodic_task(    &AsyncTask { number: 1 },    schedule_in_future,    10,).await;let check_period: u64 = 1;let error_margin_seconds: u64 = 2;let mut scheduler = Scheduler::builder()    .check_period(check_period)    .error_margin_seconds(error_margin_seconds)    .queue(&mut queue as &mut dyn AsyncQueueable)    .build();// 在其他线程或循环之前添加更多任务// 调度程序循环scheduler.start().await.unwrap();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

总结

以上就是本文的所有内容,介绍了Rust中借助Fang库来实现后台任务,进行后台任务的处理,还有定时任务,配置等。

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发