datanode/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Datanode configurations
16
17use core::time::Duration;
18
19use common_base::readable_size::ReadableSize;
20use common_base::secrets::{ExposeSecret, SecretString};
21use common_config::Configurable;
22pub use common_procedure::options::ProcedureConfig;
23use common_telemetry::logging::{LoggingOptions, TracingOptions};
24use common_wal::config::DatanodeWalConfig;
25use file_engine::config::EngineConfig as FileEngineConfig;
26use meta_client::MetaClientOptions;
27use metric_engine::config::EngineConfig as MetricEngineConfig;
28use mito2::config::MitoConfig;
29use query::options::QueryOptions;
30use serde::{Deserialize, Serialize};
31use servers::export_metrics::ExportMetricsOption;
32use servers::grpc::GrpcOptions;
33use servers::heartbeat_options::HeartbeatOptions;
34use servers::http::HttpOptions;
35
/// Default size of the object store cache, `ReadableSize::gb(5)`.
///
/// NOTE(review): not referenced in the visible part of this file; presumably
/// used by callers when `ObjectStorageCacheConfig::cache_capacity` is unset — confirm.
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);

/// Default data home in file storage.
///
/// Used as `StorageConfig::data_home` by `StorageConfig::default()`.
const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
40
/// Object storage config
///
/// One variant per supported storage provider. Because of
/// `#[serde(tag = "type")]` the variant is selected by a `type` key that sits
/// next to the variant's own fields (for example `type = "S3"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
    /// File storage; the only variant for which `is_object_storage` is `false`.
    File(FileConfig),
    /// S3 storage, configured by [`S3Config`].
    S3(S3Config),
    /// OSS storage, configured by [`OssConfig`].
    Oss(OssConfig),
    /// Azblob storage, configured by [`AzblobConfig`].
    Azblob(AzblobConfig),
    /// GCS storage, configured by [`GcsConfig`].
    Gcs(GcsConfig),
}
51
52impl ObjectStoreConfig {
53    /// Returns the object storage type name, such as `S3`, `Oss` etc.
54    pub fn provider_name(&self) -> &'static str {
55        match self {
56            Self::File(_) => "File",
57            Self::S3(_) => "S3",
58            Self::Oss(_) => "Oss",
59            Self::Azblob(_) => "Azblob",
60            Self::Gcs(_) => "Gcs",
61        }
62    }
63
64    /// Returns true when it's a remote object storage such as AWS s3 etc.
65    pub fn is_object_storage(&self) -> bool {
66        !matches!(self, Self::File(_))
67    }
68
69    /// Returns the object storage configuration name, return the provider name if it's empty.
70    pub fn config_name(&self) -> &str {
71        let name = match self {
72            // file storage doesn't support name
73            Self::File(_) => self.provider_name(),
74            Self::S3(s3) => &s3.name,
75            Self::Oss(oss) => &oss.name,
76            Self::Azblob(az) => &az.name,
77            Self::Gcs(gcs) => &gcs.name,
78        };
79
80        if name.trim().is_empty() {
81            return self.provider_name();
82        }
83
84        name
85    }
86}
87
/// Storage engine config
///
/// `#[serde(default)]`: any key missing from the input takes its value from
/// `StorageConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StorageConfig {
    /// The working directory of database
    pub data_home: String,
    /// The default store. Flattened, so its `type` key and the variant's own
    /// fields live in the same table as `data_home`.
    #[serde(flatten)]
    pub store: ObjectStoreConfig,
    /// Object storage providers
    ///
    /// NOTE(review): presumably additional stores selectable by their
    /// `config_name` — confirm against the code that consumes this list.
    pub providers: Vec<ObjectStoreConfig>,
}
99
100impl StorageConfig {
101    /// Returns true when the default storage config is a remote object storage service such as AWS S3, etc.
102    pub fn is_object_storage(&self) -> bool {
103        self.store.is_object_storage()
104    }
105}
106
107impl Default for StorageConfig {
108    fn default() -> Self {
109        Self {
110            data_home: DEFAULT_DATA_HOME.to_string(),
111            store: ObjectStoreConfig::default(),
112            providers: vec![],
113        }
114    }
115}
116
/// Options for the `File` variant of `ObjectStoreConfig`.
///
/// Currently has no fields.
#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct FileConfig {}
120
/// Local cache options for an object store.
///
/// Embedded with `#[serde(flatten)]` in `S3Config`, `OssConfig`,
/// `AzblobConfig` and `GcsConfig`, so these keys appear directly among each
/// store's own keys. Both fields default to `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(default)]
pub struct ObjectStorageCacheConfig {
    /// The local file cache directory
    pub cache_path: Option<String>,
    /// The cache capacity in bytes
    pub cache_capacity: Option<ReadableSize>,
}
129
/// The http client options to the storage.
///
/// `#[serde(default)]`: keys missing from the input take their values from
/// `HttpClientConfig::default()`. The three durations are (de)serialized
/// through `humantime_serde`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct HttpClientConfig {
    /// The maximum idle connection per host allowed in the pool.
    /// Defaults to 1024.
    pub(crate) pool_max_idle_per_host: u32,

    /// The timeout for only the connect phase of a http client.
    /// Defaults to 30 seconds.
    #[serde(with = "humantime_serde")]
    pub(crate) connect_timeout: Duration,

    /// The total request timeout, applied from when the request starts connecting until the response body has finished.
    /// Also considered a total deadline.
    /// Defaults to 30 seconds.
    #[serde(with = "humantime_serde")]
    pub(crate) timeout: Duration,

    /// The timeout for idle sockets being kept-alive.
    /// Defaults to 90 seconds.
    #[serde(with = "humantime_serde")]
    pub(crate) pool_idle_timeout: Duration,
}
150
151impl Default for HttpClientConfig {
152    fn default() -> Self {
153        Self {
154            pool_max_idle_per_host: 1024,
155            connect_timeout: Duration::from_secs(30),
156            timeout: Duration::from_secs(30),
157            pool_idle_timeout: Duration::from_secs(90),
158        }
159    }
160}
161
/// Options for the `S3` variant of `ObjectStoreConfig`.
///
/// `#[serde(default)]`: keys missing from the input take their values from
/// `S3Config::default()`. `PartialEq` is implemented by hand rather than derived.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
    /// Name of this storage config. When blank, `ObjectStoreConfig::config_name`
    /// reports the provider name (`S3`) instead.
    pub name: String,
    /// Bucket name. NOTE(review): presumably the bucket holding the data — confirm.
    pub bucket: String,
    /// Root. NOTE(review): presumably the path prefix inside the bucket — confirm.
    pub root: String,
    /// Access key id. Secret: read on deserialization but never serialized.
    #[serde(skip_serializing)]
    pub access_key_id: SecretString,
    /// Secret access key. Secret: read on deserialization but never serialized.
    #[serde(skip_serializing)]
    pub secret_access_key: SecretString,
    /// Optional endpoint; `None` by default.
    pub endpoint: Option<String>,
    /// Optional region; `None` by default.
    pub region: Option<String>,
    /// Enable virtual host style so that opendal will send API requests in virtual host style instead of path style.
    /// By default, opendal will send API to https://s3.us-east-1.amazonaws.com/bucket_name
    /// Enabled, opendal will send API to https://bucket_name.s3.us-east-1.amazonaws.com
    pub enable_virtual_host_style: bool,
    /// Local cache options; flattened, so `cache_path` and `cache_capacity`
    /// sit directly among the S3 keys.
    #[serde(flatten)]
    pub cache: ObjectStorageCacheConfig,
    /// The http client options to the storage.
    pub http_client: HttpClientConfig,
}
182
183impl PartialEq for S3Config {
184    fn eq(&self, other: &Self) -> bool {
185        self.name == other.name
186            && self.bucket == other.bucket
187            && self.root == other.root
188            && self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
189            && self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
190            && self.endpoint == other.endpoint
191            && self.region == other.region
192            && self.enable_virtual_host_style == other.enable_virtual_host_style
193            && self.cache == other.cache
194            && self.http_client == other.http_client
195    }
196}
197
/// Options for the `Oss` variant of `ObjectStoreConfig`.
///
/// `#[serde(default)]`: keys missing from the input take their values from
/// `OssConfig::default()`. `PartialEq` is implemented by hand rather than derived.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct OssConfig {
    /// Name of this storage config. When blank, `ObjectStoreConfig::config_name`
    /// reports the provider name (`Oss`) instead.
    pub name: String,
    /// Bucket name. NOTE(review): presumably the bucket holding the data — confirm.
    pub bucket: String,
    /// Root. NOTE(review): presumably the path prefix inside the bucket — confirm.
    pub root: String,
    /// Access key id. Secret: read on deserialization but never serialized.
    #[serde(skip_serializing)]
    pub access_key_id: SecretString,
    /// Access key secret. Secret: read on deserialization but never serialized.
    #[serde(skip_serializing)]
    pub access_key_secret: SecretString,
    /// Endpoint; empty by default.
    pub endpoint: String,
    /// Local cache options; flattened, so `cache_path` and `cache_capacity`
    /// sit directly among the OSS keys.
    #[serde(flatten)]
    pub cache: ObjectStorageCacheConfig,
    /// The http client options to the storage.
    pub http_client: HttpClientConfig,
}
213
214impl PartialEq for OssConfig {
215    fn eq(&self, other: &Self) -> bool {
216        self.name == other.name
217            && self.bucket == other.bucket
218            && self.root == other.root
219            && self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
220            && self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
221            && self.endpoint == other.endpoint
222            && self.cache == other.cache
223            && self.http_client == other.http_client
224    }
225}
226
/// Options for the `Azblob` variant of `ObjectStoreConfig`.
///
/// `#[serde(default)]`: keys missing from the input take their values from
/// `AzblobConfig::default()`. `PartialEq` is implemented by hand rather than derived.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AzblobConfig {
    /// Name of this storage config. When blank, `ObjectStoreConfig::config_name`
    /// reports the provider name (`Azblob`) instead.
    pub name: String,
    /// Container name. NOTE(review): presumably the container holding the data — confirm.
    pub container: String,
    /// Root. NOTE(review): presumably the path prefix inside the container — confirm.
    pub root: String,
    /// Account name. Secret: read on deserialization but never serialized.
    #[serde(skip_serializing)]
    pub account_name: SecretString,
    /// Account key. Secret: read on deserialization but never serialized.
    #[serde(skip_serializing)]
    pub account_key: SecretString,
    /// Endpoint; empty by default.
    pub endpoint: String,
    /// Optional SAS token; `None` by default.
    ///
    /// NOTE(review): unlike `account_name` / `account_key` this is a plain
    /// `Option<String>` without `skip_serializing`, so it is serialized and
    /// shows up in `Debug` output — confirm that this is intended.
    pub sas_token: Option<String>,
    /// Local cache options; flattened, so `cache_path` and `cache_capacity`
    /// sit directly among the Azblob keys.
    #[serde(flatten)]
    pub cache: ObjectStorageCacheConfig,
    /// The http client options to the storage.
    pub http_client: HttpClientConfig,
}
243
244impl PartialEq for AzblobConfig {
245    fn eq(&self, other: &Self) -> bool {
246        self.name == other.name
247            && self.container == other.container
248            && self.root == other.root
249            && self.account_name.expose_secret() == other.account_name.expose_secret()
250            && self.account_key.expose_secret() == other.account_key.expose_secret()
251            && self.endpoint == other.endpoint
252            && self.sas_token == other.sas_token
253            && self.cache == other.cache
254            && self.http_client == other.http_client
255    }
256}
257
/// Options for the `Gcs` variant of `ObjectStoreConfig`.
///
/// `#[serde(default)]`: keys missing from the input take their values from
/// `GcsConfig::default()`. `PartialEq` is implemented by hand rather than derived.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GcsConfig {
    /// Name of this storage config. When blank, `ObjectStoreConfig::config_name`
    /// reports the provider name (`Gcs`) instead.
    pub name: String,
    /// Root. NOTE(review): presumably the path prefix inside the bucket — confirm.
    pub root: String,
    /// Bucket name. NOTE(review): presumably the bucket holding the data — confirm.
    pub bucket: String,
    /// Scope; empty by default. NOTE(review): presumably the access scope
    /// requested for the credential — confirm.
    pub scope: String,
    /// Credential path. Secret: read on deserialization but never serialized.
    #[serde(skip_serializing)]
    pub credential_path: SecretString,
    /// Credential. Secret: read on deserialization but never serialized.
    #[serde(skip_serializing)]
    pub credential: SecretString,
    /// Endpoint; empty by default.
    pub endpoint: String,
    /// Local cache options; flattened, so `cache_path` and `cache_capacity`
    /// sit directly among the GCS keys.
    #[serde(flatten)]
    pub cache: ObjectStorageCacheConfig,
    /// The http client options to the storage.
    pub http_client: HttpClientConfig,
}
274
275impl PartialEq for GcsConfig {
276    fn eq(&self, other: &Self) -> bool {
277        self.name == other.name
278            && self.root == other.root
279            && self.bucket == other.bucket
280            && self.scope == other.scope
281            && self.credential_path.expose_secret() == other.credential_path.expose_secret()
282            && self.credential.expose_secret() == other.credential.expose_secret()
283            && self.endpoint == other.endpoint
284            && self.cache == other.cache
285            && self.http_client == other.http_client
286    }
287}
288
289impl Default for S3Config {
290    fn default() -> Self {
291        Self {
292            name: String::default(),
293            bucket: String::default(),
294            root: String::default(),
295            access_key_id: SecretString::from(String::default()),
296            secret_access_key: SecretString::from(String::default()),
297            enable_virtual_host_style: false,
298            endpoint: Option::default(),
299            region: Option::default(),
300            cache: ObjectStorageCacheConfig::default(),
301            http_client: HttpClientConfig::default(),
302        }
303    }
304}
305
306impl Default for OssConfig {
307    fn default() -> Self {
308        Self {
309            name: String::default(),
310            bucket: String::default(),
311            root: String::default(),
312            access_key_id: SecretString::from(String::default()),
313            access_key_secret: SecretString::from(String::default()),
314            endpoint: String::default(),
315            cache: ObjectStorageCacheConfig::default(),
316            http_client: HttpClientConfig::default(),
317        }
318    }
319}
320
321impl Default for AzblobConfig {
322    fn default() -> Self {
323        Self {
324            name: String::default(),
325            container: String::default(),
326            root: String::default(),
327            account_name: SecretString::from(String::default()),
328            account_key: SecretString::from(String::default()),
329            endpoint: String::default(),
330            sas_token: Option::default(),
331            cache: ObjectStorageCacheConfig::default(),
332            http_client: HttpClientConfig::default(),
333        }
334    }
335}
336
337impl Default for GcsConfig {
338    fn default() -> Self {
339        Self {
340            name: String::default(),
341            root: String::default(),
342            bucket: String::default(),
343            scope: String::default(),
344            credential_path: SecretString::from(String::default()),
345            credential: SecretString::from(String::default()),
346            endpoint: String::default(),
347            cache: ObjectStorageCacheConfig::default(),
348            http_client: HttpClientConfig::default(),
349        }
350    }
351}
352
353impl Default for ObjectStoreConfig {
354    fn default() -> Self {
355        ObjectStoreConfig::File(FileConfig {})
356    }
357}
358
/// Top-level datanode options.
///
/// `#[serde(default)]`: keys missing from the input take their values from
/// `DatanodeOptions::default()`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct DatanodeOptions {
    /// Id of this node; `None` by default.
    pub node_id: Option<u64>,
    /// Defaults to `false`.
    pub require_lease_before_startup: bool,
    /// Defaults to `false`.
    pub init_regions_in_background: bool,
    /// Defaults to 16.
    pub init_regions_parallelism: usize,
    /// gRPC options; the default bind address is `127.0.0.1:3001`.
    pub grpc: GrpcOptions,
    /// Heartbeat options; defaults to `HeartbeatOptions::datanode_default()`.
    pub heartbeat: HeartbeatOptions,
    /// HTTP options.
    pub http: HttpOptions,
    /// Meta client options; `None` by default.
    pub meta_client: Option<MetaClientOptions>,
    /// WAL options.
    pub wal: DatanodeWalConfig,
    /// Storage options (data home, default store and extra providers).
    pub storage: StorageConfig,
    /// Defaults to 0. NOTE(review): 0 presumably means "no limit" — confirm
    /// against the code that enforces it.
    pub max_concurrent_queries: usize,
    /// Options for different store engines.
    pub region_engine: Vec<RegionEngineConfig>,
    /// Logging options.
    pub logging: LoggingOptions,
    /// Defaults to `true`.
    pub enable_telemetry: bool,
    /// Metrics export options.
    pub export_metrics: ExportMetricsOption,
    /// Tracing options.
    pub tracing: TracingOptions,
    /// Query options.
    pub query: QueryOptions,

    /// Deprecated options, please use the new options instead.
    #[deprecated(note = "Please use `grpc.addr` instead.")]
    pub rpc_addr: Option<String>,
    #[deprecated(note = "Please use `grpc.hostname` instead.")]
    pub rpc_hostname: Option<String>,
    #[deprecated(note = "Please use `grpc.runtime_size` instead.")]
    pub rpc_runtime_size: Option<usize>,
    #[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
    pub rpc_max_recv_message_size: Option<ReadableSize>,
    #[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
    pub rpc_max_send_message_size: Option<ReadableSize>,
}
393
394impl Default for DatanodeOptions {
395    #[allow(deprecated)]
396    fn default() -> Self {
397        Self {
398            node_id: None,
399            require_lease_before_startup: false,
400            init_regions_in_background: false,
401            init_regions_parallelism: 16,
402            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3001"),
403            http: HttpOptions::default(),
404            meta_client: None,
405            wal: DatanodeWalConfig::default(),
406            storage: StorageConfig::default(),
407            max_concurrent_queries: 0,
408            region_engine: vec![
409                RegionEngineConfig::Mito(MitoConfig::default()),
410                RegionEngineConfig::File(FileEngineConfig::default()),
411            ],
412            logging: LoggingOptions::default(),
413            heartbeat: HeartbeatOptions::datanode_default(),
414            enable_telemetry: true,
415            export_metrics: ExportMetricsOption::default(),
416            tracing: TracingOptions::default(),
417            query: QueryOptions::default(),
418
419            // Deprecated options
420            rpc_addr: None,
421            rpc_hostname: None,
422            rpc_runtime_size: None,
423            rpc_max_recv_message_size: None,
424            rpc_max_send_message_size: None,
425        }
426    }
427}
428
429impl Configurable for DatanodeOptions {
430    fn env_list_keys() -> Option<&'static [&'static str]> {
431        Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
432    }
433}
434
/// Configuration of one region engine, as listed in
/// `DatanodeOptions::region_engine`.
///
/// Each variant is (de)serialized under its lowercase name (`mito`, `file`,
/// `metric`) because of the per-variant `#[serde(rename = ...)]`.
// The engine configs are stored inline in the enum, which is what the
// `large_enum_variant` lint would flag; it is silenced here.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub enum RegionEngineConfig {
    /// Mito engine options.
    #[serde(rename = "mito")]
    Mito(MitoConfig),
    /// File engine options.
    #[serde(rename = "file")]
    File(FileEngineConfig),
    /// Metric engine options.
    #[serde(rename = "metric")]
    Metric(MetricEngineConfig),
}
445
#[cfg(test)]
mod tests {
    use common_base::secrets::ExposeSecret;

    use super::*;

    /// The default options must serialize to TOML and parse back without error.
    #[test]
    fn test_toml() {
        let serialized = toml::to_string(&DatanodeOptions::default()).unwrap();
        let _reparsed: DatanodeOptions = toml::from_str(&serialized).unwrap();
    }

    /// `config_name` falls back to the provider name unless a name is set.
    #[test]
    fn test_config_name() {
        let file_store = ObjectStoreConfig::default();
        assert_eq!("File", file_store.config_name());

        let unnamed_s3 = ObjectStoreConfig::S3(S3Config::default());
        assert_eq!("S3", unnamed_s3.config_name());
        assert_eq!("S3", unnamed_s3.provider_name());

        let named_s3 = ObjectStoreConfig::S3(S3Config {
            name: "test".to_string(),
            ..Default::default()
        });
        assert_eq!("test", named_s3.config_name());
        assert_eq!("S3", named_s3.provider_name());
    }

    /// Only the default (`File`) store is not an object storage.
    #[test]
    fn test_is_object_storage() {
        assert!(!ObjectStoreConfig::default().is_object_storage());

        let remote_stores = [
            ObjectStoreConfig::S3(S3Config::default()),
            ObjectStoreConfig::Oss(OssConfig::default()),
            ObjectStoreConfig::Gcs(GcsConfig::default()),
            ObjectStoreConfig::Azblob(AzblobConfig::default()),
        ];
        for store in &remote_stores {
            assert!(store.is_object_storage());
        }
    }

    /// Secrets are redacted in `Debug` output but still readable on request.
    #[test]
    fn test_secstr() {
        let toml_str = r#"
            [storage]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"
        "#;
        let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();

        let ObjectStoreConfig::S3(cfg) = &opts.storage.store else {
            unreachable!()
        };
        assert_eq!(
            "SecretBox<alloc::string::String>([REDACTED])".to_string(),
            format!("{:?}", cfg.access_key_id)
        );
        assert_eq!("access_key_id", cfg.access_key_id.expose_secret());
    }
}