1use std::{fs, path};
16
17use common_base::secrets::ExposeSecret;
18use common_telemetry::info;
19use opendal::services::{Fs, Gcs, Oss, S3};
20use snafu::prelude::*;
21
22use crate::config::{AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config};
23use crate::error::{self, Result};
24use crate::services::Azblob;
25use crate::util::{build_http_client, clean_temp_dir, join_dir, normalize_dir};
26use crate::{util, ObjectStore, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
27
28pub async fn new_raw_object_store(
29 store: &ObjectStoreConfig,
30 data_home: &str,
31) -> Result<ObjectStore> {
32 let data_home = normalize_dir(data_home);
33 match store {
34 ObjectStoreConfig::File(file_config) => new_fs_object_store(&data_home, file_config).await,
35 ObjectStoreConfig::S3(s3_config) => new_s3_object_store(s3_config).await,
36 ObjectStoreConfig::Oss(oss_config) => new_oss_object_store(oss_config).await,
37 ObjectStoreConfig::Azblob(azblob_config) => new_azblob_object_store(azblob_config).await,
38 ObjectStoreConfig::Gcs(gcs_config) => new_gcs_object_store(gcs_config).await,
39 }
40}
41
42pub async fn new_fs_object_store(
44 data_home: &str,
45 _file_config: &FileConfig,
46) -> Result<ObjectStore> {
47 fs::create_dir_all(path::Path::new(&data_home))
48 .context(error::CreateDirSnafu { dir: data_home })?;
49 info!("The file storage home is: {}", data_home);
50
51 let atomic_write_dir = join_dir(data_home, ATOMIC_WRITE_DIR);
52 clean_temp_dir(&atomic_write_dir)?;
53
54 let old_atomic_temp_dir = join_dir(data_home, OLD_ATOMIC_WRITE_DIR);
56 clean_temp_dir(&old_atomic_temp_dir)?;
57
58 let builder = Fs::default()
59 .root(data_home)
60 .atomic_write_dir(&atomic_write_dir);
61
62 let object_store = ObjectStore::new(builder)
63 .context(error::InitBackendSnafu)?
64 .finish();
65
66 Ok(object_store)
67}
68
69pub async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result<ObjectStore> {
70 let root = util::normalize_dir(&azblob_config.root);
71
72 info!(
73 "The azure storage container is: {}, root is: {}",
74 azblob_config.container, &root
75 );
76
77 let client = build_http_client(&azblob_config.http_client)?;
78
79 let mut builder = Azblob::default()
80 .root(&root)
81 .container(&azblob_config.container)
82 .endpoint(&azblob_config.endpoint)
83 .account_name(azblob_config.account_name.expose_secret())
84 .account_key(azblob_config.account_key.expose_secret())
85 .http_client(client);
86
87 if let Some(token) = &azblob_config.sas_token {
88 builder = builder.sas_token(token);
89 };
90
91 Ok(ObjectStore::new(builder)
92 .context(error::InitBackendSnafu)?
93 .finish())
94}
95
96pub async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore> {
97 let root = util::normalize_dir(&gcs_config.root);
98 info!(
99 "The gcs storage bucket is: {}, root is: {}",
100 gcs_config.bucket, &root
101 );
102
103 let client = build_http_client(&gcs_config.http_client);
104
105 let builder = Gcs::default()
106 .root(&root)
107 .bucket(&gcs_config.bucket)
108 .scope(&gcs_config.scope)
109 .credential_path(gcs_config.credential_path.expose_secret())
110 .credential(gcs_config.credential.expose_secret())
111 .endpoint(&gcs_config.endpoint)
112 .http_client(client?);
113
114 Ok(ObjectStore::new(builder)
115 .context(error::InitBackendSnafu)?
116 .finish())
117}
118
119pub async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore> {
120 let root = util::normalize_dir(&oss_config.root);
121 info!(
122 "The oss storage bucket is: {}, root is: {}",
123 oss_config.bucket, &root
124 );
125
126 let client = build_http_client(&oss_config.http_client)?;
127
128 let builder = Oss::default()
129 .root(&root)
130 .bucket(&oss_config.bucket)
131 .endpoint(&oss_config.endpoint)
132 .access_key_id(oss_config.access_key_id.expose_secret())
133 .access_key_secret(oss_config.access_key_secret.expose_secret())
134 .http_client(client);
135
136 Ok(ObjectStore::new(builder)
137 .context(error::InitBackendSnafu)?
138 .finish())
139}
140
141pub async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectStore> {
142 let root = util::normalize_dir(&s3_config.root);
143
144 info!(
145 "The s3 storage bucket is: {}, root is: {}",
146 s3_config.bucket, &root
147 );
148
149 let client = build_http_client(&s3_config.http_client)?;
150
151 let mut builder = S3::default()
152 .root(&root)
153 .bucket(&s3_config.bucket)
154 .access_key_id(s3_config.access_key_id.expose_secret())
155 .secret_access_key(s3_config.secret_access_key.expose_secret())
156 .http_client(client);
157
158 if s3_config.endpoint.is_some() {
159 builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
160 }
161 if s3_config.region.is_some() {
162 builder = builder.region(s3_config.region.as_ref().unwrap());
163 }
164 if s3_config.enable_virtual_host_style {
165 builder = builder.enable_virtual_host_style();
166 }
167
168 Ok(ObjectStore::new(builder)
169 .context(error::InitBackendSnafu)?
170 .finish())
171}