object_store/
factory.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{fs, path};
16
17use common_telemetry::info;
18use opendal::layers::HttpClientLayer;
19use opendal::services::{Fs, Gcs, Oss, S3};
20use snafu::prelude::*;
21
22use crate::config::{AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config};
23use crate::error::{self, Result};
24use crate::services::Azblob;
25use crate::util::{build_http_client, clean_temp_dir, join_dir, normalize_dir};
26use crate::{ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR, ObjectStore, util};
27
28pub async fn new_raw_object_store(
29    store: &ObjectStoreConfig,
30    data_home: &str,
31) -> Result<ObjectStore> {
32    let data_home = normalize_dir(data_home);
33    match store {
34        ObjectStoreConfig::File(file_config) => new_fs_object_store(&data_home, file_config),
35        ObjectStoreConfig::S3(s3_config) => new_s3_object_store(s3_config).await,
36        ObjectStoreConfig::Oss(oss_config) => new_oss_object_store(oss_config).await,
37        ObjectStoreConfig::Azblob(azblob_config) => new_azblob_object_store(azblob_config).await,
38        ObjectStoreConfig::Gcs(gcs_config) => new_gcs_object_store(gcs_config).await,
39    }
40}
41
42/// A helper function to create a file system object store.
43pub fn new_fs_object_store(data_home: &str, _file_config: &FileConfig) -> Result<ObjectStore> {
44    fs::create_dir_all(path::Path::new(&data_home))
45        .context(error::CreateDirSnafu { dir: data_home })?;
46    info!("The file storage home is: {}", data_home);
47
48    let atomic_write_dir = join_dir(data_home, ATOMIC_WRITE_DIR);
49    clean_temp_dir(&atomic_write_dir)?;
50
51    // Compatible code. Remove this after a major release.
52    let old_atomic_temp_dir = join_dir(data_home, OLD_ATOMIC_WRITE_DIR);
53    clean_temp_dir(&old_atomic_temp_dir)?;
54
55    let builder = Fs::default()
56        .root(data_home)
57        .atomic_write_dir(&atomic_write_dir);
58
59    let object_store = ObjectStore::new(builder)
60        .context(error::InitBackendSnafu)?
61        .finish();
62
63    Ok(object_store)
64}
65
66pub async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result<ObjectStore> {
67    let root = util::normalize_dir(&azblob_config.connection.root);
68    info!(
69        "The azure storage container is: {}, root is: {}",
70        azblob_config.connection.container, &root
71    );
72
73    let client = build_http_client(&azblob_config.http_client)?;
74    let builder = Azblob::from(&azblob_config.connection);
75    let operator = ObjectStore::new(builder)
76        .context(error::InitBackendSnafu)?
77        .layer(HttpClientLayer::new(client))
78        .finish();
79
80    Ok(operator)
81}
82
83pub async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore> {
84    let root = util::normalize_dir(&gcs_config.connection.root);
85    info!(
86        "The gcs storage bucket is: {}, root is: {}",
87        gcs_config.connection.bucket, &root
88    );
89
90    let client = build_http_client(&gcs_config.http_client)?;
91    let builder = Gcs::from(&gcs_config.connection);
92    let operator = ObjectStore::new(builder)
93        .context(error::InitBackendSnafu)?
94        .layer(HttpClientLayer::new(client))
95        .finish();
96
97    Ok(operator)
98}
99
100pub async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore> {
101    let root = util::normalize_dir(&oss_config.connection.root);
102    info!(
103        "The oss storage bucket is: {}, root is: {}",
104        oss_config.connection.bucket, &root
105    );
106
107    let client = build_http_client(&oss_config.http_client)?;
108    let builder = Oss::from(&oss_config.connection);
109    let operator = ObjectStore::new(builder)
110        .context(error::InitBackendSnafu)?
111        .layer(HttpClientLayer::new(client))
112        .finish();
113
114    Ok(operator)
115}
116
117pub async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectStore> {
118    let root = util::normalize_dir(&s3_config.connection.root);
119    info!(
120        "The s3 storage bucket is: {}, root is: {}",
121        s3_config.connection.bucket, &root
122    );
123
124    let client = build_http_client(&s3_config.http_client)?;
125    let builder = S3::from(&s3_config.connection);
126    let operator = ObjectStore::new(builder)
127        .context(error::InitBackendSnafu)?
128        .layer(HttpClientLayer::new(client))
129        .finish();
130
131    Ok(operator)
132}