1use std::time::Duration;
16
17use common_base::readable_size::ReadableSize;
18use common_base::secrets::{ExposeSecret, SecretString};
19use opendal::services::{Azblob, Gcs, Oss, S3};
20use serde::{Deserialize, Serialize};
21
22use crate::util;
23
/// Cache capacity used by `ObjectStorageCacheConfig::default()` for its
/// `cache_capacity` field: `ReadableSize::gb(5)`.
const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
25
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
28#[serde(tag = "type")]
29pub enum ObjectStoreConfig {
30 File(FileConfig),
31 S3(S3Config),
32 Oss(OssConfig),
33 Azblob(AzblobConfig),
34 Gcs(GcsConfig),
35}
36
37impl Default for ObjectStoreConfig {
38 fn default() -> Self {
39 ObjectStoreConfig::File(FileConfig {})
40 }
41}
42
43impl ObjectStoreConfig {
44 pub fn provider_name(&self) -> &'static str {
46 match self {
47 Self::File(_) => "File",
48 Self::S3(_) => "S3",
49 Self::Oss(_) => "Oss",
50 Self::Azblob(_) => "Azblob",
51 Self::Gcs(_) => "Gcs",
52 }
53 }
54
55 pub fn is_object_storage(&self) -> bool {
57 !matches!(self, Self::File(_))
58 }
59
60 pub fn config_name(&self) -> &str {
62 let name = match self {
63 Self::File(_) => self.provider_name(),
65 Self::S3(s3) => &s3.name,
66 Self::Oss(oss) => &oss.name,
67 Self::Azblob(az) => &az.name,
68 Self::Gcs(gcs) => &gcs.name,
69 };
70
71 if name.trim().is_empty() {
72 return self.provider_name();
73 }
74
75 name
76 }
77
78 pub fn cache_config(&self) -> Option<&ObjectStorageCacheConfig> {
80 match self {
81 Self::File(_) => None,
82 Self::S3(s3) => Some(&s3.cache),
83 Self::Oss(oss) => Some(&oss.cache),
84 Self::Azblob(az) => Some(&az.cache),
85 Self::Gcs(gcs) => Some(&gcs.cache),
86 }
87 }
88
89 pub fn cache_config_mut(&mut self) -> Option<&mut ObjectStorageCacheConfig> {
91 match self {
92 Self::File(_) => None,
93 Self::S3(s3) => Some(&mut s3.cache),
94 Self::Oss(oss) => Some(&mut oss.cache),
95 Self::Azblob(az) => Some(&mut az.cache),
96 Self::Gcs(gcs) => Some(&mut gcs.cache),
97 }
98 }
99}
100
101#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
102#[serde(default)]
103pub struct FileConfig {}
104
105#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
106#[serde(default)]
107pub struct S3Connection {
108 pub bucket: String,
109 pub root: String,
110 #[serde(skip_serializing)]
111 pub access_key_id: SecretString,
112 #[serde(skip_serializing)]
113 pub secret_access_key: SecretString,
114 pub endpoint: Option<String>,
115 pub region: Option<String>,
116 pub enable_virtual_host_style: bool,
120 pub disable_ec2_metadata: bool,
125}
126
127impl From<&S3Connection> for S3 {
128 fn from(connection: &S3Connection) -> Self {
129 let root = util::normalize_dir(&connection.root);
130
131 let mut builder = S3::default()
132 .root(&root)
133 .bucket(&connection.bucket)
134 .access_key_id(connection.access_key_id.expose_secret())
135 .secret_access_key(connection.secret_access_key.expose_secret());
136
137 if connection.disable_ec2_metadata {
138 builder = builder.disable_ec2_metadata();
139 }
140
141 if let Some(endpoint) = &connection.endpoint {
142 builder = builder.endpoint(endpoint);
143 }
144 if let Some(region) = &connection.region {
145 builder = builder.region(region);
146 }
147 if connection.enable_virtual_host_style {
148 builder = builder.enable_virtual_host_style();
149 }
150
151 builder
152 }
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
156#[serde(default)]
157pub struct S3Config {
158 pub name: String,
159 #[serde(flatten)]
160 pub connection: S3Connection,
161 #[serde(flatten)]
162 pub cache: ObjectStorageCacheConfig,
163 pub http_client: HttpClientConfig,
164}
165
166#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
167#[serde(default)]
168pub struct OssConnection {
169 pub bucket: String,
170 pub root: String,
171 #[serde(skip_serializing)]
172 pub access_key_id: SecretString,
173 #[serde(skip_serializing)]
174 pub access_key_secret: SecretString,
175 pub endpoint: String,
176}
177
178impl From<&OssConnection> for Oss {
179 fn from(connection: &OssConnection) -> Self {
180 let root = util::normalize_dir(&connection.root);
181 Oss::default()
182 .root(&root)
183 .bucket(&connection.bucket)
184 .endpoint(&connection.endpoint)
185 .access_key_id(connection.access_key_id.expose_secret())
186 .access_key_secret(connection.access_key_secret.expose_secret())
187 }
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
191#[serde(default)]
192pub struct OssConfig {
193 pub name: String,
194 #[serde(flatten)]
195 pub connection: OssConnection,
196 #[serde(flatten)]
197 pub cache: ObjectStorageCacheConfig,
198 pub http_client: HttpClientConfig,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
202#[serde(default)]
203pub struct AzblobConnection {
204 pub container: String,
205 pub root: String,
206 #[serde(skip_serializing)]
207 pub account_name: SecretString,
208 #[serde(skip_serializing)]
209 pub account_key: SecretString,
210 pub endpoint: String,
211 pub sas_token: Option<String>,
212}
213
214impl From<&AzblobConnection> for Azblob {
215 fn from(connection: &AzblobConnection) -> Self {
216 let root = util::normalize_dir(&connection.root);
217 let mut builder = Azblob::default()
218 .root(&root)
219 .container(&connection.container)
220 .endpoint(&connection.endpoint)
221 .account_name(connection.account_name.expose_secret())
222 .account_key(connection.account_key.expose_secret());
223
224 if let Some(token) = &connection.sas_token {
225 builder = builder.sas_token(token);
226 };
227
228 builder
229 }
230}
231
232#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
233#[serde(default)]
234pub struct AzblobConfig {
235 pub name: String,
236 #[serde(flatten)]
237 pub connection: AzblobConnection,
238 #[serde(flatten)]
239 pub cache: ObjectStorageCacheConfig,
240 pub http_client: HttpClientConfig,
241}
242
243#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
244#[serde(default)]
245pub struct GcsConnection {
246 pub root: String,
247 pub bucket: String,
248 pub scope: String,
249 #[serde(skip_serializing)]
250 pub credential_path: SecretString,
251 #[serde(skip_serializing)]
252 pub credential: SecretString,
253 pub endpoint: String,
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
257#[serde(default)]
258pub struct GcsConfig {
259 pub name: String,
260 #[serde(flatten)]
261 pub connection: GcsConnection,
262 #[serde(flatten)]
263 pub cache: ObjectStorageCacheConfig,
264 pub http_client: HttpClientConfig,
265}
266
267impl From<&GcsConnection> for Gcs {
268 fn from(connection: &GcsConnection) -> Self {
269 let root = util::normalize_dir(&connection.root);
270 Gcs::default()
271 .root(&root)
272 .bucket(&connection.bucket)
273 .scope(&connection.scope)
274 .credential_path(connection.credential_path.expose_secret())
275 .credential(connection.credential.expose_secret())
276 .endpoint(&connection.endpoint)
277 }
278}
279#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
281#[serde(default)]
282pub struct HttpClientConfig {
283 pub(crate) pool_max_idle_per_host: u32,
285
286 #[serde(with = "humantime_serde")]
288 pub(crate) connect_timeout: Duration,
289
290 #[serde(with = "humantime_serde")]
293 pub(crate) timeout: Duration,
294
295 #[serde(with = "humantime_serde")]
297 pub(crate) pool_idle_timeout: Duration,
298
299 pub skip_ssl_validation: bool,
301}
302
303impl Default for HttpClientConfig {
304 fn default() -> Self {
305 Self {
306 pool_max_idle_per_host: 1024,
307 connect_timeout: Duration::from_secs(30),
308 timeout: Duration::from_secs(30),
309 pool_idle_timeout: Duration::from_secs(90),
310 skip_ssl_validation: false,
311 }
312 }
313}
314
315#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
316#[serde(default)]
317pub struct ObjectStorageCacheConfig {
318 pub enable_read_cache: bool,
320 pub cache_path: String,
322 pub cache_capacity: ReadableSize,
324}
325
326impl Default for ObjectStorageCacheConfig {
327 fn default() -> Self {
328 Self {
329 enable_read_cache: true,
330 cache_path: String::default(),
332 cache_capacity: DEFAULT_OBJECT_STORE_CACHE_SIZE,
333 }
334 }
335}
336
337impl ObjectStorageCacheConfig {
338 pub fn sanitize(&mut self, data_home: &str) {
340 if self.cache_path.is_empty() {
342 self.cache_path = data_home.to_string();
343 }
344 }
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ObjectStoreConfig;

    /// `config_name` falls back to the provider name when no name is set and
    /// returns the configured name otherwise; `provider_name` is unaffected.
    #[test]
    fn test_config_name() {
        let file_store = ObjectStoreConfig::default();
        assert_eq!("File", file_store.config_name());

        let unnamed_s3 = ObjectStoreConfig::S3(S3Config::default());
        assert_eq!("S3", unnamed_s3.config_name());
        assert_eq!("S3", unnamed_s3.provider_name());

        let named_s3 = ObjectStoreConfig::S3(S3Config {
            name: String::from("test"),
            ..Default::default()
        });
        assert_eq!("test", named_s3.config_name());
        assert_eq!("S3", named_s3.provider_name());
    }

    /// Only the default (file) store is not object storage.
    #[test]
    fn test_is_object_storage() {
        assert!(!ObjectStoreConfig::default().is_object_storage());

        let remote_stores = [
            ObjectStoreConfig::S3(S3Config::default()),
            ObjectStoreConfig::Oss(OssConfig::default()),
            ObjectStoreConfig::Gcs(GcsConfig::default()),
            ObjectStoreConfig::Azblob(AzblobConfig::default()),
        ];
        for store in &remote_stores {
            assert!(store.is_object_storage());
        }
    }
}