datanode/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Datanode configurations
16
17use common_base::readable_size::ReadableSize;
18use common_config::{Configurable, DEFAULT_DATA_HOME};
19use common_options::memory::MemoryOptions;
20pub use common_procedure::options::ProcedureConfig;
21use common_telemetry::logging::{LoggingOptions, TracingOptions};
22use common_wal::config::DatanodeWalConfig;
23use common_workload::{DatanodeWorkloadType, sanitize_workload_types};
24use file_engine::config::EngineConfig as FileEngineConfig;
25use meta_client::MetaClientOptions;
26use metric_engine::config::EngineConfig as MetricEngineConfig;
27use mito2::config::MitoConfig;
28pub(crate) use object_store::config::ObjectStoreConfig;
29use query::options::QueryOptions;
30use serde::{Deserialize, Serialize};
31use servers::grpc::GrpcOptions;
32use servers::http::HttpOptions;
33
34/// Storage engine config
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
36#[serde(default)]
37pub struct StorageConfig {
38    /// The working directory of database
39    pub data_home: String,
40    #[serde(flatten)]
41    pub store: ObjectStoreConfig,
42    /// Object storage providers
43    pub providers: Vec<ObjectStoreConfig>,
44}
45
46impl StorageConfig {
47    /// Returns true when the default storage config is a remote object storage service such as AWS S3, etc.
48    pub fn is_object_storage(&self) -> bool {
49        self.store.is_object_storage()
50    }
51}
52
53impl Default for StorageConfig {
54    fn default() -> Self {
55        Self {
56            data_home: DEFAULT_DATA_HOME.to_string(),
57            store: ObjectStoreConfig::default(),
58            providers: vec![],
59        }
60    }
61}
62
63#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
64#[serde(default)]
65pub struct DatanodeOptions {
66    pub node_id: Option<u64>,
67    pub default_column_prefix: Option<String>,
68    pub workload_types: Vec<DatanodeWorkloadType>,
69    pub require_lease_before_startup: bool,
70    pub init_regions_in_background: bool,
71    pub init_regions_parallelism: usize,
72    pub grpc: GrpcOptions,
73    pub http: HttpOptions,
74    pub meta_client: Option<MetaClientOptions>,
75    pub wal: DatanodeWalConfig,
76    pub storage: StorageConfig,
77    pub max_concurrent_queries: usize,
78    /// Options for different store engines.
79    pub region_engine: Vec<RegionEngineConfig>,
80    pub logging: LoggingOptions,
81    pub enable_telemetry: bool,
82    pub tracing: TracingOptions,
83    pub query: QueryOptions,
84    pub memory: MemoryOptions,
85
86    /// Deprecated options, please use the new options instead.
87    #[deprecated(note = "Please use `grpc.addr` instead.")]
88    pub rpc_addr: Option<String>,
89    #[deprecated(note = "Please use `grpc.hostname` instead.")]
90    pub rpc_hostname: Option<String>,
91    #[deprecated(note = "Please use `grpc.runtime_size` instead.")]
92    pub rpc_runtime_size: Option<usize>,
93    #[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
94    pub rpc_max_recv_message_size: Option<ReadableSize>,
95    #[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
96    pub rpc_max_send_message_size: Option<ReadableSize>,
97}
98
99impl DatanodeOptions {
100    /// Sanitize the `DatanodeOptions` to ensure the config is valid.
101    pub fn sanitize(&mut self) {
102        sanitize_workload_types(&mut self.workload_types);
103
104        if self.storage.is_object_storage() {
105            self.storage
106                .store
107                .cache_config_mut()
108                .unwrap()
109                .sanitize(&self.storage.data_home);
110        }
111    }
112}
113
114impl Default for DatanodeOptions {
115    #[allow(deprecated)]
116    fn default() -> Self {
117        Self {
118            node_id: None,
119            default_column_prefix: None,
120            workload_types: vec![DatanodeWorkloadType::Hybrid],
121            require_lease_before_startup: false,
122            init_regions_in_background: false,
123            init_regions_parallelism: 16,
124            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3001"),
125            http: HttpOptions::default(),
126            meta_client: None,
127            wal: DatanodeWalConfig::default(),
128            storage: StorageConfig::default(),
129            max_concurrent_queries: 0,
130            region_engine: vec![
131                RegionEngineConfig::Mito(MitoConfig::default()),
132                RegionEngineConfig::File(FileEngineConfig::default()),
133            ],
134            logging: LoggingOptions::default(),
135            enable_telemetry: true,
136            tracing: TracingOptions::default(),
137            query: QueryOptions::default(),
138            memory: MemoryOptions::default(),
139
140            // Deprecated options
141            rpc_addr: None,
142            rpc_hostname: None,
143            rpc_runtime_size: None,
144            rpc_max_recv_message_size: None,
145            rpc_max_send_message_size: None,
146        }
147    }
148}
149
150impl Configurable for DatanodeOptions {
151    fn env_list_keys() -> Option<&'static [&'static str]> {
152        Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
153    }
154}
155
156#[allow(clippy::large_enum_variant)]
157#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
158pub enum RegionEngineConfig {
159    #[serde(rename = "mito")]
160    Mito(MitoConfig),
161    #[serde(rename = "file")]
162    File(FileEngineConfig),
163    #[serde(rename = "metric")]
164    Metric(MetricEngineConfig),
165}
166
167#[cfg(test)]
168mod tests {
169    use common_base::secrets::ExposeSecret;
170
171    use super::*;
172
173    #[test]
174    fn test_toml() {
175        let opts = DatanodeOptions::default();
176        let toml_string = toml::to_string(&opts).unwrap();
177        let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap();
178    }
179
180    #[test]
181    fn test_secstr() {
182        let toml_str = r#"
183            [storage]
184            type = "S3"
185            access_key_id = "access_key_id"
186            secret_access_key = "secret_access_key"
187        "#;
188        let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
189        match &opts.storage.store {
190            ObjectStoreConfig::S3(cfg) => {
191                assert_eq!(
192                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
193                    format!("{:?}", cfg.connection.access_key_id)
194                );
195                assert_eq!(
196                    "access_key_id",
197                    cfg.connection.access_key_id.expose_secret()
198                );
199            }
200            _ => unreachable!(),
201        }
202    }
203    #[test]
204    fn test_skip_ssl_validation_config() {
205        // Test with skip_ssl_validation = true
206        let toml_str_true = r#"
207            [storage]
208            type = "S3"
209            [storage.http_client]
210            skip_ssl_validation = true
211        "#;
212        let opts: DatanodeOptions = toml::from_str(toml_str_true).unwrap();
213        match &opts.storage.store {
214            ObjectStoreConfig::S3(cfg) => {
215                assert!(cfg.http_client.skip_ssl_validation);
216            }
217            _ => panic!("Expected S3 config"),
218        }
219
220        // Test with skip_ssl_validation = false
221        let toml_str_false = r#"
222            [storage]
223            type = "S3"
224            [storage.http_client]
225            skip_ssl_validation = false
226        "#;
227        let opts: DatanodeOptions = toml::from_str(toml_str_false).unwrap();
228        match &opts.storage.store {
229            ObjectStoreConfig::S3(cfg) => {
230                assert!(!cfg.http_client.skip_ssl_validation);
231            }
232            _ => panic!("Expected S3 config"),
233        }
234        // Test default value (should be false)
235        let toml_str_default = r#"
236            [storage]
237            type = "S3"
238        "#;
239        let opts: DatanodeOptions = toml::from_str(toml_str_default).unwrap();
240        match &opts.storage.store {
241            ObjectStoreConfig::S3(cfg) => {
242                assert!(!cfg.http_client.skip_ssl_validation);
243            }
244            _ => panic!("Expected S3 config"),
245        }
246    }
247
248    #[test]
249    fn test_cache_config() {
250        let toml_str = r#"
251            [storage]
252            data_home = "test_data_home"
253            type = "S3"
254            [storage.cache_config]
255            enable_read_cache = true
256        "#;
257        let mut opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
258        opts.sanitize();
259        assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
260        assert_eq!(
261            opts.storage.store.cache_config().unwrap().cache_path,
262            "test_data_home"
263        );
264    }
265}