1use std::time::Duration;
16
17use common_base::readable_size::ReadableSize;
18use common_base::secrets::{ExposeSecret, SecretString};
19use opendal::services::{Azblob, Gcs, Oss, S3};
20use serde::{Deserialize, Serialize};
21
22use crate::util;
23
24const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
25
26#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
28#[serde(tag = "type")]
29pub enum ObjectStoreConfig {
30 File(FileConfig),
31 S3(S3Config),
32 Oss(OssConfig),
33 Azblob(AzblobConfig),
34 Gcs(GcsConfig),
35}
36
37impl Default for ObjectStoreConfig {
38 fn default() -> Self {
39 ObjectStoreConfig::File(FileConfig {})
40 }
41}
42
43impl ObjectStoreConfig {
44 pub fn provider_name(&self) -> &'static str {
46 match self {
47 Self::File(_) => "File",
48 Self::S3(_) => "S3",
49 Self::Oss(_) => "Oss",
50 Self::Azblob(_) => "Azblob",
51 Self::Gcs(_) => "Gcs",
52 }
53 }
54
55 pub fn is_object_storage(&self) -> bool {
57 !matches!(self, Self::File(_))
58 }
59
60 pub fn config_name(&self) -> &str {
62 let name = match self {
63 Self::File(_) => self.provider_name(),
65 Self::S3(s3) => &s3.name,
66 Self::Oss(oss) => &oss.name,
67 Self::Azblob(az) => &az.name,
68 Self::Gcs(gcs) => &gcs.name,
69 };
70
71 if name.trim().is_empty() {
72 return self.provider_name();
73 }
74
75 name
76 }
77
78 pub fn cache_config(&self) -> Option<&ObjectStorageCacheConfig> {
80 match self {
81 Self::File(_) => None,
82 Self::S3(s3) => Some(&s3.cache),
83 Self::Oss(oss) => Some(&oss.cache),
84 Self::Azblob(az) => Some(&az.cache),
85 Self::Gcs(gcs) => Some(&gcs.cache),
86 }
87 }
88
89 pub fn cache_config_mut(&mut self) -> Option<&mut ObjectStorageCacheConfig> {
91 match self {
92 Self::File(_) => None,
93 Self::S3(s3) => Some(&mut s3.cache),
94 Self::Oss(oss) => Some(&mut oss.cache),
95 Self::Azblob(az) => Some(&mut az.cache),
96 Self::Gcs(gcs) => Some(&mut gcs.cache),
97 }
98 }
99}
100
101#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
102#[serde(default)]
103pub struct FileConfig {}
104
105#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
106#[serde(default)]
107pub struct S3Connection {
108 pub bucket: String,
109 pub root: String,
110 #[serde(skip_serializing)]
111 pub access_key_id: SecretString,
112 #[serde(skip_serializing)]
113 pub secret_access_key: SecretString,
114 pub endpoint: Option<String>,
115 pub region: Option<String>,
116 pub enable_virtual_host_style: bool,
120}
121
122impl From<&S3Connection> for S3 {
123 fn from(connection: &S3Connection) -> Self {
124 let root = util::normalize_dir(&connection.root);
125
126 let mut builder = S3::default()
127 .root(&root)
128 .bucket(&connection.bucket)
129 .access_key_id(connection.access_key_id.expose_secret())
130 .secret_access_key(connection.secret_access_key.expose_secret());
131
132 if let Some(endpoint) = &connection.endpoint {
133 builder = builder.endpoint(endpoint);
134 }
135 if let Some(region) = &connection.region {
136 builder = builder.region(region);
137 }
138 if connection.enable_virtual_host_style {
139 builder = builder.enable_virtual_host_style();
140 }
141
142 builder
143 }
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
147#[serde(default)]
148pub struct S3Config {
149 pub name: String,
150 #[serde(flatten)]
151 pub connection: S3Connection,
152 #[serde(flatten)]
153 pub cache: ObjectStorageCacheConfig,
154 pub http_client: HttpClientConfig,
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
158#[serde(default)]
159pub struct OssConnection {
160 pub bucket: String,
161 pub root: String,
162 #[serde(skip_serializing)]
163 pub access_key_id: SecretString,
164 #[serde(skip_serializing)]
165 pub access_key_secret: SecretString,
166 pub endpoint: String,
167}
168
169impl From<&OssConnection> for Oss {
170 fn from(connection: &OssConnection) -> Self {
171 let root = util::normalize_dir(&connection.root);
172 Oss::default()
173 .root(&root)
174 .bucket(&connection.bucket)
175 .endpoint(&connection.endpoint)
176 .access_key_id(connection.access_key_id.expose_secret())
177 .access_key_secret(connection.access_key_secret.expose_secret())
178 }
179}
180
181#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
182#[serde(default)]
183pub struct OssConfig {
184 pub name: String,
185 #[serde(flatten)]
186 pub connection: OssConnection,
187 #[serde(flatten)]
188 pub cache: ObjectStorageCacheConfig,
189 pub http_client: HttpClientConfig,
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
193#[serde(default)]
194pub struct AzblobConnection {
195 pub container: String,
196 pub root: String,
197 #[serde(skip_serializing)]
198 pub account_name: SecretString,
199 #[serde(skip_serializing)]
200 pub account_key: SecretString,
201 pub endpoint: String,
202 pub sas_token: Option<String>,
203}
204
205impl From<&AzblobConnection> for Azblob {
206 fn from(connection: &AzblobConnection) -> Self {
207 let root = util::normalize_dir(&connection.root);
208 let mut builder = Azblob::default()
209 .root(&root)
210 .container(&connection.container)
211 .endpoint(&connection.endpoint)
212 .account_name(connection.account_name.expose_secret())
213 .account_key(connection.account_key.expose_secret());
214
215 if let Some(token) = &connection.sas_token {
216 builder = builder.sas_token(token);
217 };
218
219 builder
220 }
221}
222
223#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
224#[serde(default)]
225pub struct AzblobConfig {
226 pub name: String,
227 #[serde(flatten)]
228 pub connection: AzblobConnection,
229 #[serde(flatten)]
230 pub cache: ObjectStorageCacheConfig,
231 pub http_client: HttpClientConfig,
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
235#[serde(default)]
236pub struct GcsConnection {
237 pub root: String,
238 pub bucket: String,
239 pub scope: String,
240 #[serde(skip_serializing)]
241 pub credential_path: SecretString,
242 #[serde(skip_serializing)]
243 pub credential: SecretString,
244 pub endpoint: String,
245}
246
247#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
248#[serde(default)]
249pub struct GcsConfig {
250 pub name: String,
251 #[serde(flatten)]
252 pub connection: GcsConnection,
253 #[serde(flatten)]
254 pub cache: ObjectStorageCacheConfig,
255 pub http_client: HttpClientConfig,
256}
257
258impl From<&GcsConnection> for Gcs {
259 fn from(connection: &GcsConnection) -> Self {
260 let root = util::normalize_dir(&connection.root);
261 Gcs::default()
262 .root(&root)
263 .bucket(&connection.bucket)
264 .scope(&connection.scope)
265 .credential_path(connection.credential_path.expose_secret())
266 .credential(connection.credential.expose_secret())
267 .endpoint(&connection.endpoint)
268 }
269}
270#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
272#[serde(default)]
273pub struct HttpClientConfig {
274 pub(crate) pool_max_idle_per_host: u32,
276
277 #[serde(with = "humantime_serde")]
279 pub(crate) connect_timeout: Duration,
280
281 #[serde(with = "humantime_serde")]
284 pub(crate) timeout: Duration,
285
286 #[serde(with = "humantime_serde")]
288 pub(crate) pool_idle_timeout: Duration,
289
290 pub skip_ssl_validation: bool,
292}
293
294impl Default for HttpClientConfig {
295 fn default() -> Self {
296 Self {
297 pool_max_idle_per_host: 1024,
298 connect_timeout: Duration::from_secs(30),
299 timeout: Duration::from_secs(30),
300 pool_idle_timeout: Duration::from_secs(90),
301 skip_ssl_validation: false,
302 }
303 }
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
307#[serde(default)]
308pub struct ObjectStorageCacheConfig {
309 pub enable_read_cache: bool,
311 pub cache_path: String,
313 pub cache_capacity: ReadableSize,
315}
316
317impl Default for ObjectStorageCacheConfig {
318 fn default() -> Self {
319 Self {
320 enable_read_cache: true,
321 cache_path: String::default(),
323 cache_capacity: DEFAULT_OBJECT_STORE_CACHE_SIZE,
324 }
325 }
326}
327
328impl ObjectStorageCacheConfig {
329 pub fn sanitize(&mut self, data_home: &str) {
331 if self.cache_path.is_empty() {
333 self.cache_path = data_home.to_string();
334 }
335 }
336}
337
338#[cfg(test)]
339mod tests {
340 use super::*;
341 use crate::config::ObjectStoreConfig;
342
343 #[test]
344 fn test_config_name() {
345 let object_store_config = ObjectStoreConfig::default();
346 assert_eq!("File", object_store_config.config_name());
347
348 let s3_config = ObjectStoreConfig::S3(S3Config::default());
349 assert_eq!("S3", s3_config.config_name());
350 assert_eq!("S3", s3_config.provider_name());
351
352 let s3_config = ObjectStoreConfig::S3(S3Config {
353 name: "test".to_string(),
354 ..Default::default()
355 });
356 assert_eq!("test", s3_config.config_name());
357 assert_eq!("S3", s3_config.provider_name());
358 }
359
360 #[test]
361 fn test_is_object_storage() {
362 let store = ObjectStoreConfig::default();
363 assert!(!store.is_object_storage());
364 let s3_config = ObjectStoreConfig::S3(S3Config::default());
365 assert!(s3_config.is_object_storage());
366 let oss_config = ObjectStoreConfig::Oss(OssConfig::default());
367 assert!(oss_config.is_object_storage());
368 let gcs_config = ObjectStoreConfig::Gcs(GcsConfig::default());
369 assert!(gcs_config.is_object_storage());
370 let azblob_config = ObjectStoreConfig::Azblob(AzblobConfig::default());
371 assert!(azblob_config.is_object_storage());
372 }
373}