datanode/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Datanode configurations
16
17use core::time::Duration;
18
19use common_base::readable_size::ReadableSize;
20use common_base::secrets::{ExposeSecret, SecretString};
21use common_config::Configurable;
22pub use common_procedure::options::ProcedureConfig;
23use common_telemetry::logging::{LoggingOptions, TracingOptions};
24use common_wal::config::DatanodeWalConfig;
25use common_workload::{sanitize_workload_types, DatanodeWorkloadType};
26use file_engine::config::EngineConfig as FileEngineConfig;
27use meta_client::MetaClientOptions;
28use metric_engine::config::EngineConfig as MetricEngineConfig;
29use mito2::config::MitoConfig;
30use query::options::QueryOptions;
31use serde::{Deserialize, Serialize};
32use servers::export_metrics::ExportMetricsOption;
33use servers::grpc::GrpcOptions;
34use servers::heartbeat_options::HeartbeatOptions;
35use servers::http::HttpOptions;
36
/// Default object store cache size: `ReadableSize::gb(5)`.
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);

/// Default data home in file storage.
///
/// Used as the `data_home` of `StorageConfig::default()`; the path is relative.
const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
41
/// Object storage config
///
/// (De)serialized as an internally tagged enum: the `type` field selects the
/// variant (for example `type = "S3"`), and the variant's own fields sit next to it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
    /// File storage; the only variant that is not reported as object storage.
    File(FileConfig),
    /// S3 storage, configured by [`S3Config`].
    S3(S3Config),
    /// OSS storage, configured by [`OssConfig`].
    Oss(OssConfig),
    /// Azure Blob storage, configured by [`AzblobConfig`].
    Azblob(AzblobConfig),
    /// GCS storage, configured by [`GcsConfig`].
    Gcs(GcsConfig),
}
52
53impl ObjectStoreConfig {
54    /// Returns the object storage type name, such as `S3`, `Oss` etc.
55    pub fn provider_name(&self) -> &'static str {
56        match self {
57            Self::File(_) => "File",
58            Self::S3(_) => "S3",
59            Self::Oss(_) => "Oss",
60            Self::Azblob(_) => "Azblob",
61            Self::Gcs(_) => "Gcs",
62        }
63    }
64
65    /// Returns true when it's a remote object storage such as AWS s3 etc.
66    pub fn is_object_storage(&self) -> bool {
67        !matches!(self, Self::File(_))
68    }
69
70    /// Returns the object storage configuration name, return the provider name if it's empty.
71    pub fn config_name(&self) -> &str {
72        let name = match self {
73            // file storage doesn't support name
74            Self::File(_) => self.provider_name(),
75            Self::S3(s3) => &s3.name,
76            Self::Oss(oss) => &oss.name,
77            Self::Azblob(az) => &az.name,
78            Self::Gcs(gcs) => &gcs.name,
79        };
80
81        if name.trim().is_empty() {
82            return self.provider_name();
83        }
84
85        name
86    }
87}
88
/// Storage engine config
///
/// Fields missing from the input fall back to `StorageConfig::default()`
/// (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StorageConfig {
    /// The working directory of database
    pub data_home: String,
    /// The default object store.
    ///
    /// Flattened: its `type` tag and fields are read from / written to the same
    /// table as `data_home` instead of a nested `store` table.
    #[serde(flatten)]
    pub store: ObjectStoreConfig,
    /// Object storage providers
    pub providers: Vec<ObjectStoreConfig>,
}
100
101impl StorageConfig {
102    /// Returns true when the default storage config is a remote object storage service such as AWS S3, etc.
103    pub fn is_object_storage(&self) -> bool {
104        self.store.is_object_storage()
105    }
106}
107
108impl Default for StorageConfig {
109    fn default() -> Self {
110        Self {
111            data_home: DEFAULT_DATA_HOME.to_string(),
112            store: ObjectStoreConfig::default(),
113            providers: vec![],
114        }
115    }
116}
117
/// Configuration of the file storage; it currently carries no options.
#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct FileConfig {}
121
/// Local cache settings shared by the remote object store configurations.
///
/// Both fields are optional and default to `None`.
/// NOTE(review): what happens when they are `None` is decided by the consumer
/// of this config, not visible here — confirm before documenting a fallback.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(default)]
pub struct ObjectStorageCacheConfig {
    /// The local file cache directory
    pub cache_path: Option<String>,
    /// The cache capacity in bytes
    pub cache_capacity: Option<ReadableSize>,
}
130
/// The http client options to the storage.
///
/// Fields missing from the input fall back to `HttpClientConfig::default()`
/// (`#[serde(default)]`). All durations are (de)serialized through
/// `humantime_serde`, i.e. as human-readable duration strings.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct HttpClientConfig {
    /// The maximum idle connection per host allowed in the pool.
    pub(crate) pool_max_idle_per_host: u32,

    /// The timeout for only the connect phase of a http client.
    #[serde(with = "humantime_serde")]
    pub(crate) connect_timeout: Duration,

    /// The total request timeout, applied from when the request starts connecting until the response body has finished.
    /// Also considered a total deadline.
    #[serde(with = "humantime_serde")]
    pub(crate) timeout: Duration,

    /// The timeout for idle sockets being kept-alive.
    #[serde(with = "humantime_serde")]
    pub(crate) pool_idle_timeout: Duration,
}
151
152impl Default for HttpClientConfig {
153    fn default() -> Self {
154        Self {
155            pool_max_idle_per_host: 1024,
156            connect_timeout: Duration::from_secs(30),
157            timeout: Duration::from_secs(30),
158            pool_idle_timeout: Duration::from_secs(90),
159        }
160    }
161}
162
/// Configuration of an S3 object store.
///
/// Fields missing from the input fall back to `S3Config::default()`
/// (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
    /// Configuration name; when blank, `ObjectStoreConfig::config_name` reports the provider name instead.
    pub name: String,
    /// Bucket name.
    pub bucket: String,
    /// Root path. NOTE(review): presumably a prefix inside the bucket — confirm against the store builder.
    pub root: String,
    /// Access key id; held as a secret and never written out on serialization.
    #[serde(skip_serializing)]
    pub access_key_id: SecretString,
    /// Secret access key; held as a secret and never written out on serialization.
    #[serde(skip_serializing)]
    pub secret_access_key: SecretString,
    /// Optional endpoint; `None` by default.
    pub endpoint: Option<String>,
    /// Optional region; `None` by default.
    pub region: Option<String>,
    /// Enable virtual host style so that opendal will send API requests in virtual host style instead of path style.
    /// By default, opendal will send API to https://s3.us-east-1.amazonaws.com/bucket_name
    /// Enabled, opendal will send API to https://bucket_name.s3.us-east-1.amazonaws.com
    pub enable_virtual_host_style: bool,
    /// Local cache settings; flattened, so `cache_path` / `cache_capacity` sit at the same level as the other fields.
    #[serde(flatten)]
    pub cache: ObjectStorageCacheConfig,
    /// Options of the HTTP client used to talk to the storage.
    pub http_client: HttpClientConfig,
}
183
184impl PartialEq for S3Config {
185    fn eq(&self, other: &Self) -> bool {
186        self.name == other.name
187            && self.bucket == other.bucket
188            && self.root == other.root
189            && self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
190            && self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
191            && self.endpoint == other.endpoint
192            && self.region == other.region
193            && self.enable_virtual_host_style == other.enable_virtual_host_style
194            && self.cache == other.cache
195            && self.http_client == other.http_client
196    }
197}
198
/// Configuration of an OSS object store.
///
/// Fields missing from the input fall back to `OssConfig::default()`
/// (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct OssConfig {
    /// Configuration name; when blank, `ObjectStoreConfig::config_name` reports the provider name instead.
    pub name: String,
    /// Bucket name.
    pub bucket: String,
    /// Root path. NOTE(review): presumably a prefix inside the bucket — confirm against the store builder.
    pub root: String,
    /// Access key id; held as a secret and never written out on serialization.
    #[serde(skip_serializing)]
    pub access_key_id: SecretString,
    /// Access key secret; held as a secret and never written out on serialization.
    #[serde(skip_serializing)]
    pub access_key_secret: SecretString,
    /// Endpoint of the service; empty by default.
    pub endpoint: String,
    /// Local cache settings; flattened, so `cache_path` / `cache_capacity` sit at the same level as the other fields.
    #[serde(flatten)]
    pub cache: ObjectStorageCacheConfig,
    /// Options of the HTTP client used to talk to the storage.
    pub http_client: HttpClientConfig,
}
214
215impl PartialEq for OssConfig {
216    fn eq(&self, other: &Self) -> bool {
217        self.name == other.name
218            && self.bucket == other.bucket
219            && self.root == other.root
220            && self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
221            && self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
222            && self.endpoint == other.endpoint
223            && self.cache == other.cache
224            && self.http_client == other.http_client
225    }
226}
227
/// Configuration of an Azure Blob object store.
///
/// Fields missing from the input fall back to `AzblobConfig::default()`
/// (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AzblobConfig {
    /// Configuration name; when blank, `ObjectStoreConfig::config_name` reports the provider name instead.
    pub name: String,
    /// Container name.
    pub container: String,
    /// Root path. NOTE(review): presumably a prefix inside the container — confirm against the store builder.
    pub root: String,
    /// Account name; held as a secret and never written out on serialization.
    #[serde(skip_serializing)]
    pub account_name: SecretString,
    /// Account key; held as a secret and never written out on serialization.
    #[serde(skip_serializing)]
    pub account_key: SecretString,
    /// Endpoint of the service; empty by default.
    pub endpoint: String,
    /// Optional SAS token; `None` by default.
    ///
    /// NOTE(review): unlike the account name/key, this is a plain
    /// `Option<String>` without `skip_serializing`, so it is written out on
    /// serialization and shown by `Debug` — confirm that this is intended.
    pub sas_token: Option<String>,
    /// Local cache settings; flattened, so `cache_path` / `cache_capacity` sit at the same level as the other fields.
    #[serde(flatten)]
    pub cache: ObjectStorageCacheConfig,
    /// Options of the HTTP client used to talk to the storage.
    pub http_client: HttpClientConfig,
}
244
245impl PartialEq for AzblobConfig {
246    fn eq(&self, other: &Self) -> bool {
247        self.name == other.name
248            && self.container == other.container
249            && self.root == other.root
250            && self.account_name.expose_secret() == other.account_name.expose_secret()
251            && self.account_key.expose_secret() == other.account_key.expose_secret()
252            && self.endpoint == other.endpoint
253            && self.sas_token == other.sas_token
254            && self.cache == other.cache
255            && self.http_client == other.http_client
256    }
257}
258
/// Configuration of a GCS object store.
///
/// Fields missing from the input fall back to `GcsConfig::default()`
/// (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GcsConfig {
    /// Configuration name; when blank, `ObjectStoreConfig::config_name` reports the provider name instead.
    pub name: String,
    /// Root path. NOTE(review): presumably a prefix inside the bucket — confirm against the store builder.
    pub root: String,
    /// Bucket name.
    pub bucket: String,
    /// Scope. NOTE(review): presumably the OAuth scope requested for GCS — confirm.
    pub scope: String,
    /// Credential path; held as a secret and never written out on serialization.
    #[serde(skip_serializing)]
    pub credential_path: SecretString,
    /// Credential; held as a secret and never written out on serialization.
    #[serde(skip_serializing)]
    pub credential: SecretString,
    /// Endpoint of the service; empty by default.
    pub endpoint: String,
    /// Local cache settings; flattened, so `cache_path` / `cache_capacity` sit at the same level as the other fields.
    #[serde(flatten)]
    pub cache: ObjectStorageCacheConfig,
    /// Options of the HTTP client used to talk to the storage.
    pub http_client: HttpClientConfig,
}
275
276impl PartialEq for GcsConfig {
277    fn eq(&self, other: &Self) -> bool {
278        self.name == other.name
279            && self.root == other.root
280            && self.bucket == other.bucket
281            && self.scope == other.scope
282            && self.credential_path.expose_secret() == other.credential_path.expose_secret()
283            && self.credential.expose_secret() == other.credential.expose_secret()
284            && self.endpoint == other.endpoint
285            && self.cache == other.cache
286            && self.http_client == other.http_client
287    }
288}
289
290impl Default for S3Config {
291    fn default() -> Self {
292        Self {
293            name: String::default(),
294            bucket: String::default(),
295            root: String::default(),
296            access_key_id: SecretString::from(String::default()),
297            secret_access_key: SecretString::from(String::default()),
298            enable_virtual_host_style: false,
299            endpoint: Option::default(),
300            region: Option::default(),
301            cache: ObjectStorageCacheConfig::default(),
302            http_client: HttpClientConfig::default(),
303        }
304    }
305}
306
307impl Default for OssConfig {
308    fn default() -> Self {
309        Self {
310            name: String::default(),
311            bucket: String::default(),
312            root: String::default(),
313            access_key_id: SecretString::from(String::default()),
314            access_key_secret: SecretString::from(String::default()),
315            endpoint: String::default(),
316            cache: ObjectStorageCacheConfig::default(),
317            http_client: HttpClientConfig::default(),
318        }
319    }
320}
321
322impl Default for AzblobConfig {
323    fn default() -> Self {
324        Self {
325            name: String::default(),
326            container: String::default(),
327            root: String::default(),
328            account_name: SecretString::from(String::default()),
329            account_key: SecretString::from(String::default()),
330            endpoint: String::default(),
331            sas_token: Option::default(),
332            cache: ObjectStorageCacheConfig::default(),
333            http_client: HttpClientConfig::default(),
334        }
335    }
336}
337
338impl Default for GcsConfig {
339    fn default() -> Self {
340        Self {
341            name: String::default(),
342            root: String::default(),
343            bucket: String::default(),
344            scope: String::default(),
345            credential_path: SecretString::from(String::default()),
346            credential: SecretString::from(String::default()),
347            endpoint: String::default(),
348            cache: ObjectStorageCacheConfig::default(),
349            http_client: HttpClientConfig::default(),
350        }
351    }
352}
353
354impl Default for ObjectStoreConfig {
355    fn default() -> Self {
356        ObjectStoreConfig::File(FileConfig {})
357    }
358}
359
/// Top-level options of a datanode.
///
/// Fields missing from the input fall back to `DatanodeOptions::default()`
/// (`#[serde(default)]`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct DatanodeOptions {
    /// Id of this node; `None` by default.
    pub node_id: Option<u64>,
    /// Workload types of this datanode; normalized by [`DatanodeOptions::sanitize`].
    pub workload_types: Vec<DatanodeWorkloadType>,
    /// NOTE(review): presumably makes startup wait for a lease — confirm. `false` by default.
    pub require_lease_before_startup: bool,
    /// NOTE(review): presumably initializes regions in the background — confirm. `false` by default.
    pub init_regions_in_background: bool,
    /// NOTE(review): presumably the parallelism of region initialization — confirm. `16` by default.
    pub init_regions_parallelism: usize,
    /// gRPC server options; replaces the deprecated `rpc_*` fields below.
    pub grpc: GrpcOptions,
    /// Heartbeat options.
    pub heartbeat: HeartbeatOptions,
    /// HTTP server options.
    pub http: HttpOptions,
    /// Meta client options; `None` by default.
    pub meta_client: Option<MetaClientOptions>,
    /// WAL options.
    pub wal: DatanodeWalConfig,
    /// Storage options.
    pub storage: StorageConfig,
    /// NOTE(review): defaults to `0`; presumably `0` means "no limit" — confirm with the consumer.
    pub max_concurrent_queries: usize,
    /// Options for different store engines.
    pub region_engine: Vec<RegionEngineConfig>,
    /// Logging options.
    pub logging: LoggingOptions,
    /// Whether telemetry is enabled; `true` by default.
    pub enable_telemetry: bool,
    /// Metrics export options.
    pub export_metrics: ExportMetricsOption,
    /// Tracing options.
    pub tracing: TracingOptions,
    /// Query options.
    pub query: QueryOptions,

    // Deprecated options, please use the new options instead.
    /// Deprecated: use `grpc.addr` instead.
    #[deprecated(note = "Please use `grpc.addr` instead.")]
    pub rpc_addr: Option<String>,
    /// Deprecated: use `grpc.hostname` instead.
    #[deprecated(note = "Please use `grpc.hostname` instead.")]
    pub rpc_hostname: Option<String>,
    /// Deprecated: use `grpc.runtime_size` instead.
    #[deprecated(note = "Please use `grpc.runtime_size` instead.")]
    pub rpc_runtime_size: Option<usize>,
    /// Deprecated: use `grpc.max_recv_message_size` instead.
    #[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
    pub rpc_max_recv_message_size: Option<ReadableSize>,
    /// Deprecated: use `grpc.max_send_message_size` instead.
    #[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
    pub rpc_max_send_message_size: Option<ReadableSize>,
}
395
396impl DatanodeOptions {
397    /// Sanitize the `DatanodeOptions` to ensure the config is valid.
398    pub fn sanitize(&mut self) {
399        sanitize_workload_types(&mut self.workload_types);
400    }
401}
402
403impl Default for DatanodeOptions {
404    #[allow(deprecated)]
405    fn default() -> Self {
406        Self {
407            node_id: None,
408            workload_types: vec![DatanodeWorkloadType::Hybrid],
409            require_lease_before_startup: false,
410            init_regions_in_background: false,
411            init_regions_parallelism: 16,
412            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3001"),
413            http: HttpOptions::default(),
414            meta_client: None,
415            wal: DatanodeWalConfig::default(),
416            storage: StorageConfig::default(),
417            max_concurrent_queries: 0,
418            region_engine: vec![
419                RegionEngineConfig::Mito(MitoConfig::default()),
420                RegionEngineConfig::File(FileEngineConfig::default()),
421            ],
422            logging: LoggingOptions::default(),
423            heartbeat: HeartbeatOptions::datanode_default(),
424            enable_telemetry: true,
425            export_metrics: ExportMetricsOption::default(),
426            tracing: TracingOptions::default(),
427            query: QueryOptions::default(),
428
429            // Deprecated options
430            rpc_addr: None,
431            rpc_hostname: None,
432            rpc_runtime_size: None,
433            rpc_max_recv_message_size: None,
434            rpc_max_send_message_size: None,
435        }
436    }
437}
438
439impl Configurable for DatanodeOptions {
440    fn env_list_keys() -> Option<&'static [&'static str]> {
441        Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
442    }
443}
444
445#[allow(clippy::large_enum_variant)]
446#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
447pub enum RegionEngineConfig {
448    #[serde(rename = "mito")]
449    Mito(MitoConfig),
450    #[serde(rename = "file")]
451    File(FileEngineConfig),
452    #[serde(rename = "metric")]
453    Metric(MetricEngineConfig),
454}
455
#[cfg(test)]
mod tests {
    use common_base::secrets::ExposeSecret;

    use super::*;

    /// The default options can be serialized to TOML and parsed back.
    #[test]
    fn test_toml() {
        let serialized = toml::to_string(&DatanodeOptions::default()).unwrap();
        let _roundtrip: DatanodeOptions = toml::from_str(&serialized).unwrap();
    }

    /// `config_name` falls back to the provider name unless a name is configured.
    #[test]
    fn test_config_name() {
        let default_store = ObjectStoreConfig::default();
        assert_eq!("File", default_store.config_name());

        let unnamed_s3 = ObjectStoreConfig::S3(S3Config::default());
        assert_eq!("S3", unnamed_s3.config_name());
        assert_eq!("S3", unnamed_s3.provider_name());

        let named_s3 = ObjectStoreConfig::S3(S3Config {
            name: String::from("test"),
            ..Default::default()
        });
        assert_eq!("test", named_s3.config_name());
        assert_eq!("S3", named_s3.provider_name());
    }

    /// Only the file store is not an object storage.
    #[test]
    fn test_is_object_storage() {
        assert!(!ObjectStoreConfig::default().is_object_storage());

        let remote_stores = [
            ObjectStoreConfig::S3(S3Config::default()),
            ObjectStoreConfig::Oss(OssConfig::default()),
            ObjectStoreConfig::Gcs(GcsConfig::default()),
            ObjectStoreConfig::Azblob(AzblobConfig::default()),
        ];
        for store in &remote_stores {
            assert!(store.is_object_storage());
        }
    }

    /// Secrets parse from TOML, are redacted by `Debug`, and can still be exposed.
    #[test]
    fn test_secstr() {
        let toml_str = r#"
            [storage]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"
        "#;
        let parsed: DatanodeOptions = toml::from_str(toml_str).unwrap();

        let s3 = match &parsed.storage.store {
            ObjectStoreConfig::S3(s3) => s3,
            _ => unreachable!(),
        };
        assert_eq!(
            "SecretBox<alloc::string::String>([REDACTED])".to_string(),
            format!("{:?}", s3.access_key_id)
        );
        assert_eq!("access_key_id", s3.access_key_id.expose_secret());
    }
}