// object_store/config.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use common_base::readable_size::ReadableSize;
use common_base::secrets::{ExposeSecret, SecretString};
use opendal::services::{Azblob, Gcs, Oss, S3};
use serde::{Deserialize, Serialize};

use crate::util;

24const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
25
26/// Object storage config
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
28#[serde(tag = "type")]
29pub enum ObjectStoreConfig {
30    File(FileConfig),
31    S3(S3Config),
32    Oss(OssConfig),
33    Azblob(AzblobConfig),
34    Gcs(GcsConfig),
35}
36
37impl Default for ObjectStoreConfig {
38    fn default() -> Self {
39        ObjectStoreConfig::File(FileConfig {})
40    }
41}
42
43impl ObjectStoreConfig {
44    /// Returns the object storage type name, such as `S3`, `Oss` etc.
45    pub fn provider_name(&self) -> &'static str {
46        match self {
47            Self::File(_) => "File",
48            Self::S3(_) => "S3",
49            Self::Oss(_) => "Oss",
50            Self::Azblob(_) => "Azblob",
51            Self::Gcs(_) => "Gcs",
52        }
53    }
54
55    /// Returns true when it's a remote object storage such as AWS s3 etc.
56    pub fn is_object_storage(&self) -> bool {
57        !matches!(self, Self::File(_))
58    }
59
60    /// Returns the object storage configuration name, return the provider name if it's empty.
61    pub fn config_name(&self) -> &str {
62        let name = match self {
63            // file storage doesn't support name
64            Self::File(_) => self.provider_name(),
65            Self::S3(s3) => &s3.name,
66            Self::Oss(oss) => &oss.name,
67            Self::Azblob(az) => &az.name,
68            Self::Gcs(gcs) => &gcs.name,
69        };
70
71        if name.trim().is_empty() {
72            return self.provider_name();
73        }
74
75        name
76    }
77
78    /// Returns the object storage cache configuration.
79    pub fn cache_config(&self) -> Option<&ObjectStorageCacheConfig> {
80        match self {
81            Self::File(_) => None,
82            Self::S3(s3) => Some(&s3.cache),
83            Self::Oss(oss) => Some(&oss.cache),
84            Self::Azblob(az) => Some(&az.cache),
85            Self::Gcs(gcs) => Some(&gcs.cache),
86        }
87    }
88
89    /// Returns the mutable object storage cache configuration.
90    pub fn cache_config_mut(&mut self) -> Option<&mut ObjectStorageCacheConfig> {
91        match self {
92            Self::File(_) => None,
93            Self::S3(s3) => Some(&mut s3.cache),
94            Self::Oss(oss) => Some(&mut oss.cache),
95            Self::Azblob(az) => Some(&mut az.cache),
96            Self::Gcs(gcs) => Some(&mut gcs.cache),
97        }
98    }
99}
100
101#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
102#[serde(default)]
103pub struct FileConfig {}
104
105#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
106#[serde(default)]
107pub struct S3Connection {
108    pub bucket: String,
109    pub root: String,
110    #[serde(skip_serializing)]
111    pub access_key_id: SecretString,
112    #[serde(skip_serializing)]
113    pub secret_access_key: SecretString,
114    pub endpoint: Option<String>,
115    pub region: Option<String>,
116    /// Enable virtual host style so that opendal will send API requests in virtual host style instead of path style.
117    /// By default, opendal will send API to https://s3.us-east-1.amazonaws.com/bucket_name
118    /// Enabled, opendal will send API to https://bucket_name.s3.us-east-1.amazonaws.com
119    pub enable_virtual_host_style: bool,
120}
121
122impl From<&S3Connection> for S3 {
123    fn from(connection: &S3Connection) -> Self {
124        let root = util::normalize_dir(&connection.root);
125
126        let mut builder = S3::default()
127            .root(&root)
128            .bucket(&connection.bucket)
129            .access_key_id(connection.access_key_id.expose_secret())
130            .secret_access_key(connection.secret_access_key.expose_secret());
131
132        if let Some(endpoint) = &connection.endpoint {
133            builder = builder.endpoint(endpoint);
134        }
135        if let Some(region) = &connection.region {
136            builder = builder.region(region);
137        }
138        if connection.enable_virtual_host_style {
139            builder = builder.enable_virtual_host_style();
140        }
141
142        builder
143    }
144}
145
146#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
147#[serde(default)]
148pub struct S3Config {
149    pub name: String,
150    #[serde(flatten)]
151    pub connection: S3Connection,
152    #[serde(flatten)]
153    pub cache: ObjectStorageCacheConfig,
154    pub http_client: HttpClientConfig,
155}
156
157#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
158#[serde(default)]
159pub struct OssConnection {
160    pub bucket: String,
161    pub root: String,
162    #[serde(skip_serializing)]
163    pub access_key_id: SecretString,
164    #[serde(skip_serializing)]
165    pub access_key_secret: SecretString,
166    pub endpoint: String,
167}
168
169impl From<&OssConnection> for Oss {
170    fn from(connection: &OssConnection) -> Self {
171        let root = util::normalize_dir(&connection.root);
172        Oss::default()
173            .root(&root)
174            .bucket(&connection.bucket)
175            .endpoint(&connection.endpoint)
176            .access_key_id(connection.access_key_id.expose_secret())
177            .access_key_secret(connection.access_key_secret.expose_secret())
178    }
179}
180
181#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
182#[serde(default)]
183pub struct OssConfig {
184    pub name: String,
185    #[serde(flatten)]
186    pub connection: OssConnection,
187    #[serde(flatten)]
188    pub cache: ObjectStorageCacheConfig,
189    pub http_client: HttpClientConfig,
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
193#[serde(default)]
194pub struct AzblobConnection {
195    pub container: String,
196    pub root: String,
197    #[serde(skip_serializing)]
198    pub account_name: SecretString,
199    #[serde(skip_serializing)]
200    pub account_key: SecretString,
201    pub endpoint: String,
202    pub sas_token: Option<String>,
203}
204
205impl From<&AzblobConnection> for Azblob {
206    fn from(connection: &AzblobConnection) -> Self {
207        let root = util::normalize_dir(&connection.root);
208        let mut builder = Azblob::default()
209            .root(&root)
210            .container(&connection.container)
211            .endpoint(&connection.endpoint)
212            .account_name(connection.account_name.expose_secret())
213            .account_key(connection.account_key.expose_secret());
214
215        if let Some(token) = &connection.sas_token {
216            builder = builder.sas_token(token);
217        };
218
219        builder
220    }
221}
222
223#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
224#[serde(default)]
225pub struct AzblobConfig {
226    pub name: String,
227    #[serde(flatten)]
228    pub connection: AzblobConnection,
229    #[serde(flatten)]
230    pub cache: ObjectStorageCacheConfig,
231    pub http_client: HttpClientConfig,
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
235#[serde(default)]
236pub struct GcsConnection {
237    pub root: String,
238    pub bucket: String,
239    pub scope: String,
240    #[serde(skip_serializing)]
241    pub credential_path: SecretString,
242    #[serde(skip_serializing)]
243    pub credential: SecretString,
244    pub endpoint: String,
245}
246
247#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
248#[serde(default)]
249pub struct GcsConfig {
250    pub name: String,
251    #[serde(flatten)]
252    pub connection: GcsConnection,
253    #[serde(flatten)]
254    pub cache: ObjectStorageCacheConfig,
255    pub http_client: HttpClientConfig,
256}
257
258impl From<&GcsConnection> for Gcs {
259    fn from(connection: &GcsConnection) -> Self {
260        let root = util::normalize_dir(&connection.root);
261        Gcs::default()
262            .root(&root)
263            .bucket(&connection.bucket)
264            .scope(&connection.scope)
265            .credential_path(connection.credential_path.expose_secret())
266            .credential(connection.credential.expose_secret())
267            .endpoint(&connection.endpoint)
268    }
269}
270/// The http client options to the storage.
271#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
272#[serde(default)]
273pub struct HttpClientConfig {
274    /// The maximum idle connection per host allowed in the pool.
275    pub(crate) pool_max_idle_per_host: u32,
276
277    /// The timeout for only the connect phase of a http client.
278    #[serde(with = "humantime_serde")]
279    pub(crate) connect_timeout: Duration,
280
281    /// The total request timeout, applied from when the request starts connecting until the response body has finished.
282    /// Also considered a total deadline.
283    #[serde(with = "humantime_serde")]
284    pub(crate) timeout: Duration,
285
286    /// The timeout for idle sockets being kept-alive.
287    #[serde(with = "humantime_serde")]
288    pub(crate) pool_idle_timeout: Duration,
289
290    /// Skip SSL certificate validation (insecure)
291    pub skip_ssl_validation: bool,
292}
293
294impl Default for HttpClientConfig {
295    fn default() -> Self {
296        Self {
297            pool_max_idle_per_host: 1024,
298            connect_timeout: Duration::from_secs(30),
299            timeout: Duration::from_secs(30),
300            pool_idle_timeout: Duration::from_secs(90),
301            skip_ssl_validation: false,
302        }
303    }
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
307#[serde(default)]
308pub struct ObjectStorageCacheConfig {
309    /// Whether to enable read cache. If not set, the read cache will be enabled by default.
310    pub enable_read_cache: bool,
311    /// The local file cache directory
312    pub cache_path: String,
313    /// The cache capacity in bytes
314    pub cache_capacity: ReadableSize,
315}
316
317impl Default for ObjectStorageCacheConfig {
318    fn default() -> Self {
319        Self {
320            enable_read_cache: true,
321            // The cache directory is set to the value of data_home in the build_cache_layer process.
322            cache_path: String::default(),
323            cache_capacity: DEFAULT_OBJECT_STORE_CACHE_SIZE,
324        }
325    }
326}
327
328impl ObjectStorageCacheConfig {
329    /// Sanitize the `ObjectStorageCacheConfig` to ensure the config is valid.
330    pub fn sanitize(&mut self, data_home: &str) {
331        // If `cache_path` is unset, default to use `${data_home}` as the local read cache directory.
332        if self.cache_path.is_empty() {
333            self.cache_path = data_home.to_string();
334        }
335    }
336}
337
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ObjectStoreConfig;

    /// The configured name wins; the provider name is the fallback.
    #[test]
    fn test_config_name() {
        let object_store_config = ObjectStoreConfig::default();
        assert_eq!("File", object_store_config.config_name());

        let s3_config = ObjectStoreConfig::S3(S3Config::default());
        assert_eq!("S3", s3_config.config_name());
        assert_eq!("S3", s3_config.provider_name());

        let s3_config = ObjectStoreConfig::S3(S3Config {
            name: "test".to_string(),
            ..Default::default()
        });
        assert_eq!("test", s3_config.config_name());
        assert_eq!("S3", s3_config.provider_name());
    }

    /// A whitespace-only name counts as unset.
    #[test]
    fn test_config_name_blank_falls_back_to_provider() {
        let oss_config = ObjectStoreConfig::Oss(OssConfig {
            name: "   ".to_string(),
            ..Default::default()
        });
        assert_eq!("Oss", oss_config.config_name());
        assert_eq!("Oss", oss_config.provider_name());
    }

    /// Every variant except `File` is a remote object storage.
    #[test]
    fn test_is_object_storage() {
        let store = ObjectStoreConfig::default();
        assert!(!store.is_object_storage());
        let s3_config = ObjectStoreConfig::S3(S3Config::default());
        assert!(s3_config.is_object_storage());
        let oss_config = ObjectStoreConfig::Oss(OssConfig::default());
        assert!(oss_config.is_object_storage());
        let gcs_config = ObjectStoreConfig::Gcs(GcsConfig::default());
        assert!(gcs_config.is_object_storage());
        let azblob_config = ObjectStoreConfig::Azblob(AzblobConfig::default());
        assert!(azblob_config.is_object_storage());
    }

    /// File storage has no cache options; remote storages expose theirs, also mutably.
    #[test]
    fn test_cache_config() {
        let mut file_config = ObjectStoreConfig::default();
        assert!(file_config.cache_config().is_none());
        assert!(file_config.cache_config_mut().is_none());

        let mut s3_config = ObjectStoreConfig::S3(S3Config::default());
        assert_eq!(
            Some(&ObjectStorageCacheConfig::default()),
            s3_config.cache_config()
        );

        s3_config
            .cache_config_mut()
            .unwrap()
            .sanitize("/data_home");
        assert_eq!("/data_home", s3_config.cache_config().unwrap().cache_path);
    }

    /// `sanitize` only fills in an empty cache path.
    #[test]
    fn test_sanitize_cache_path() {
        let mut cache = ObjectStorageCacheConfig::default();
        cache.sanitize("/data_home");
        assert_eq!("/data_home", cache.cache_path);

        let mut cache = ObjectStorageCacheConfig {
            cache_path: "/custom".to_string(),
            ..Default::default()
        };
        cache.sanitize("/data_home");
        assert_eq!("/custom", cache.cache_path);
    }
}
373}