1use common_base::readable_size::ReadableSize;
18use common_config::{Configurable, DEFAULT_DATA_HOME};
19use common_options::memory::MemoryOptions;
20pub use common_procedure::options::ProcedureConfig;
21use common_telemetry::logging::{LoggingOptions, TracingOptions};
22use common_wal::config::DatanodeWalConfig;
23use common_workload::{DatanodeWorkloadType, sanitize_workload_types};
24use file_engine::config::EngineConfig as FileEngineConfig;
25use meta_client::MetaClientOptions;
26use metric_engine::config::EngineConfig as MetricEngineConfig;
27use mito2::config::MitoConfig;
28pub(crate) use object_store::config::ObjectStoreConfig;
29use query::options::QueryOptions;
30use serde::{Deserialize, Serialize};
31use servers::export_metrics::ExportMetricsOption;
32use servers::grpc::GrpcOptions;
33use servers::heartbeat_options::HeartbeatOptions;
34use servers::http::HttpOptions;
35
/// Storage settings of a datanode: the data home directory together with its
/// object store configuration(s).
///
/// Because of `#[serde(default)]`, any key missing from the input takes the
/// value produced by the `Default` implementation of this struct.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StorageConfig {
    /// Data home directory; defaults to `DEFAULT_DATA_HOME`.
    pub data_home: String,
    /// Main object store configuration. It is flattened, so its keys (for
    /// example `type`) sit directly next to `data_home` in the serialized form.
    #[serde(flatten)]
    pub store: ObjectStoreConfig,
    /// Further object store configurations; empty by default.
    // NOTE(review): presumably extra stores usable besides `store` — confirm against callers.
    pub providers: Vec<ObjectStoreConfig>,
}
47
48impl StorageConfig {
49 pub fn is_object_storage(&self) -> bool {
51 self.store.is_object_storage()
52 }
53}
54
55impl Default for StorageConfig {
56 fn default() -> Self {
57 Self {
58 data_home: DEFAULT_DATA_HOME.to_string(),
59 store: ObjectStoreConfig::default(),
60 providers: vec![],
61 }
62 }
63}
64
/// Top-level configuration of a datanode.
///
/// Because of `#[serde(default)]`, any key missing from the input takes the
/// value produced by the `Default` implementation of this struct.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct DatanodeOptions {
    /// Optional id of this node; unset by default.
    pub node_id: Option<u64>,
    /// Optional default column prefix; unset by default.
    pub default_column_prefix: Option<String>,
    /// Workload types of this datanode. Normalized by `sanitize()` through
    /// `sanitize_workload_types`; the default is a single `Hybrid` entry.
    pub workload_types: Vec<DatanodeWorkloadType>,
    // NOTE(review): presumably makes startup wait for a lease — confirm. Off by default.
    pub require_lease_before_startup: bool,
    // NOTE(review): presumably moves region initialization off the startup path — confirm.
    // Off by default.
    pub init_regions_in_background: bool,
    /// Parallelism used when initializing regions; 16 by default.
    pub init_regions_parallelism: usize,
    /// gRPC options; the default binds to `127.0.0.1:3001`.
    pub grpc: GrpcOptions,
    /// Heartbeat options; the default comes from `HeartbeatOptions::datanode_default()`.
    pub heartbeat: HeartbeatOptions,
    /// HTTP options.
    pub http: HttpOptions,
    /// Meta client options; unset by default.
    pub meta_client: Option<MetaClientOptions>,
    /// WAL configuration.
    pub wal: DatanodeWalConfig,
    /// Storage configuration (data home and object stores).
    pub storage: StorageConfig,
    /// Limit on concurrently running queries; 0 by default.
    // NOTE(review): presumably 0 means "no limit" — confirm where this value is consumed.
    pub max_concurrent_queries: usize,
    /// Per-engine configuration; the default lists one Mito and one File entry.
    pub region_engine: Vec<RegionEngineConfig>,
    /// Logging options.
    pub logging: LoggingOptions,
    /// Whether telemetry is enabled; on by default.
    pub enable_telemetry: bool,
    /// Options for exporting metrics.
    pub export_metrics: ExportMetricsOption,
    /// Tracing options.
    pub tracing: TracingOptions,
    /// Query options.
    pub query: QueryOptions,
    /// Memory options.
    pub memory: MemoryOptions,

    // Deprecated flat `rpc_*` options. Each `deprecated` note names the `grpc.*`
    // option that replaces it; all of them default to `None`.
    #[deprecated(note = "Please use `grpc.addr` instead.")]
    pub rpc_addr: Option<String>,
    #[deprecated(note = "Please use `grpc.hostname` instead.")]
    pub rpc_hostname: Option<String>,
    #[deprecated(note = "Please use `grpc.runtime_size` instead.")]
    pub rpc_runtime_size: Option<usize>,
    #[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
    pub rpc_max_recv_message_size: Option<ReadableSize>,
    #[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
    pub rpc_max_send_message_size: Option<ReadableSize>,
}
102
103impl DatanodeOptions {
104 pub fn sanitize(&mut self) {
106 sanitize_workload_types(&mut self.workload_types);
107
108 if self.storage.is_object_storage() {
109 self.storage
110 .store
111 .cache_config_mut()
112 .unwrap()
113 .sanitize(&self.storage.data_home);
114 }
115 }
116}
117
118impl Default for DatanodeOptions {
119 #[allow(deprecated)]
120 fn default() -> Self {
121 Self {
122 node_id: None,
123 default_column_prefix: None,
124 workload_types: vec![DatanodeWorkloadType::Hybrid],
125 require_lease_before_startup: false,
126 init_regions_in_background: false,
127 init_regions_parallelism: 16,
128 grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3001"),
129 http: HttpOptions::default(),
130 meta_client: None,
131 wal: DatanodeWalConfig::default(),
132 storage: StorageConfig::default(),
133 max_concurrent_queries: 0,
134 region_engine: vec![
135 RegionEngineConfig::Mito(MitoConfig::default()),
136 RegionEngineConfig::File(FileEngineConfig::default()),
137 ],
138 logging: LoggingOptions::default(),
139 heartbeat: HeartbeatOptions::datanode_default(),
140 enable_telemetry: true,
141 export_metrics: ExportMetricsOption::default(),
142 tracing: TracingOptions::default(),
143 query: QueryOptions::default(),
144 memory: MemoryOptions::default(),
145
146 rpc_addr: None,
148 rpc_hostname: None,
149 rpc_runtime_size: None,
150 rpc_max_recv_message_size: None,
151 rpc_max_send_message_size: None,
152 }
153 }
154}
155
156impl Configurable for DatanodeOptions {
157 fn env_list_keys() -> Option<&'static [&'static str]> {
158 Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
159 }
160}
161
162#[allow(clippy::large_enum_variant)]
163#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
164pub enum RegionEngineConfig {
165 #[serde(rename = "mito")]
166 Mito(MitoConfig),
167 #[serde(rename = "file")]
168 File(FileEngineConfig),
169 #[serde(rename = "metric")]
170 Metric(MetricEngineConfig),
171}
172
#[cfg(test)]
mod tests {
    use common_base::secrets::ExposeSecret;

    use super::*;

    /// Parses `toml_str` into [`DatanodeOptions`] and returns the
    /// `skip_ssl_validation` flag of its S3 store.
    ///
    /// Panics if parsing fails or if the configured store is not S3.
    fn s3_skip_ssl_validation(toml_str: &str) -> bool {
        let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
        match &opts.storage.store {
            ObjectStoreConfig::S3(cfg) => cfg.http_client.skip_ssl_validation,
            _ => panic!("Expected S3 config"),
        }
    }

    /// The default options must serialize to TOML and parse back without error.
    #[test]
    fn test_toml() {
        let opts = DatanodeOptions::default();
        let toml_string = toml::to_string(&opts).unwrap();
        let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap();
    }

    /// Secrets are redacted in `Debug` output but still readable via `expose_secret`.
    #[test]
    fn test_secstr() {
        let toml_str = r#"
            [storage]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"
        "#;
        let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
        match &opts.storage.store {
            ObjectStoreConfig::S3(cfg) => {
                assert_eq!(
                    "SecretBox<alloc::string::String>([REDACTED])",
                    format!("{:?}", cfg.connection.access_key_id)
                );
                assert_eq!(
                    "access_key_id",
                    cfg.connection.access_key_id.expose_secret()
                );
            }
            // Same failure message as the other S3 tests in this module.
            _ => panic!("Expected S3 config"),
        }
    }

    /// `skip_ssl_validation` follows the configured value and is off when omitted.
    #[test]
    fn test_skip_ssl_validation_config() {
        let toml_str_true = r#"
            [storage]
            type = "S3"
            [storage.http_client]
            skip_ssl_validation = true
        "#;
        assert!(s3_skip_ssl_validation(toml_str_true));

        let toml_str_false = r#"
            [storage]
            type = "S3"
            [storage.http_client]
            skip_ssl_validation = false
        "#;
        assert!(!s3_skip_ssl_validation(toml_str_false));

        // No `http_client` section at all: the flag must default to off.
        let toml_str_default = r#"
            [storage]
            type = "S3"
        "#;
        assert!(!s3_skip_ssl_validation(toml_str_default));
    }

    /// After `sanitize()`, the read cache stays enabled and its path is the data home.
    #[test]
    fn test_cache_config() {
        let toml_str = r#"
            [storage]
            data_home = "test_data_home"
            type = "S3"
            [storage.cache_config]
            enable_read_cache = true
        "#;
        let mut opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
        opts.sanitize();

        let cache_config = opts.storage.store.cache_config().unwrap();
        assert!(cache_config.enable_read_cache);
        assert_eq!(cache_config.cache_path, "test_data_home");
    }
}