object_store/
config.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::time::Duration;
16
17use common_base::readable_size::ReadableSize;
18use common_base::secrets::{ExposeSecret, SecretString};
19use opendal::services::{Azblob, Gcs, Oss, S3};
20use serde::{Deserialize, Serialize};
21
22use crate::util;
23
24const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
25
26/// Object storage config
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
28#[serde(tag = "type")]
29pub enum ObjectStoreConfig {
30    File(FileConfig),
31    S3(S3Config),
32    Oss(OssConfig),
33    Azblob(AzblobConfig),
34    Gcs(GcsConfig),
35}
36
37impl Default for ObjectStoreConfig {
38    fn default() -> Self {
39        ObjectStoreConfig::File(FileConfig {})
40    }
41}
42
43impl ObjectStoreConfig {
44    /// Returns the object storage type name, such as `S3`, `Oss` etc.
45    pub fn provider_name(&self) -> &'static str {
46        match self {
47            Self::File(_) => "File",
48            Self::S3(_) => "S3",
49            Self::Oss(_) => "Oss",
50            Self::Azblob(_) => "Azblob",
51            Self::Gcs(_) => "Gcs",
52        }
53    }
54
55    /// Returns true when it's a remote object storage such as AWS s3 etc.
56    pub fn is_object_storage(&self) -> bool {
57        !matches!(self, Self::File(_))
58    }
59
60    /// Returns the object storage configuration name, return the provider name if it's empty.
61    pub fn config_name(&self) -> &str {
62        let name = match self {
63            // file storage doesn't support name
64            Self::File(_) => self.provider_name(),
65            Self::S3(s3) => &s3.name,
66            Self::Oss(oss) => &oss.name,
67            Self::Azblob(az) => &az.name,
68            Self::Gcs(gcs) => &gcs.name,
69        };
70
71        if name.trim().is_empty() {
72            return self.provider_name();
73        }
74
75        name
76    }
77
78    /// Returns the object storage cache configuration.
79    pub fn cache_config(&self) -> Option<&ObjectStorageCacheConfig> {
80        match self {
81            Self::File(_) => None,
82            Self::S3(s3) => Some(&s3.cache),
83            Self::Oss(oss) => Some(&oss.cache),
84            Self::Azblob(az) => Some(&az.cache),
85            Self::Gcs(gcs) => Some(&gcs.cache),
86        }
87    }
88
89    /// Returns the mutable object storage cache configuration.
90    pub fn cache_config_mut(&mut self) -> Option<&mut ObjectStorageCacheConfig> {
91        match self {
92            Self::File(_) => None,
93            Self::S3(s3) => Some(&mut s3.cache),
94            Self::Oss(oss) => Some(&mut oss.cache),
95            Self::Azblob(az) => Some(&mut az.cache),
96            Self::Gcs(gcs) => Some(&mut gcs.cache),
97        }
98    }
99}
100
101#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
102#[serde(default)]
103pub struct FileConfig {}
104
105#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
106#[serde(default)]
107pub struct S3Connection {
108    pub bucket: String,
109    pub root: String,
110    #[serde(skip_serializing)]
111    pub access_key_id: SecretString,
112    #[serde(skip_serializing)]
113    pub secret_access_key: SecretString,
114    pub endpoint: Option<String>,
115    pub region: Option<String>,
116    /// Enable virtual host style so that opendal will send API requests in virtual host style instead of path style.
117    /// By default, opendal will send API to https://s3.us-east-1.amazonaws.com/bucket_name
118    /// Enabled, opendal will send API to https://bucket_name.s3.us-east-1.amazonaws.com
119    pub enable_virtual_host_style: bool,
120    /// Disable EC2 metadata service.
121    /// By default, opendal will use EC2 metadata service to load credentials from the instance metadata,
122    /// when access key id and secret access key are not provided.
123    /// If enabled, opendal will *NOT* use EC2 metadata service.
124    pub disable_ec2_metadata: bool,
125}
126
127impl From<&S3Connection> for S3 {
128    fn from(connection: &S3Connection) -> Self {
129        let root = util::normalize_dir(&connection.root);
130
131        let mut builder = S3::default()
132            .root(&root)
133            .bucket(&connection.bucket)
134            .access_key_id(connection.access_key_id.expose_secret())
135            .secret_access_key(connection.secret_access_key.expose_secret());
136
137        if connection.disable_ec2_metadata {
138            builder = builder.disable_ec2_metadata();
139        }
140
141        if let Some(endpoint) = &connection.endpoint {
142            builder = builder.endpoint(endpoint);
143        }
144        if let Some(region) = &connection.region {
145            builder = builder.region(region);
146        }
147        if connection.enable_virtual_host_style {
148            builder = builder.enable_virtual_host_style();
149        }
150
151        builder
152    }
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
156#[serde(default)]
157pub struct S3Config {
158    pub name: String,
159    #[serde(flatten)]
160    pub connection: S3Connection,
161    #[serde(flatten)]
162    pub cache: ObjectStorageCacheConfig,
163    pub http_client: HttpClientConfig,
164}
165
166#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
167#[serde(default)]
168pub struct OssConnection {
169    pub bucket: String,
170    pub root: String,
171    #[serde(skip_serializing)]
172    pub access_key_id: SecretString,
173    #[serde(skip_serializing)]
174    pub access_key_secret: SecretString,
175    pub endpoint: String,
176}
177
178impl From<&OssConnection> for Oss {
179    fn from(connection: &OssConnection) -> Self {
180        let root = util::normalize_dir(&connection.root);
181        Oss::default()
182            .root(&root)
183            .bucket(&connection.bucket)
184            .endpoint(&connection.endpoint)
185            .access_key_id(connection.access_key_id.expose_secret())
186            .access_key_secret(connection.access_key_secret.expose_secret())
187    }
188}
189
190#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
191#[serde(default)]
192pub struct OssConfig {
193    pub name: String,
194    #[serde(flatten)]
195    pub connection: OssConnection,
196    #[serde(flatten)]
197    pub cache: ObjectStorageCacheConfig,
198    pub http_client: HttpClientConfig,
199}
200
201#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
202#[serde(default)]
203pub struct AzblobConnection {
204    pub container: String,
205    pub root: String,
206    #[serde(skip_serializing)]
207    pub account_name: SecretString,
208    #[serde(skip_serializing)]
209    pub account_key: SecretString,
210    pub endpoint: String,
211    pub sas_token: Option<String>,
212}
213
214impl From<&AzblobConnection> for Azblob {
215    fn from(connection: &AzblobConnection) -> Self {
216        let root = util::normalize_dir(&connection.root);
217        let mut builder = Azblob::default()
218            .root(&root)
219            .container(&connection.container)
220            .endpoint(&connection.endpoint)
221            .account_name(connection.account_name.expose_secret())
222            .account_key(connection.account_key.expose_secret());
223
224        if let Some(token) = &connection.sas_token {
225            builder = builder.sas_token(token);
226        };
227
228        builder
229    }
230}
231
232#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
233#[serde(default)]
234pub struct AzblobConfig {
235    pub name: String,
236    #[serde(flatten)]
237    pub connection: AzblobConnection,
238    #[serde(flatten)]
239    pub cache: ObjectStorageCacheConfig,
240    pub http_client: HttpClientConfig,
241}
242
243#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
244#[serde(default)]
245pub struct GcsConnection {
246    pub root: String,
247    pub bucket: String,
248    pub scope: String,
249    #[serde(skip_serializing)]
250    pub credential_path: SecretString,
251    #[serde(skip_serializing)]
252    pub credential: SecretString,
253    pub endpoint: String,
254}
255
256#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
257#[serde(default)]
258pub struct GcsConfig {
259    pub name: String,
260    #[serde(flatten)]
261    pub connection: GcsConnection,
262    #[serde(flatten)]
263    pub cache: ObjectStorageCacheConfig,
264    pub http_client: HttpClientConfig,
265}
266
267impl From<&GcsConnection> for Gcs {
268    fn from(connection: &GcsConnection) -> Self {
269        let root = util::normalize_dir(&connection.root);
270        Gcs::default()
271            .root(&root)
272            .bucket(&connection.bucket)
273            .scope(&connection.scope)
274            .credential_path(connection.credential_path.expose_secret())
275            .credential(connection.credential.expose_secret())
276            .endpoint(&connection.endpoint)
277    }
278}
279/// The http client options to the storage.
280#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
281#[serde(default)]
282pub struct HttpClientConfig {
283    /// The maximum idle connection per host allowed in the pool.
284    pub(crate) pool_max_idle_per_host: u32,
285
286    /// The timeout for only the connect phase of a http client.
287    #[serde(with = "humantime_serde")]
288    pub(crate) connect_timeout: Duration,
289
290    /// The total request timeout, applied from when the request starts connecting until the response body has finished.
291    /// Also considered a total deadline.
292    #[serde(with = "humantime_serde")]
293    pub(crate) timeout: Duration,
294
295    /// The timeout for idle sockets being kept-alive.
296    #[serde(with = "humantime_serde")]
297    pub(crate) pool_idle_timeout: Duration,
298
299    /// Skip SSL certificate validation (insecure)
300    pub skip_ssl_validation: bool,
301}
302
303impl Default for HttpClientConfig {
304    fn default() -> Self {
305        Self {
306            pool_max_idle_per_host: 1024,
307            connect_timeout: Duration::from_secs(30),
308            timeout: Duration::from_secs(30),
309            pool_idle_timeout: Duration::from_secs(90),
310            skip_ssl_validation: false,
311        }
312    }
313}
314
315#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
316#[serde(default)]
317pub struct ObjectStorageCacheConfig {
318    /// Whether to enable read cache. If not set, the read cache will be enabled by default.
319    pub enable_read_cache: bool,
320    /// The local file cache directory
321    pub cache_path: String,
322    /// The cache capacity in bytes
323    pub cache_capacity: ReadableSize,
324}
325
326impl Default for ObjectStorageCacheConfig {
327    fn default() -> Self {
328        Self {
329            enable_read_cache: true,
330            // The cache directory is set to the value of data_home in the build_cache_layer process.
331            cache_path: String::default(),
332            cache_capacity: DEFAULT_OBJECT_STORE_CACHE_SIZE,
333        }
334    }
335}
336
337impl ObjectStorageCacheConfig {
338    /// Sanitize the `ObjectStorageCacheConfig` to ensure the config is valid.
339    pub fn sanitize(&mut self, data_home: &str) {
340        // If `cache_path` is unset, default to use `${data_home}` as the local read cache directory.
341        if self.cache_path.is_empty() {
342            self.cache_path = data_home.to_string();
343        }
344    }
345}
346
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::ObjectStoreConfig;

    /// One default-configured instance of every remote provider, paired with the
    /// provider name it is expected to report.
    fn remote_configs() -> Vec<(&'static str, ObjectStoreConfig)> {
        vec![
            ("S3", ObjectStoreConfig::S3(S3Config::default())),
            ("Oss", ObjectStoreConfig::Oss(OssConfig::default())),
            ("Azblob", ObjectStoreConfig::Azblob(AzblobConfig::default())),
            ("Gcs", ObjectStoreConfig::Gcs(GcsConfig::default())),
        ]
    }

    #[test]
    fn test_config_name() {
        let object_store_config = ObjectStoreConfig::default();
        assert_eq!("File", object_store_config.config_name());
        assert_eq!("File", object_store_config.provider_name());

        // Without a configured name every provider reports its provider name.
        for (provider, config) in remote_configs() {
            assert_eq!(provider, config.config_name());
            assert_eq!(provider, config.provider_name());
        }

        let s3_config = ObjectStoreConfig::S3(S3Config {
            name: "test".to_string(),
            ..Default::default()
        });
        assert_eq!("test", s3_config.config_name());
        assert_eq!("S3", s3_config.provider_name());

        // A whitespace-only name counts as unset.
        let blank_name = ObjectStoreConfig::Oss(OssConfig {
            name: "  \t".to_string(),
            ..Default::default()
        });
        assert_eq!("Oss", blank_name.config_name());
    }

    #[test]
    fn test_is_object_storage() {
        let store = ObjectStoreConfig::default();
        assert!(!store.is_object_storage());

        for (provider, config) in remote_configs() {
            assert!(
                config.is_object_storage(),
                "{provider} should be an object storage"
            );
        }
    }

    #[test]
    fn test_cache_config() {
        // File storage carries no cache settings.
        let mut file = ObjectStoreConfig::default();
        assert!(file.cache_config().is_none());
        assert!(file.cache_config_mut().is_none());

        for (_, mut config) in remote_configs() {
            assert_eq!(
                Some(&ObjectStorageCacheConfig::default()),
                config.cache_config()
            );

            // Changes made through the mutable accessor are visible afterwards.
            let cache = config.cache_config_mut().unwrap();
            cache.enable_read_cache = false;
            assert!(!config.cache_config().unwrap().enable_read_cache);
        }
    }

    #[test]
    fn test_sanitize_cache_path() {
        let mut cache = ObjectStorageCacheConfig::default();
        assert!(cache.cache_path.is_empty());

        // An unset path falls back to the data home.
        cache.sanitize("/data/home");
        assert_eq!("/data/home", cache.cache_path);

        // An already configured path is left untouched.
        cache.sanitize("/another/home");
        assert_eq!("/data/home", cache.cache_path);
    }
}