1use common_base::readable_size::ReadableSize;
18use common_config::{Configurable, DEFAULT_DATA_HOME};
19use common_options::memory::MemoryOptions;
20pub use common_procedure::options::ProcedureConfig;
21use common_telemetry::logging::{LoggingOptions, TracingOptions};
22use common_wal::config::DatanodeWalConfig;
23use common_workload::{DatanodeWorkloadType, sanitize_workload_types};
24use file_engine::config::EngineConfig as FileEngineConfig;
25use meta_client::MetaClientOptions;
26use metric_engine::config::EngineConfig as MetricEngineConfig;
27use mito2::config::MitoConfig;
28pub(crate) use object_store::config::ObjectStoreConfig;
29use query::options::QueryOptions;
30use serde::{Deserialize, Serialize};
31use servers::grpc::GrpcOptions;
32use servers::http::HttpOptions;
33
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
36#[serde(default)]
37pub struct StorageConfig {
38 pub data_home: String,
40 #[serde(flatten)]
41 pub store: ObjectStoreConfig,
42 pub providers: Vec<ObjectStoreConfig>,
44}
45
46impl StorageConfig {
47 pub fn is_object_storage(&self) -> bool {
49 self.store.is_object_storage()
50 }
51}
52
53impl Default for StorageConfig {
54 fn default() -> Self {
55 Self {
56 data_home: DEFAULT_DATA_HOME.to_string(),
57 store: ObjectStoreConfig::default(),
58 providers: vec![],
59 }
60 }
61}
62
63#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
64#[serde(default)]
65pub struct DatanodeOptions {
66 pub node_id: Option<u64>,
67 pub default_column_prefix: Option<String>,
68 pub workload_types: Vec<DatanodeWorkloadType>,
69 pub require_lease_before_startup: bool,
70 pub init_regions_in_background: bool,
71 pub init_regions_parallelism: usize,
72 pub grpc: GrpcOptions,
73 pub http: HttpOptions,
74 pub meta_client: Option<MetaClientOptions>,
75 pub wal: DatanodeWalConfig,
76 pub storage: StorageConfig,
77 pub max_concurrent_queries: usize,
78 pub region_engine: Vec<RegionEngineConfig>,
80 pub logging: LoggingOptions,
81 pub enable_telemetry: bool,
82 pub tracing: TracingOptions,
83 pub query: QueryOptions,
84 pub memory: MemoryOptions,
85
86 #[deprecated(note = "Please use `grpc.addr` instead.")]
88 pub rpc_addr: Option<String>,
89 #[deprecated(note = "Please use `grpc.hostname` instead.")]
90 pub rpc_hostname: Option<String>,
91 #[deprecated(note = "Please use `grpc.runtime_size` instead.")]
92 pub rpc_runtime_size: Option<usize>,
93 #[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
94 pub rpc_max_recv_message_size: Option<ReadableSize>,
95 #[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
96 pub rpc_max_send_message_size: Option<ReadableSize>,
97}
98
99impl DatanodeOptions {
100 pub fn sanitize(&mut self) {
102 sanitize_workload_types(&mut self.workload_types);
103
104 if self.storage.is_object_storage() {
105 self.storage
106 .store
107 .cache_config_mut()
108 .unwrap()
109 .sanitize(&self.storage.data_home);
110 }
111 }
112}
113
114impl Default for DatanodeOptions {
115 #[allow(deprecated)]
116 fn default() -> Self {
117 Self {
118 node_id: None,
119 default_column_prefix: None,
120 workload_types: vec![DatanodeWorkloadType::Hybrid],
121 require_lease_before_startup: false,
122 init_regions_in_background: false,
123 init_regions_parallelism: 16,
124 grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3001"),
125 http: HttpOptions::default(),
126 meta_client: None,
127 wal: DatanodeWalConfig::default(),
128 storage: StorageConfig::default(),
129 max_concurrent_queries: 0,
130 region_engine: vec![
131 RegionEngineConfig::Mito(MitoConfig::default()),
132 RegionEngineConfig::File(FileEngineConfig::default()),
133 ],
134 logging: LoggingOptions::default(),
135 enable_telemetry: true,
136 tracing: TracingOptions::default(),
137 query: QueryOptions::default(),
138 memory: MemoryOptions::default(),
139
140 rpc_addr: None,
142 rpc_hostname: None,
143 rpc_runtime_size: None,
144 rpc_max_recv_message_size: None,
145 rpc_max_send_message_size: None,
146 }
147 }
148}
149
150impl Configurable for DatanodeOptions {
151 fn env_list_keys() -> Option<&'static [&'static str]> {
152 Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
153 }
154}
155
156#[allow(clippy::large_enum_variant)]
157#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
158pub enum RegionEngineConfig {
159 #[serde(rename = "mito")]
160 Mito(MitoConfig),
161 #[serde(rename = "file")]
162 File(FileEngineConfig),
163 #[serde(rename = "metric")]
164 Metric(MetricEngineConfig),
165}
166
167#[cfg(test)]
168mod tests {
169 use common_base::secrets::ExposeSecret;
170
171 use super::*;
172
173 #[test]
174 fn test_toml() {
175 let opts = DatanodeOptions::default();
176 let toml_string = toml::to_string(&opts).unwrap();
177 let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap();
178 }
179
180 #[test]
181 fn test_secstr() {
182 let toml_str = r#"
183 [storage]
184 type = "S3"
185 access_key_id = "access_key_id"
186 secret_access_key = "secret_access_key"
187 "#;
188 let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
189 match &opts.storage.store {
190 ObjectStoreConfig::S3(cfg) => {
191 assert_eq!(
192 "SecretBox<alloc::string::String>([REDACTED])".to_string(),
193 format!("{:?}", cfg.connection.access_key_id)
194 );
195 assert_eq!(
196 "access_key_id",
197 cfg.connection.access_key_id.expose_secret()
198 );
199 }
200 _ => unreachable!(),
201 }
202 }
203 #[test]
204 fn test_skip_ssl_validation_config() {
205 let toml_str_true = r#"
207 [storage]
208 type = "S3"
209 [storage.http_client]
210 skip_ssl_validation = true
211 "#;
212 let opts: DatanodeOptions = toml::from_str(toml_str_true).unwrap();
213 match &opts.storage.store {
214 ObjectStoreConfig::S3(cfg) => {
215 assert!(cfg.http_client.skip_ssl_validation);
216 }
217 _ => panic!("Expected S3 config"),
218 }
219
220 let toml_str_false = r#"
222 [storage]
223 type = "S3"
224 [storage.http_client]
225 skip_ssl_validation = false
226 "#;
227 let opts: DatanodeOptions = toml::from_str(toml_str_false).unwrap();
228 match &opts.storage.store {
229 ObjectStoreConfig::S3(cfg) => {
230 assert!(!cfg.http_client.skip_ssl_validation);
231 }
232 _ => panic!("Expected S3 config"),
233 }
234 let toml_str_default = r#"
236 [storage]
237 type = "S3"
238 "#;
239 let opts: DatanodeOptions = toml::from_str(toml_str_default).unwrap();
240 match &opts.storage.store {
241 ObjectStoreConfig::S3(cfg) => {
242 assert!(!cfg.http_client.skip_ssl_validation);
243 }
244 _ => panic!("Expected S3 config"),
245 }
246 }
247
248 #[test]
249 fn test_cache_config() {
250 let toml_str = r#"
251 [storage]
252 data_home = "test_data_home"
253 type = "S3"
254 [storage.cache_config]
255 enable_read_cache = true
256 "#;
257 let mut opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
258 opts.sanitize();
259 assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
260 assert_eq!(
261 opts.storage.store.cache_config().unwrap().cache_path,
262 "test_data_home"
263 );
264 }
265}