datanode/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Datanode configurations
16
17use common_base::readable_size::ReadableSize;
18use common_config::{Configurable, DEFAULT_DATA_HOME};
19pub use common_procedure::options::ProcedureConfig;
20use common_telemetry::logging::{LoggingOptions, TracingOptions};
21use common_wal::config::DatanodeWalConfig;
22use common_workload::{sanitize_workload_types, DatanodeWorkloadType};
23use file_engine::config::EngineConfig as FileEngineConfig;
24use meta_client::MetaClientOptions;
25use metric_engine::config::EngineConfig as MetricEngineConfig;
26use mito2::config::MitoConfig;
27pub(crate) use object_store::config::ObjectStoreConfig;
28use query::options::QueryOptions;
29use serde::{Deserialize, Serialize};
30use servers::export_metrics::ExportMetricsOption;
31use servers::grpc::GrpcOptions;
32use servers::heartbeat_options::HeartbeatOptions;
33use servers::http::HttpOptions;
34
/// Default object store cache size, constructed as `ReadableSize::gb(5)`.
///
/// NOTE(review): nothing in this file reads the constant; the consumers live elsewhere.
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
36
37/// Storage engine config
38#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
39#[serde(default)]
40pub struct StorageConfig {
41    /// The working directory of database
42    pub data_home: String,
43    #[serde(flatten)]
44    pub store: ObjectStoreConfig,
45    /// Object storage providers
46    pub providers: Vec<ObjectStoreConfig>,
47}
48
49impl StorageConfig {
50    /// Returns true when the default storage config is a remote object storage service such as AWS S3, etc.
51    pub fn is_object_storage(&self) -> bool {
52        self.store.is_object_storage()
53    }
54}
55
56impl Default for StorageConfig {
57    fn default() -> Self {
58        Self {
59            data_home: DEFAULT_DATA_HOME.to_string(),
60            store: ObjectStoreConfig::default(),
61            providers: vec![],
62        }
63    }
64}
65
66#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
67#[serde(default)]
68pub struct DatanodeOptions {
69    pub node_id: Option<u64>,
70    pub workload_types: Vec<DatanodeWorkloadType>,
71    pub require_lease_before_startup: bool,
72    pub init_regions_in_background: bool,
73    pub init_regions_parallelism: usize,
74    pub grpc: GrpcOptions,
75    pub heartbeat: HeartbeatOptions,
76    pub http: HttpOptions,
77    pub meta_client: Option<MetaClientOptions>,
78    pub wal: DatanodeWalConfig,
79    pub storage: StorageConfig,
80    pub max_concurrent_queries: usize,
81    /// Options for different store engines.
82    pub region_engine: Vec<RegionEngineConfig>,
83    pub logging: LoggingOptions,
84    pub enable_telemetry: bool,
85    pub export_metrics: ExportMetricsOption,
86    pub tracing: TracingOptions,
87    pub query: QueryOptions,
88
89    /// Deprecated options, please use the new options instead.
90    #[deprecated(note = "Please use `grpc.addr` instead.")]
91    pub rpc_addr: Option<String>,
92    #[deprecated(note = "Please use `grpc.hostname` instead.")]
93    pub rpc_hostname: Option<String>,
94    #[deprecated(note = "Please use `grpc.runtime_size` instead.")]
95    pub rpc_runtime_size: Option<usize>,
96    #[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
97    pub rpc_max_recv_message_size: Option<ReadableSize>,
98    #[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
99    pub rpc_max_send_message_size: Option<ReadableSize>,
100}
101
102impl DatanodeOptions {
103    /// Sanitize the `DatanodeOptions` to ensure the config is valid.
104    pub fn sanitize(&mut self) {
105        sanitize_workload_types(&mut self.workload_types);
106    }
107}
108
109impl Default for DatanodeOptions {
110    #[allow(deprecated)]
111    fn default() -> Self {
112        Self {
113            node_id: None,
114            workload_types: vec![DatanodeWorkloadType::Hybrid],
115            require_lease_before_startup: false,
116            init_regions_in_background: false,
117            init_regions_parallelism: 16,
118            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3001"),
119            http: HttpOptions::default(),
120            meta_client: None,
121            wal: DatanodeWalConfig::default(),
122            storage: StorageConfig::default(),
123            max_concurrent_queries: 0,
124            region_engine: vec![
125                RegionEngineConfig::Mito(MitoConfig::default()),
126                RegionEngineConfig::File(FileEngineConfig::default()),
127            ],
128            logging: LoggingOptions::default(),
129            heartbeat: HeartbeatOptions::datanode_default(),
130            enable_telemetry: true,
131            export_metrics: ExportMetricsOption::default(),
132            tracing: TracingOptions::default(),
133            query: QueryOptions::default(),
134
135            // Deprecated options
136            rpc_addr: None,
137            rpc_hostname: None,
138            rpc_runtime_size: None,
139            rpc_max_recv_message_size: None,
140            rpc_max_send_message_size: None,
141        }
142    }
143}
144
145impl Configurable for DatanodeOptions {
146    fn env_list_keys() -> Option<&'static [&'static str]> {
147        Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
148    }
149}
150
151#[allow(clippy::large_enum_variant)]
152#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
153pub enum RegionEngineConfig {
154    #[serde(rename = "mito")]
155    Mito(MitoConfig),
156    #[serde(rename = "file")]
157    File(FileEngineConfig),
158    #[serde(rename = "metric")]
159    Metric(MetricEngineConfig),
160}
161
#[cfg(test)]
mod tests {
    use common_base::secrets::ExposeSecret;

    use super::*;

    /// Parses `toml_str` into [`DatanodeOptions`], panicking when it is invalid.
    fn parse_options(toml_str: &str) -> DatanodeOptions {
        toml::from_str(toml_str).unwrap()
    }

    /// Parses `toml_str` and returns the `skip_ssl_validation` flag of its S3
    /// store, panicking when the configured store is not S3.
    fn s3_skip_ssl_validation(toml_str: &str) -> bool {
        let opts = parse_options(toml_str);
        match &opts.storage.store {
            ObjectStoreConfig::S3(cfg) => cfg.http_client.skip_ssl_validation,
            _ => panic!("Expected S3 config"),
        }
    }

    /// The default options must serialize to TOML that parses back.
    #[test]
    fn test_toml() {
        let serialized = toml::to_string(&DatanodeOptions::default()).unwrap();
        let _roundtrip = parse_options(&serialized);
    }

    /// Secrets are redacted in `Debug` output but still retrievable on demand.
    #[test]
    fn test_secstr() {
        let toml_str = r#"
            [storage]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"
        "#;
        let opts = parse_options(toml_str);
        let ObjectStoreConfig::S3(cfg) = &opts.storage.store else {
            unreachable!()
        };

        assert_eq!(
            "SecretBox<alloc::string::String>([REDACTED])".to_string(),
            format!("{:?}", cfg.access_key_id)
        );
        assert_eq!("access_key_id", cfg.access_key_id.expose_secret());
    }

    /// `skip_ssl_validation` follows the config and is `false` when omitted.
    #[test]
    fn test_skip_ssl_validation_config() {
        // Explicitly enabled.
        let toml_str_true = r#"
            [storage]
            type = "S3"
            [storage.http_client]
            skip_ssl_validation = true
        "#;
        assert!(s3_skip_ssl_validation(toml_str_true));

        // Explicitly disabled.
        let toml_str_false = r#"
            [storage]
            type = "S3"
            [storage.http_client]
            skip_ssl_validation = false
        "#;
        assert!(!s3_skip_ssl_validation(toml_str_false));

        // Omitted: the default must be `false`.
        let toml_str_default = r#"
            [storage]
            type = "S3"
        "#;
        assert!(!s3_skip_ssl_validation(toml_str_default));
    }
}