1use std::{fs, path};
16
17use common_base::secrets::ExposeSecret;
18use common_telemetry::info;
19use opendal::layers::HttpClientLayer;
20use opendal::services::{Fs, Gcs, Oss, S3};
21use snafu::prelude::*;
22
23use crate::config::{AzblobConfig, FileConfig, GcsConfig, ObjectStoreConfig, OssConfig, S3Config};
24use crate::error::{self, Result};
25use crate::services::Azblob;
26use crate::util::{build_http_client, clean_temp_dir, join_dir, normalize_dir};
27use crate::{util, ObjectStore, ATOMIC_WRITE_DIR, OLD_ATOMIC_WRITE_DIR};
28
29pub async fn new_raw_object_store(
30 store: &ObjectStoreConfig,
31 data_home: &str,
32) -> Result<ObjectStore> {
33 let data_home = normalize_dir(data_home);
34 match store {
35 ObjectStoreConfig::File(file_config) => new_fs_object_store(&data_home, file_config).await,
36 ObjectStoreConfig::S3(s3_config) => new_s3_object_store(s3_config).await,
37 ObjectStoreConfig::Oss(oss_config) => new_oss_object_store(oss_config).await,
38 ObjectStoreConfig::Azblob(azblob_config) => new_azblob_object_store(azblob_config).await,
39 ObjectStoreConfig::Gcs(gcs_config) => new_gcs_object_store(gcs_config).await,
40 }
41}
42
43pub async fn new_fs_object_store(
45 data_home: &str,
46 _file_config: &FileConfig,
47) -> Result<ObjectStore> {
48 fs::create_dir_all(path::Path::new(&data_home))
49 .context(error::CreateDirSnafu { dir: data_home })?;
50 info!("The file storage home is: {}", data_home);
51
52 let atomic_write_dir = join_dir(data_home, ATOMIC_WRITE_DIR);
53 clean_temp_dir(&atomic_write_dir)?;
54
55 let old_atomic_temp_dir = join_dir(data_home, OLD_ATOMIC_WRITE_DIR);
57 clean_temp_dir(&old_atomic_temp_dir)?;
58
59 let builder = Fs::default()
60 .root(data_home)
61 .atomic_write_dir(&atomic_write_dir);
62
63 let object_store = ObjectStore::new(builder)
64 .context(error::InitBackendSnafu)?
65 .finish();
66
67 Ok(object_store)
68}
69
70pub async fn new_azblob_object_store(azblob_config: &AzblobConfig) -> Result<ObjectStore> {
71 let root = util::normalize_dir(&azblob_config.root);
72
73 info!(
74 "The azure storage container is: {}, root is: {}",
75 azblob_config.container, &root
76 );
77
78 let client = build_http_client(&azblob_config.http_client)?;
79
80 let mut builder = Azblob::default()
81 .root(&root)
82 .container(&azblob_config.container)
83 .endpoint(&azblob_config.endpoint)
84 .account_name(azblob_config.account_name.expose_secret())
85 .account_key(azblob_config.account_key.expose_secret());
86
87 if let Some(token) = &azblob_config.sas_token {
88 builder = builder.sas_token(token);
89 };
90
91 let operator = ObjectStore::new(builder)
92 .context(error::InitBackendSnafu)?
93 .layer(HttpClientLayer::new(client))
94 .finish();
95
96 Ok(operator)
97}
98
99pub async fn new_gcs_object_store(gcs_config: &GcsConfig) -> Result<ObjectStore> {
100 let root = util::normalize_dir(&gcs_config.root);
101 info!(
102 "The gcs storage bucket is: {}, root is: {}",
103 gcs_config.bucket, &root
104 );
105
106 let client = build_http_client(&gcs_config.http_client)?;
107
108 let builder = Gcs::default()
109 .root(&root)
110 .bucket(&gcs_config.bucket)
111 .scope(&gcs_config.scope)
112 .credential_path(gcs_config.credential_path.expose_secret())
113 .credential(gcs_config.credential.expose_secret())
114 .endpoint(&gcs_config.endpoint);
115
116 let operator = ObjectStore::new(builder)
117 .context(error::InitBackendSnafu)?
118 .layer(HttpClientLayer::new(client))
119 .finish();
120
121 Ok(operator)
122}
123
124pub async fn new_oss_object_store(oss_config: &OssConfig) -> Result<ObjectStore> {
125 let root = util::normalize_dir(&oss_config.root);
126 info!(
127 "The oss storage bucket is: {}, root is: {}",
128 oss_config.bucket, &root
129 );
130
131 let client = build_http_client(&oss_config.http_client)?;
132
133 let builder = Oss::default()
134 .root(&root)
135 .bucket(&oss_config.bucket)
136 .endpoint(&oss_config.endpoint)
137 .access_key_id(oss_config.access_key_id.expose_secret())
138 .access_key_secret(oss_config.access_key_secret.expose_secret());
139
140 let operator = ObjectStore::new(builder)
141 .context(error::InitBackendSnafu)?
142 .layer(HttpClientLayer::new(client))
143 .finish();
144
145 Ok(operator)
146}
147
148pub async fn new_s3_object_store(s3_config: &S3Config) -> Result<ObjectStore> {
149 let root = util::normalize_dir(&s3_config.root);
150
151 info!(
152 "The s3 storage bucket is: {}, root is: {}",
153 s3_config.bucket, &root
154 );
155
156 let client = build_http_client(&s3_config.http_client)?;
157
158 let mut builder = S3::default()
159 .root(&root)
160 .bucket(&s3_config.bucket)
161 .access_key_id(s3_config.access_key_id.expose_secret())
162 .secret_access_key(s3_config.secret_access_key.expose_secret());
163
164 if s3_config.endpoint.is_some() {
165 builder = builder.endpoint(s3_config.endpoint.as_ref().unwrap());
166 }
167 if s3_config.region.is_some() {
168 builder = builder.region(s3_config.region.as_ref().unwrap());
169 }
170 if s3_config.enable_virtual_host_style {
171 builder = builder.enable_virtual_host_style();
172 }
173
174 let operator = ObjectStore::new(builder)
175 .context(error::InitBackendSnafu)?
176 .layer(HttpClientLayer::new(client))
177 .finish();
178
179 Ok(operator)
180}