Workers

Loco 提供了以下用于后台任务的选项:

  • Redis 支持 (由 sidekiq-rs 驱动)
  • Postgres 支持 (自有实现)
  • SQLite 支持 (自有实现)
  • 基于 Tokio-async (同进程,基于事件线程的后台任务)

您可以入队和执行任务,而无需了解实际的后台队列实现,类似于 Rails 的 ActiveJob,因此您可以通过简单的配置更改进行切换,而无需修改代码。

异步 vs 队列

当您生成一个新的应用时,您可能为 workers 选择了默认的 async 配置。这意味着 workers 在 Tokio 的异步池中启动任务,这为您在同一运行服务器中提供了合适的后台进程。

您可能希望配置任务在由队列支持的单独进程中运行,以便在服务器之间分配负载。

首先,切换到 BackgroundQueue

# Worker 配置
workers:
  # 指定 worker 模式。选项:
  #   - BackgroundQueue - Workers 在后台异步运行,处理队列中的任务。
  #   - ForegroundBlocking - Workers 在前台同步运行,阻塞直到任务完成。
  #   - BackgroundAsync - Workers 在后台异步运行,利用异步能力处理任务。
  mode: BackgroundQueue

然后,配置一个基于 Redis 的队列后端:

queue:
  kind: Redis
  # Redis 连接 URI
  uri: ""
  dangerously_flush: false

或者一个基于 Postgres 的队列后端:

queue:
  kind: Postgres
  # Postgres 队列连接 URI
  uri: ""
  dangerously_flush: false

或者一个基于 SQLite 的队列后端:

queue:
  kind: Sqlite
  # SQLite 队列连接 URI
  uri: ""
  dangerously_flush: false

运行 worker 进程

您可以以两种方式运行,具体取决于您为后台 workers 选择的设置:

Usage: demo_app start [OPTIONS]

Options:
  -w, --worker                     启动 worker
  -s, --server-and-worker          启动同进程的服务器和 worker

当您配置了真正的 Redis 队列并且想要一个仅用于执行后台任务的进程时,请选择 --worker。 您可以为每个服务器使用单个进程。 在这种情况下,您可以使用 cargo loco start 仅运行您的主 Web 或 API 服务器。

$ cargo loco start --worker # 启动一个独立的 worker 任务执行进程
$ cargo loco start # 启动一个独立的 API 服务或 Web 服务器,没有 workers

当您配置了 async 后台 workers 时,请选择 -s,任务将作为当前运行的服务器进程的一部分执行。

例如,运行 --server-and-worker

$ cargo loco start --server-and-worker # API 服务和 workers 都将执行

在代码中创建后台任务

要使用 worker,我们主要考虑将任务添加到队列中,因此您 use worker 并在之后执行:

    // .. 在您的控制器中 ..
    DownloadWorker::perform_later(
        &ctx,
        DownloadWorkerArgs {
            user_guid: "foo".to_string(),
        },
    )
    .await

与 Rails 和 Ruby 不同,使用 Rust,您可以享受 强类型 的任务参数,这些参数会被序列化并推送到队列中。

在 worker 中使用共享状态

请参阅 如何拥有全局状态,但通常您可以通过使用类似 lazy_static 的方法来使用单个共享状态,然后在 worker 中简单地引用它。

如果此状态可以序列化,强烈建议 通过 WorkerArgs 传递它。

创建新的 worker

添加 worker 意味着编写后台任务逻辑,以接收 参数 并执行任务。 我们还需要让 loco 知道它,并将其注册到全局任务处理器中。

将 worker 添加到 workers/

#[async_trait]
impl BackgroundWorker<DownloadWorkerArgs> for DownloadWorker {
    fn build(ctx: &AppContext) -> Self {
        Self { ctx: ctx.clone() }
    }
    async fn perform(&self, args: DownloadWorkerArgs) -> Result<()> {
        println!("================================================");
        println!("正在向用户 {} 发送付款报告", args.user_guid);

        // TODO: 这里进行一些实际的工作...

        println!("================================================");
        Ok(())
    }
}

并在 app.rs 中注册它:

#[async_trait]
impl Hooks for App {
//..
    async fn connect_workers(ctx: &AppContext, queue: &Queue) -> Result<()> {
        queue.register(DownloadWorker::build(ctx)).await?;
        Ok(())
    }
// ..
}

生成 Worker

要使用 loco generate 自动添加 worker,请执行以下命令:

cargo loco generate worker report_worker

worker 生成器会创建一个与您的应用关联的 worker 文件,并生成一个测试模板文件,使您能够验证您的 worker。

配置 Workers

在您的 config/<environment>.yaml 中,您可以指定 worker 模式。 BackgroundAsyncBackgroundQueue 将以非阻塞方式处理任务,而 ForegroundBlocking 将以阻塞方式处理任务。

BackgroundAsyncBackgroundQueue 之间的主要区别在于,后者将使用 Redis/Postgres/SQLite 来存储任务,而前者不需要 Redis/Postgres/SQLite,将在同一进程内使用异步内存。

# Worker 配置
workers:
  # 指定 worker 模式。选项:
  #   - BackgroundQueue - Workers 在后台异步运行,处理队列中的任务。
  #   - ForegroundBlocking - Workers 在前台同步运行,阻塞直到任务完成。
  #   - BackgroundAsync - Workers 在后台异步运行,利用异步能力处理任务。
  mode: BackgroundQueue

从 UI 管理 Workers

您可以使用 Loco admin job project 管理任务队列。 <img style="width:100%; max-width:640px" src="tour.png"/>

通过 CLI 管理任务队列

任务队列管理功能提供了一种强大而灵活的方式来处理应用程序中任务的生命周期。它允许您取消、清理、删除过时任务、导出任务详情和导入任务,确保高效且有组织的任务处理。

功能概览

  • 取消任务
    提供按名称取消特定任务的功能,将其状态更新为 cancelled。 这对于停止不再需要、不相关或在检测到错误时阻止它们被处理的任务非常有用。
  • 清理任务
    允许删除已经完成或取消的任务。 这通过消除不必要的条目,有助于维护干净高效的任务队列。
  • 清除过时任务
    允许您根据任务的存活天数删除任务。 这对于通过删除较旧的、不相关的任务来保持精简的任务队列特别有用。 注意:您可以使用 --dump 选项将任务详情导出到文件,手动修改导出文件中的任务参数,然后使用 import 功能将更新后的任务重新引入系统。
  • 导出任务详情
    支持将所有任务的详细信息导出到指定位置的文件格式。 此功能对于备份、审计或进一步分析很有价值。
  • 导入任务
    方便从外部文件导入任务,从而可以轻松地将新任务或恢复任务添加到系统中。 这确保了外部任务数据无缝集成到您的应用程序工作流程中。

要访问任务管理命令,请使用以下 CLI 结构:

管理任务队列

Usage: demo_app-cli jobs [OPTIONS] <COMMAND>

Commands:
  cancel  取消具有指定名称的任务,将其状态设置为“cancelled”
  tidy    删除已完成或已取消的任务
  purge   根据任务的存活天数删除任务
  dump    将所有任务的详细信息保存到指定文件夹中的文件
  import  从文件导入任务
  help    打印此消息或给定子命令的帮助信息

Options:
  -e, --environment <ENVIRONMENT>  指定环境 [default: development]
  -h, --help                       打印帮助信息
  -V, --version                    打印版本信息

测试 Worker

您可以使用 Loco 轻松测试您的 worker 后台任务。 确保您的 worker 设置为 ForegroundBlocking 模式,这将阻塞任务,确保它同步运行。 测试 worker 时,测试将等待直到您的 worker 完成,从而允许您验证 worker 是否完成了其预期的任务。

建议在 tests/workers 目录中实现测试,以将所有 worker 测试整合到一个位置。

此外,您可以利用 worker 生成器,它会自动创建测试,从而节省您在库中配置测试的时间。

以下是如何构建测试的示例:

use loco_rs::testing::prelude::*;

#[tokio::test]
#[serial]
async fn test_run_report_worker_worker() {
    // 设置测试环境
    let boot = boot_test::<App, Migrator>().await.unwrap();

    // 以 'ForegroundBlocking' 模式执行 worker,防止它异步运行
    assert!(
        ReportWorkerWorker::perform_later(&boot.app_context, ReportWorkerWorkerArgs {})
            .await
            .is_ok()
    );

    // 在 worker 执行后包含额外的断言验证
}