1use common_base::readable_size::ReadableSize;
18use common_config::{Configurable, DEFAULT_DATA_HOME};
19use common_options::memory::MemoryOptions;
20pub use common_procedure::options::ProcedureConfig;
21use common_telemetry::logging::{LoggingOptions, TracingOptions};
22use common_wal::config::DatanodeWalConfig;
23use common_workload::{DatanodeWorkloadType, sanitize_workload_types};
24use file_engine::config::EngineConfig as FileEngineConfig;
25use meta_client::MetaClientOptions;
26use metric_engine::config::EngineConfig as MetricEngineConfig;
27use mito2::config::MitoConfig;
28pub(crate) use object_store::config::ObjectStoreConfig;
29use query::options::QueryOptions;
30use serde::{Deserialize, Serialize};
31use servers::grpc::GrpcOptions;
32use servers::heartbeat_options::HeartbeatOptions;
33use servers::http::HttpOptions;
34
35#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
37#[serde(default)]
38pub struct StorageConfig {
39 pub data_home: String,
41 #[serde(flatten)]
42 pub store: ObjectStoreConfig,
43 pub providers: Vec<ObjectStoreConfig>,
45}
46
47impl StorageConfig {
48 pub fn is_object_storage(&self) -> bool {
50 self.store.is_object_storage()
51 }
52}
53
54impl Default for StorageConfig {
55 fn default() -> Self {
56 Self {
57 data_home: DEFAULT_DATA_HOME.to_string(),
58 store: ObjectStoreConfig::default(),
59 providers: vec![],
60 }
61 }
62}
63
64#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
65#[serde(default)]
66pub struct DatanodeOptions {
67 pub node_id: Option<u64>,
68 pub default_column_prefix: Option<String>,
69 pub workload_types: Vec<DatanodeWorkloadType>,
70 pub require_lease_before_startup: bool,
71 pub init_regions_in_background: bool,
72 pub init_regions_parallelism: usize,
73 pub grpc: GrpcOptions,
74 pub heartbeat: HeartbeatOptions,
75 pub http: HttpOptions,
76 pub meta_client: Option<MetaClientOptions>,
77 pub wal: DatanodeWalConfig,
78 pub storage: StorageConfig,
79 pub max_concurrent_queries: usize,
80 pub region_engine: Vec<RegionEngineConfig>,
82 pub logging: LoggingOptions,
83 pub enable_telemetry: bool,
84 pub tracing: TracingOptions,
85 pub query: QueryOptions,
86 pub memory: MemoryOptions,
87
88 #[deprecated(note = "Please use `grpc.addr` instead.")]
90 pub rpc_addr: Option<String>,
91 #[deprecated(note = "Please use `grpc.hostname` instead.")]
92 pub rpc_hostname: Option<String>,
93 #[deprecated(note = "Please use `grpc.runtime_size` instead.")]
94 pub rpc_runtime_size: Option<usize>,
95 #[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
96 pub rpc_max_recv_message_size: Option<ReadableSize>,
97 #[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
98 pub rpc_max_send_message_size: Option<ReadableSize>,
99}
100
101impl DatanodeOptions {
102 pub fn sanitize(&mut self) {
104 sanitize_workload_types(&mut self.workload_types);
105
106 if self.storage.is_object_storage() {
107 self.storage
108 .store
109 .cache_config_mut()
110 .unwrap()
111 .sanitize(&self.storage.data_home);
112 }
113 }
114}
115
116impl Default for DatanodeOptions {
117 #[allow(deprecated)]
118 fn default() -> Self {
119 Self {
120 node_id: None,
121 default_column_prefix: None,
122 workload_types: vec![DatanodeWorkloadType::Hybrid],
123 require_lease_before_startup: false,
124 init_regions_in_background: false,
125 init_regions_parallelism: 16,
126 grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3001"),
127 http: HttpOptions::default(),
128 meta_client: None,
129 wal: DatanodeWalConfig::default(),
130 storage: StorageConfig::default(),
131 max_concurrent_queries: 0,
132 region_engine: vec![
133 RegionEngineConfig::Mito(MitoConfig::default()),
134 RegionEngineConfig::File(FileEngineConfig::default()),
135 ],
136 logging: LoggingOptions::default(),
137 heartbeat: HeartbeatOptions::datanode_default(),
138 enable_telemetry: true,
139 tracing: TracingOptions::default(),
140 query: QueryOptions::default(),
141 memory: MemoryOptions::default(),
142
143 rpc_addr: None,
145 rpc_hostname: None,
146 rpc_runtime_size: None,
147 rpc_max_recv_message_size: None,
148 rpc_max_send_message_size: None,
149 }
150 }
151}
152
153impl Configurable for DatanodeOptions {
154 fn env_list_keys() -> Option<&'static [&'static str]> {
155 Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
156 }
157}
158
159#[allow(clippy::large_enum_variant)]
160#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
161pub enum RegionEngineConfig {
162 #[serde(rename = "mito")]
163 Mito(MitoConfig),
164 #[serde(rename = "file")]
165 File(FileEngineConfig),
166 #[serde(rename = "metric")]
167 Metric(MetricEngineConfig),
168}
169
#[cfg(test)]
mod tests {
    use common_base::secrets::ExposeSecret;

    use super::*;

    /// Parses `toml_str` into [`DatanodeOptions`], panicking on invalid input.
    fn parse_options(toml_str: &str) -> DatanodeOptions {
        toml::from_str(toml_str).unwrap()
    }

    /// Parses `toml_str` and returns `http_client.skip_ssl_validation` of its
    /// S3 store, panicking when the configured store is not S3.
    fn s3_skip_ssl_validation(toml_str: &str) -> bool {
        let opts = parse_options(toml_str);
        let ObjectStoreConfig::S3(s3) = &opts.storage.store else {
            panic!("Expected S3 config");
        };
        s3.http_client.skip_ssl_validation
    }

    /// The default options must serialize to TOML and parse back without error.
    #[test]
    fn test_toml() {
        let serialized = toml::to_string(&DatanodeOptions::default()).unwrap();
        let _reparsed: DatanodeOptions = toml::from_str(&serialized).unwrap();
    }

    /// Secrets are redacted in `Debug` output but still readable via `expose_secret`.
    #[test]
    fn test_secstr() {
        let opts = parse_options(
            r#"
            [storage]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"
        "#,
        );

        let ObjectStoreConfig::S3(s3) = &opts.storage.store else {
            unreachable!()
        };
        let access_key_id = &s3.connection.access_key_id;

        assert_eq!(
            "SecretBox<alloc::string::String>([REDACTED])".to_string(),
            format!("{:?}", access_key_id)
        );
        assert_eq!("access_key_id", access_key_id.expose_secret());
    }

    /// `skip_ssl_validation` honors an explicit `true`/`false` and is `false`
    /// when the key is absent.
    #[test]
    fn test_skip_ssl_validation_config() {
        // Explicitly enabled.
        assert!(s3_skip_ssl_validation(
            r#"
            [storage]
            type = "S3"
            [storage.http_client]
            skip_ssl_validation = true
        "#
        ));

        // Explicitly disabled.
        assert!(!s3_skip_ssl_validation(
            r#"
            [storage]
            type = "S3"
            [storage.http_client]
            skip_ssl_validation = false
        "#
        ));

        // Not specified at all.
        assert!(!s3_skip_ssl_validation(
            r#"
            [storage]
            type = "S3"
        "#
        ));
    }

    /// After `sanitize`, the read cache stays enabled and the cache path
    /// equals the configured data home.
    #[test]
    fn test_cache_config() {
        let mut opts = parse_options(
            r#"
            [storage]
            data_home = "test_data_home"
            type = "S3"
            [storage.cache_config]
            enable_read_cache = true
        "#,
        );
        opts.sanitize();

        let cache_config = opts.storage.store.cache_config().unwrap();
        assert!(cache_config.enable_read_cache);
        assert_eq!(cache_config.cache_path, "test_data_home");
    }
}