存储

在 Loco Storage 中,我们帮助用户通过多种操作来处理文件。存储可以是内存存储、磁盘存储,或者使用云服务,例如 AWS S3、GCP 和 Azure。

Loco 支持简单的存储操作和高级功能,例如数据镜像或具有不同故障模式的备份策略。

默认情况下,内存存储和磁盘存储是开箱即用的。要使用云服务提供商,您应该指定以下特性(features):

  • storage_aws_s3
  • storage_azure
  • storage_gcp
  • all_storage

默认情况下,Loco 初始化一个 Null 提供程序,这意味着任何存储操作都将返回错误。

设置

app.rs 文件中添加 after_context 函数作为 Hook,并从 loco_rs 导入 storage 模块。

use loco_rs::storage;

async fn after_context(ctx: AppContext) -> Result<AppContext> {
    Ok(ctx)
}

此 Hook 返回一个 Storage 实例,其中包含所有存储配置,将在接下来的章节中介绍。此 Storage 实例作为应用程序上下文的一部分存储,并在控制器、端点、任务 worker 等中可用。

术语表

StorageDriverTrait 实现,用于执行存储操作
Storage用于管理一个或多个存储驱动程序的抽象实现。
StrategyTrait,实现 Storage 的各种策略,例如镜像或备份。
FailureMode在每个 Strategy 中实现,用于确定在发生故障时如何处理操作。

初始化 Storage

Storage 可以配置为单个驱动程序或多个驱动程序。

单个存储

在此示例中,我们初始化了内存驱动程序,并使用 single 函数创建了一个新的 storage。

use loco_rs::storage;

async fn after_context(ctx: AppContext) -> Result<AppContext> {
    Ok(AppContext {
        storage: Storage::single(storage::drivers::mem::new()).into(),
        ..ctx
    })
}

多个驱动程序

对于高级用法,您可以设置多个驱动程序并应用开箱即用的智能策略。每个策略都有自己的一组故障模式,您可以决定如何处理这些模式。

创建多个驱动程序:

use crate::storage::{drivers, Storage};

let aws_1 = drivers::aws::new("users");
let azure = drivers::azure::new("users");
let aws_2 = drivers::aws::new("users-mirror");

镜像策略 (Mirror Strategy):

您可以通过定义镜像服务来保持多个服务同步。镜像服务在两个或多个从属服务之间复制上传、删除、重命名和复制操作。下载行为会冗余地检索数据,这意味着如果从主存储检索文件失败,则返回在辅助存储中找到的第一个文件。

行为 (Behaviour)

创建三个存储实例后,我们需要创建镜像策略实例并定义故障模式。镜像策略需要主存储和辅助存储列表,以及故障模式选项:

  • MirrorAll: 所有辅助存储都必须成功。如果一个失败,操作将继续执行其余操作,但会返回错误。
  • AllowMirrorFailure: 当一个或多个镜像操作失败时,操作不会返回错误。

故障模式与上传、删除、移动和复制相关。

示例:


// 通过设置主存储和按名称设置辅助存储来定义镜像策略。
let strategy = Box::new(MirrorStrategy::new(
    "store_1",
    Some(vec!["store_2".to_string(), "store_3".to_string()]),
    FailureMode::MirrorAll,
)) as Box<dyn StorageStrategy>;

// 使用存储映射和策略创建 storage。
 let storage = Storage::new(
    BTreeMap::from([
        ("store_1".to_string(), aws_1),
        ("store_2".to_string(), azure),
        ("store_3".to_string(), aws_2),
    ]),
    strategy.into(),
);

备份策略 (Backup Strategy):

您可以跨多个存储备份您的操作,并控制故障模式策略。

创建三个存储实例后,我们需要创建备份策略实例并定义故障模式。备份策略需要主存储和辅助存储列表,以及故障模式选项:

  • BackupAll: 所有辅助存储都必须成功。如果一个失败,操作将继续执行其余操作,但会返回错误。
  • AllowBackupFailure: 当一个或多个备份操作失败时,操作不会返回错误。
  • AtLeastOneFailure: 至少一个操作应该成功。
  • CountFailure: 给定数量的备份应该成功。

故障模式与上传、删除、移动和复制相关。下载始终从主存储检索文件。

示例:


// 通过设置主存储和按名称设置辅助存储来定义备份策略。
let strategy: Box<dyn StorageStrategy> = Box::new(BackupStrategy::new(
    "store_1",
    Some(vec!["store_2".to_string(), "store_3".to_string()]),
    FailureMode::AllowBackupFailure,
)) as Box<dyn StorageStrategy>;

let storage = Storage::new(
    BTreeMap::from([
        ("store_1".to_string(), store_1),
        ("store_2".to_string(), store_2),
        ("store_3".to_string(), store_3),
    ]),
    strategy.into(),
);

创建您自己的策略 (Create Your Own Strategy)

如果您有特定的策略,您可以通过实现 StorageStrategy 并实现所有存储功能来轻松创建它。

在控制器中使用 (Usage In Controller)

按照此示例操作,请确保在 axum crate 中启用 multipart feature。

async fn upload_file(
    State(ctx): State<AppContext>,
    mut multipart: Multipart,
) -> Result<Response> {
    let mut file = None;
    while let Some(field) = multipart.next_field().await.map_err(|err| {
        tracing::error!(error = ?err,"could not readd multipart");
        Error::BadRequest("could not readd multipart".into())
    })? {
        let file_name = match field.file_name() {
            Some(file_name) => file_name.to_string(),
            _ => return Err(Error::BadRequest("file name not found".into())),
        };

        let content = field.bytes().await.map_err(|err| {
            tracing::error!(error = ?err,"could not readd bytes");
            Error::BadRequest("could not readd bytes".into())
        })?;

        let path = PathBuf::from("folder").join(file_name);
        ctx.storage.as_ref().upload(path.as_path(), &content).await?;

        file = Some(path);
    }

    file.map_or_else(not_found, |path| {
        format::json(views::upload::Response::new(path.as_path()))
    })
}

测试 (Testing)

通过在您的控制器中测试文件存储,您可以参考以下示例:

use loco_rs::testing::prelude::*;

#[tokio::test]
#[serial]
async fn can_register() {
    request::<App, _, _>(|request, ctx| async move {
        let file_content = "loco file upload";
        let file_part = Part::bytes(file_content.as_bytes()).file_name("loco.txt");

        let multipart_form = MultipartForm::new().add_part("file", file_part);

        let response = request.post("/upload/file").multipart(multipart_form).await;

        response.assert_status_ok();

        let res: views::upload::Response = serde_json::from_str(&response.text()).unwrap();

        let stored_file: String = ctx.storage.as_ref().download(&res.path).await.unwrap();

        assert_eq!(stored_file, file_content);
    })
    .await;
}