datanode/
config.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Datanode configurations
16
17use common_base::readable_size::ReadableSize;
18use common_config::{Configurable, DEFAULT_DATA_HOME};
19use common_options::memory::MemoryOptions;
20pub use common_procedure::options::ProcedureConfig;
21use common_telemetry::logging::{LoggingOptions, TracingOptions};
22use common_wal::config::DatanodeWalConfig;
23use common_workload::{DatanodeWorkloadType, sanitize_workload_types};
24use file_engine::config::EngineConfig as FileEngineConfig;
25use meta_client::MetaClientOptions;
26use metric_engine::config::EngineConfig as MetricEngineConfig;
27use mito2::config::MitoConfig;
28pub(crate) use object_store::config::ObjectStoreConfig;
29use query::options::QueryOptions;
30use serde::{Deserialize, Serialize};
31use servers::grpc::GrpcOptions;
32use servers::heartbeat_options::HeartbeatOptions;
33use servers::http::HttpOptions;
34
35/// Storage engine config
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
37#[serde(default)]
38pub struct StorageConfig {
39    /// The working directory of database
40    pub data_home: String,
41    #[serde(flatten)]
42    pub store: ObjectStoreConfig,
43    /// Object storage providers
44    pub providers: Vec<ObjectStoreConfig>,
45}
46
47impl StorageConfig {
48    /// Returns true when the default storage config is a remote object storage service such as AWS S3, etc.
49    pub fn is_object_storage(&self) -> bool {
50        self.store.is_object_storage()
51    }
52}
53
54impl Default for StorageConfig {
55    fn default() -> Self {
56        Self {
57            data_home: DEFAULT_DATA_HOME.to_string(),
58            store: ObjectStoreConfig::default(),
59            providers: vec![],
60        }
61    }
62}
63
64#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
65#[serde(default)]
66pub struct DatanodeOptions {
67    pub node_id: Option<u64>,
68    pub default_column_prefix: Option<String>,
69    pub workload_types: Vec<DatanodeWorkloadType>,
70    pub require_lease_before_startup: bool,
71    pub init_regions_in_background: bool,
72    pub init_regions_parallelism: usize,
73    pub grpc: GrpcOptions,
74    pub heartbeat: HeartbeatOptions,
75    pub http: HttpOptions,
76    pub meta_client: Option<MetaClientOptions>,
77    pub wal: DatanodeWalConfig,
78    pub storage: StorageConfig,
79    pub max_concurrent_queries: usize,
80    /// Options for different store engines.
81    pub region_engine: Vec<RegionEngineConfig>,
82    pub logging: LoggingOptions,
83    pub enable_telemetry: bool,
84    pub tracing: TracingOptions,
85    pub query: QueryOptions,
86    pub memory: MemoryOptions,
87
88    /// Deprecated options, please use the new options instead.
89    #[deprecated(note = "Please use `grpc.addr` instead.")]
90    pub rpc_addr: Option<String>,
91    #[deprecated(note = "Please use `grpc.hostname` instead.")]
92    pub rpc_hostname: Option<String>,
93    #[deprecated(note = "Please use `grpc.runtime_size` instead.")]
94    pub rpc_runtime_size: Option<usize>,
95    #[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
96    pub rpc_max_recv_message_size: Option<ReadableSize>,
97    #[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
98    pub rpc_max_send_message_size: Option<ReadableSize>,
99}
100
101impl DatanodeOptions {
102    /// Sanitize the `DatanodeOptions` to ensure the config is valid.
103    pub fn sanitize(&mut self) {
104        sanitize_workload_types(&mut self.workload_types);
105
106        if self.storage.is_object_storage() {
107            self.storage
108                .store
109                .cache_config_mut()
110                .unwrap()
111                .sanitize(&self.storage.data_home);
112        }
113    }
114}
115
116impl Default for DatanodeOptions {
117    #[allow(deprecated)]
118    fn default() -> Self {
119        Self {
120            node_id: None,
121            default_column_prefix: None,
122            workload_types: vec![DatanodeWorkloadType::Hybrid],
123            require_lease_before_startup: false,
124            init_regions_in_background: false,
125            init_regions_parallelism: 16,
126            grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3001"),
127            http: HttpOptions::default(),
128            meta_client: None,
129            wal: DatanodeWalConfig::default(),
130            storage: StorageConfig::default(),
131            max_concurrent_queries: 0,
132            region_engine: vec![
133                RegionEngineConfig::Mito(MitoConfig::default()),
134                RegionEngineConfig::File(FileEngineConfig::default()),
135            ],
136            logging: LoggingOptions::default(),
137            heartbeat: HeartbeatOptions::datanode_default(),
138            enable_telemetry: true,
139            tracing: TracingOptions::default(),
140            query: QueryOptions::default(),
141            memory: MemoryOptions::default(),
142
143            // Deprecated options
144            rpc_addr: None,
145            rpc_hostname: None,
146            rpc_runtime_size: None,
147            rpc_max_recv_message_size: None,
148            rpc_max_send_message_size: None,
149        }
150    }
151}
152
153impl Configurable for DatanodeOptions {
154    fn env_list_keys() -> Option<&'static [&'static str]> {
155        Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
156    }
157}
158
159#[allow(clippy::large_enum_variant)]
160#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
161pub enum RegionEngineConfig {
162    #[serde(rename = "mito")]
163    Mito(MitoConfig),
164    #[serde(rename = "file")]
165    File(FileEngineConfig),
166    #[serde(rename = "metric")]
167    Metric(MetricEngineConfig),
168}
169
#[cfg(test)]
mod tests {
    use common_base::secrets::ExposeSecret;

    use super::*;

    /// The default options survive a TOML serialize/deserialize round trip.
    #[test]
    fn test_toml() {
        let serialized = toml::to_string(&DatanodeOptions::default()).unwrap();
        let _parsed: DatanodeOptions = toml::from_str(&serialized).unwrap();
    }

    /// Secrets are redacted in `Debug` output but can still be exposed.
    #[test]
    fn test_secstr() {
        let toml_str = r#"
            [storage]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"
        "#;
        let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();

        let ObjectStoreConfig::S3(cfg) = &opts.storage.store else {
            unreachable!()
        };

        let debug_repr = format!("{:?}", cfg.connection.access_key_id);
        assert_eq!(
            "SecretBox<alloc::string::String>([REDACTED])".to_string(),
            debug_repr
        );
        assert_eq!(
            "access_key_id",
            cfg.connection.access_key_id.expose_secret()
        );
    }

    /// `skip_ssl_validation` is read from `[storage.http_client]` and is
    /// `false` when the key is absent.
    #[test]
    fn test_skip_ssl_validation_config() {
        // Test with skip_ssl_validation = true
        let toml_str_true = r#"
            [storage]
            type = "S3"
            [storage.http_client]
            skip_ssl_validation = true
        "#;
        // Test with skip_ssl_validation = false
        let toml_str_false = r#"
            [storage]
            type = "S3"
            [storage.http_client]
            skip_ssl_validation = false
        "#;
        // Test default value (should be false)
        let toml_str_default = r#"
            [storage]
            type = "S3"
        "#;

        // Each case pairs a TOML input with the expected flag value.
        for (toml_str, expected) in [
            (toml_str_true, true),
            (toml_str_false, false),
            (toml_str_default, false),
        ] {
            let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
            let ObjectStoreConfig::S3(cfg) = &opts.storage.store else {
                panic!("Expected S3 config")
            };
            assert_eq!(cfg.http_client.skip_ssl_validation, expected);
        }
    }

    /// After `sanitize`, the read cache stays enabled and the cache path
    /// equals the configured data home.
    #[test]
    fn test_cache_config() {
        let toml_str = r#"
            [storage]
            data_home = "test_data_home"
            type = "S3"
            [storage.cache_config]
            enable_read_cache = true
        "#;
        let mut opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
        opts.sanitize();

        let cache_config = opts.storage.store.cache_config().unwrap();
        assert!(cache_config.enable_read_cache);
        assert_eq!(cache_config.cache_path, "test_data_home");
    }
}