datanode/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Datanode configurations
16
17use common_base::readable_size::ReadableSize;
18use common_config::{Configurable, DEFAULT_DATA_HOME};
19use common_options::memory::MemoryOptions;
20pub use common_procedure::options::ProcedureConfig;
21use common_telemetry::logging::{LoggingOptions, TracingOptions};
22use common_wal::config::DatanodeWalConfig;
23use common_workload::{DatanodeWorkloadType, sanitize_workload_types};
24use file_engine::config::EngineConfig as FileEngineConfig;
25use meta_client::MetaClientOptions;
26use metric_engine::config::EngineConfig as MetricEngineConfig;
27use mito2::config::MitoConfig;
28pub(crate) use object_store::config::ObjectStoreConfig;
29use query::options::QueryOptions;
30use serde::{Deserialize, Serialize};
31use servers::export_metrics::ExportMetricsOption;
32use servers::grpc::GrpcOptions;
33use servers::heartbeat_options::HeartbeatOptions;
34use servers::http::HttpOptions;
35
36/// Storage engine config
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
38#[serde(default)]
39pub struct StorageConfig {
40    /// The working directory of database
41    pub data_home: String,
42    #[serde(flatten)]
43    pub store: ObjectStoreConfig,
44    /// Object storage providers
45    pub providers: Vec<ObjectStoreConfig>,
46}
47
48impl StorageConfig {
49    /// Returns true when the default storage config is a remote object storage service such as AWS S3, etc.
50    pub fn is_object_storage(&self) -> bool {
51        self.store.is_object_storage()
52    }
53}
54
55impl Default for StorageConfig {
56    fn default() -> Self {
57        Self {
58            data_home: DEFAULT_DATA_HOME.to_string(),
59            store: ObjectStoreConfig::default(),
60            providers: vec![],
61        }
62    }
63}
64
65#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
66#[serde(default)]
67pub struct DatanodeOptions {
68    pub node_id: Option<u64>,
69    pub default_column_prefix: Option<String>,
70    pub workload_types: Vec<DatanodeWorkloadType>,
71    pub require_lease_before_startup: bool,
72    pub init_regions_in_background: bool,
73    pub init_regions_parallelism: usize,
74    pub grpc: GrpcOptions,
75    pub heartbeat: HeartbeatOptions,
76    pub http: HttpOptions,
77    pub meta_client: Option<MetaClientOptions>,
78    pub wal: DatanodeWalConfig,
79    pub storage: StorageConfig,
80    pub max_concurrent_queries: usize,
81    /// Options for different store engines.
82    pub region_engine: Vec<RegionEngineConfig>,
83    pub logging: LoggingOptions,
84    pub enable_telemetry: bool,
85    pub export_metrics: ExportMetricsOption,
86    pub tracing: TracingOptions,
87    pub query: QueryOptions,
88    pub memory: MemoryOptions,
89
90    /// Deprecated options, please use the new options instead.
91    #[deprecated(note = "Please use `grpc.addr` instead.")]
92    pub rpc_addr: Option<String>,
93    #[deprecated(note = "Please use `grpc.hostname` instead.")]
94    pub rpc_hostname: Option<String>,
95    #[deprecated(note = "Please use `grpc.runtime_size` instead.")]
96    pub rpc_runtime_size: Option<usize>,
97    #[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
98    pub rpc_max_recv_message_size: Option<ReadableSize>,
99    #[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
100    pub rpc_max_send_message_size: Option<ReadableSize>,
101}
102
103impl DatanodeOptions {
104    /// Sanitize the `DatanodeOptions` to ensure the config is valid.
105    pub fn sanitize(&mut self) {
106        sanitize_workload_types(&mut self.workload_types);
107
108        if self.storage.is_object_storage() {
109            self.storage
110                .store
111                .cache_config_mut()
112                .unwrap()
113                .sanitize(&self.storage.data_home);
114        }
115    }
116}
117
118impl Default for DatanodeOptions {
119    #[allow(deprecated)]
120    fn default() -> Self {
121        Self {
122            node_id: None,
123            default_column_prefix: None,
124            workload_types: vec![DatanodeWorkloadType::Hybrid],
125            require_lease_before_startup: false,
126            init_regions_in_background: false,
127            init_regions_parallelism: 16,
128            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3001"),
129            http: HttpOptions::default(),
130            meta_client: None,
131            wal: DatanodeWalConfig::default(),
132            storage: StorageConfig::default(),
133            max_concurrent_queries: 0,
134            region_engine: vec![
135                RegionEngineConfig::Mito(MitoConfig::default()),
136                RegionEngineConfig::File(FileEngineConfig::default()),
137            ],
138            logging: LoggingOptions::default(),
139            heartbeat: HeartbeatOptions::datanode_default(),
140            enable_telemetry: true,
141            export_metrics: ExportMetricsOption::default(),
142            tracing: TracingOptions::default(),
143            query: QueryOptions::default(),
144            memory: MemoryOptions::default(),
145
146            // Deprecated options
147            rpc_addr: None,
148            rpc_hostname: None,
149            rpc_runtime_size: None,
150            rpc_max_recv_message_size: None,
151            rpc_max_send_message_size: None,
152        }
153    }
154}
155
156impl Configurable for DatanodeOptions {
157    fn env_list_keys() -> Option<&'static [&'static str]> {
158        Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
159    }
160}
161
162#[allow(clippy::large_enum_variant)]
163#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
164pub enum RegionEngineConfig {
165    #[serde(rename = "mito")]
166    Mito(MitoConfig),
167    #[serde(rename = "file")]
168    File(FileEngineConfig),
169    #[serde(rename = "metric")]
170    Metric(MetricEngineConfig),
171}
172
#[cfg(test)]
mod tests {
    use common_base::secrets::ExposeSecret;

    use super::*;

    /// Parses `toml_str` as [`DatanodeOptions`] and returns the
    /// `skip_ssl_validation` flag of its S3 store.
    ///
    /// Panics when the TOML is invalid or the default store is not S3.
    fn s3_skip_ssl_validation(toml_str: &str) -> bool {
        let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
        let ObjectStoreConfig::S3(cfg) = &opts.storage.store else {
            panic!("Expected S3 config");
        };
        cfg.http_client.skip_ssl_validation
    }

    /// The default options must survive a TOML serialize/deserialize cycle.
    #[test]
    fn test_toml() {
        let serialized = toml::to_string(&DatanodeOptions::default()).unwrap();
        let _roundtripped: DatanodeOptions = toml::from_str(&serialized).unwrap();
    }

    /// Secrets are redacted in `Debug` output but still retrievable explicitly.
    #[test]
    fn test_secstr() {
        let toml_str = r#"
            [storage]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"
        "#;
        let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
        let ObjectStoreConfig::S3(cfg) = &opts.storage.store else {
            unreachable!()
        };

        let debug_repr = format!("{:?}", cfg.connection.access_key_id);
        assert_eq!("SecretBox<alloc::string::String>([REDACTED])", debug_repr);
        assert_eq!(
            "access_key_id",
            cfg.connection.access_key_id.expose_secret()
        );
    }

    /// `skip_ssl_validation` follows the config and is off when not specified.
    #[test]
    fn test_skip_ssl_validation_config() {
        // Explicitly enabled.
        let toml_str_true = r#"
            [storage]
            type = "S3"
            [storage.http_client]
            skip_ssl_validation = true
        "#;
        assert!(s3_skip_ssl_validation(toml_str_true));

        // Explicitly disabled.
        let toml_str_false = r#"
            [storage]
            type = "S3"
            [storage.http_client]
            skip_ssl_validation = false
        "#;
        assert!(!s3_skip_ssl_validation(toml_str_false));

        // Left out entirely: the default value is expected to be false.
        let toml_str_default = r#"
            [storage]
            type = "S3"
        "#;
        assert!(!s3_skip_ssl_validation(toml_str_default));
    }

    /// After `sanitize`, the read cache stays enabled and the cache path is
    /// the configured data home.
    #[test]
    fn test_cache_config() {
        let toml_str = r#"
            [storage]
            data_home = "test_data_home"
            type = "S3"
            [storage.cache_config]
            enable_read_cache = true
        "#;
        let mut opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
        opts.sanitize();

        let cache_config = opts.storage.store.cache_config().unwrap();
        assert!(cache_config.enable_read_cache);
        assert_eq!(cache_config.cache_path, "test_data_home");
    }
}