1use common_base::readable_size::ReadableSize;
18use common_config::{Configurable, DEFAULT_DATA_HOME};
19pub use common_procedure::options::ProcedureConfig;
20use common_telemetry::logging::{LoggingOptions, TracingOptions};
21use common_wal::config::DatanodeWalConfig;
22use common_workload::{sanitize_workload_types, DatanodeWorkloadType};
23use file_engine::config::EngineConfig as FileEngineConfig;
24use meta_client::MetaClientOptions;
25use metric_engine::config::EngineConfig as MetricEngineConfig;
26use mito2::config::MitoConfig;
27pub(crate) use object_store::config::ObjectStoreConfig;
28use query::options::QueryOptions;
29use serde::{Deserialize, Serialize};
30use servers::export_metrics::ExportMetricsOption;
31use servers::grpc::GrpcOptions;
32use servers::heartbeat_options::HeartbeatOptions;
33use servers::http::HttpOptions;
34
35pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
36
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
39#[serde(default)]
40pub struct StorageConfig {
41 pub data_home: String,
43 #[serde(flatten)]
44 pub store: ObjectStoreConfig,
45 pub providers: Vec<ObjectStoreConfig>,
47}
48
49impl StorageConfig {
50 pub fn is_object_storage(&self) -> bool {
52 self.store.is_object_storage()
53 }
54}
55
56impl Default for StorageConfig {
57 fn default() -> Self {
58 Self {
59 data_home: DEFAULT_DATA_HOME.to_string(),
60 store: ObjectStoreConfig::default(),
61 providers: vec![],
62 }
63 }
64}
65
66#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
67#[serde(default)]
68pub struct DatanodeOptions {
69 pub node_id: Option<u64>,
70 pub workload_types: Vec<DatanodeWorkloadType>,
71 pub require_lease_before_startup: bool,
72 pub init_regions_in_background: bool,
73 pub init_regions_parallelism: usize,
74 pub grpc: GrpcOptions,
75 pub heartbeat: HeartbeatOptions,
76 pub http: HttpOptions,
77 pub meta_client: Option<MetaClientOptions>,
78 pub wal: DatanodeWalConfig,
79 pub storage: StorageConfig,
80 pub max_concurrent_queries: usize,
81 pub region_engine: Vec<RegionEngineConfig>,
83 pub logging: LoggingOptions,
84 pub enable_telemetry: bool,
85 pub export_metrics: ExportMetricsOption,
86 pub tracing: TracingOptions,
87 pub query: QueryOptions,
88
89 #[deprecated(note = "Please use `grpc.addr` instead.")]
91 pub rpc_addr: Option<String>,
92 #[deprecated(note = "Please use `grpc.hostname` instead.")]
93 pub rpc_hostname: Option<String>,
94 #[deprecated(note = "Please use `grpc.runtime_size` instead.")]
95 pub rpc_runtime_size: Option<usize>,
96 #[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
97 pub rpc_max_recv_message_size: Option<ReadableSize>,
98 #[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
99 pub rpc_max_send_message_size: Option<ReadableSize>,
100}
101
102impl DatanodeOptions {
103 pub fn sanitize(&mut self) {
105 sanitize_workload_types(&mut self.workload_types);
106 }
107}
108
109impl Default for DatanodeOptions {
110 #[allow(deprecated)]
111 fn default() -> Self {
112 Self {
113 node_id: None,
114 workload_types: vec![DatanodeWorkloadType::Hybrid],
115 require_lease_before_startup: false,
116 init_regions_in_background: false,
117 init_regions_parallelism: 16,
118 grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3001"),
119 http: HttpOptions::default(),
120 meta_client: None,
121 wal: DatanodeWalConfig::default(),
122 storage: StorageConfig::default(),
123 max_concurrent_queries: 0,
124 region_engine: vec![
125 RegionEngineConfig::Mito(MitoConfig::default()),
126 RegionEngineConfig::File(FileEngineConfig::default()),
127 ],
128 logging: LoggingOptions::default(),
129 heartbeat: HeartbeatOptions::datanode_default(),
130 enable_telemetry: true,
131 export_metrics: ExportMetricsOption::default(),
132 tracing: TracingOptions::default(),
133 query: QueryOptions::default(),
134
135 rpc_addr: None,
137 rpc_hostname: None,
138 rpc_runtime_size: None,
139 rpc_max_recv_message_size: None,
140 rpc_max_send_message_size: None,
141 }
142 }
143}
144
145impl Configurable for DatanodeOptions {
146 fn env_list_keys() -> Option<&'static [&'static str]> {
147 Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
148 }
149}
150
151#[allow(clippy::large_enum_variant)]
152#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
153pub enum RegionEngineConfig {
154 #[serde(rename = "mito")]
155 Mito(MitoConfig),
156 #[serde(rename = "file")]
157 File(FileEngineConfig),
158 #[serde(rename = "metric")]
159 Metric(MetricEngineConfig),
160}
161
162#[cfg(test)]
163mod tests {
164 use common_base::secrets::ExposeSecret;
165
166 use super::*;
167
168 #[test]
169 fn test_toml() {
170 let opts = DatanodeOptions::default();
171 let toml_string = toml::to_string(&opts).unwrap();
172 let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap();
173 }
174
175 #[test]
176 fn test_secstr() {
177 let toml_str = r#"
178 [storage]
179 type = "S3"
180 access_key_id = "access_key_id"
181 secret_access_key = "secret_access_key"
182 "#;
183 let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
184 match &opts.storage.store {
185 ObjectStoreConfig::S3(cfg) => {
186 assert_eq!(
187 "SecretBox<alloc::string::String>([REDACTED])".to_string(),
188 format!("{:?}", cfg.access_key_id)
189 );
190 assert_eq!("access_key_id", cfg.access_key_id.expose_secret());
191 }
192 _ => unreachable!(),
193 }
194 }
195 #[test]
196 fn test_skip_ssl_validation_config() {
197 let toml_str_true = r#"
199 [storage]
200 type = "S3"
201 [storage.http_client]
202 skip_ssl_validation = true
203 "#;
204 let opts: DatanodeOptions = toml::from_str(toml_str_true).unwrap();
205 match &opts.storage.store {
206 ObjectStoreConfig::S3(cfg) => {
207 assert!(cfg.http_client.skip_ssl_validation);
208 }
209 _ => panic!("Expected S3 config"),
210 }
211
212 let toml_str_false = r#"
214 [storage]
215 type = "S3"
216 [storage.http_client]
217 skip_ssl_validation = false
218 "#;
219 let opts: DatanodeOptions = toml::from_str(toml_str_false).unwrap();
220 match &opts.storage.store {
221 ObjectStoreConfig::S3(cfg) => {
222 assert!(!cfg.http_client.skip_ssl_validation);
223 }
224 _ => panic!("Expected S3 config"),
225 }
226 let toml_str_default = r#"
228 [storage]
229 type = "S3"
230 "#;
231 let opts: DatanodeOptions = toml::from_str(toml_str_default).unwrap();
232 match &opts.storage.store {
233 ObjectStoreConfig::S3(cfg) => {
234 assert!(!cfg.http_client.skip_ssl_validation);
235 }
236 _ => panic!("Expected S3 config"),
237 }
238 }
239}