1use core::time::Duration;
18
19use common_base::readable_size::ReadableSize;
20use common_base::secrets::{ExposeSecret, SecretString};
21use common_config::Configurable;
22pub use common_procedure::options::ProcedureConfig;
23use common_telemetry::logging::{LoggingOptions, TracingOptions};
24use common_wal::config::DatanodeWalConfig;
25use common_workload::{sanitize_workload_types, DatanodeWorkloadType};
26use file_engine::config::EngineConfig as FileEngineConfig;
27use meta_client::MetaClientOptions;
28use metric_engine::config::EngineConfig as MetricEngineConfig;
29use mito2::config::MitoConfig;
30use query::options::QueryOptions;
31use serde::{Deserialize, Serialize};
32use servers::export_metrics::ExportMetricsOption;
33use servers::grpc::GrpcOptions;
34use servers::heartbeat_options::HeartbeatOptions;
35use servers::http::HttpOptions;
36
/// Default object store cache size, built as `ReadableSize::gb(5)`.
///
/// NOTE(review): nothing in the visible part of this file reads this constant;
/// presumably it is the fallback used when no `cache_capacity` is configured —
/// confirm at the call sites.
pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);

/// Default value of [`StorageConfig::data_home`] (a relative path).
const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
41
/// Selects a storage backend and carries that backend's settings.
///
/// `#[serde(tag = "type")]` makes this an internally tagged enum: the variant
/// is picked by a `type` key holding the variant name (`"File"`, `"S3"`,
/// `"Oss"`, `"Azblob"` or `"Gcs"`), and the remaining keys of the same table
/// populate the variant's payload.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum ObjectStoreConfig {
    /// File-based storage; the only variant for which
    /// [`ObjectStoreConfig::is_object_storage`] returns `false`.
    File(FileConfig),
    /// S3 settings, see [`S3Config`].
    S3(S3Config),
    /// OSS settings, see [`OssConfig`].
    Oss(OssConfig),
    /// Azure Blob settings, see [`AzblobConfig`].
    Azblob(AzblobConfig),
    /// GCS settings, see [`GcsConfig`].
    Gcs(GcsConfig),
}
52
53impl ObjectStoreConfig {
54 pub fn provider_name(&self) -> &'static str {
56 match self {
57 Self::File(_) => "File",
58 Self::S3(_) => "S3",
59 Self::Oss(_) => "Oss",
60 Self::Azblob(_) => "Azblob",
61 Self::Gcs(_) => "Gcs",
62 }
63 }
64
65 pub fn is_object_storage(&self) -> bool {
67 !matches!(self, Self::File(_))
68 }
69
70 pub fn config_name(&self) -> &str {
72 let name = match self {
73 Self::File(_) => self.provider_name(),
75 Self::S3(s3) => &s3.name,
76 Self::Oss(oss) => &oss.name,
77 Self::Azblob(az) => &az.name,
78 Self::Gcs(gcs) => &gcs.name,
79 };
80
81 if name.trim().is_empty() {
82 return self.provider_name();
83 }
84
85 name
86 }
87}
88
/// Storage settings: the data home, the primary store, and a list of further
/// store configurations.
///
/// Because of `#[serde(default)]`, keys missing from the input take the values
/// of [`StorageConfig::default`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StorageConfig {
    /// Data home path; defaults to `DEFAULT_DATA_HOME`.
    pub data_home: String,
    /// Primary store. Flattened, so its `type` tag and backend settings are
    /// written at the same level as `data_home` (e.g. `type = "S3"` directly
    /// under `[storage]`).
    #[serde(flatten)]
    pub store: ObjectStoreConfig,
    /// Further store configurations; empty by default.
    ///
    /// NOTE(review): how an entry is selected is not visible here — presumably
    /// by its `config_name()`; confirm against the code that consumes this list.
    pub providers: Vec<ObjectStoreConfig>,
}
100
101impl StorageConfig {
102 pub fn is_object_storage(&self) -> bool {
104 self.store.is_object_storage()
105 }
106}
107
108impl Default for StorageConfig {
109 fn default() -> Self {
110 Self {
111 data_home: DEFAULT_DATA_HOME.to_string(),
112 store: ObjectStoreConfig::default(),
113 providers: vec![],
114 }
115 }
116}
117
/// Payload of [`ObjectStoreConfig::File`]. It has no fields: the `File`
/// backend takes no settings of its own in this struct.
#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
#[serde(default)]
pub struct FileConfig {}
121
/// Optional cache settings embedded (via `#[serde(flatten)]`) in
/// [`S3Config`], [`OssConfig`], [`AzblobConfig`] and [`GcsConfig`].
///
/// Both fields default to `None`.
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
#[serde(default)]
pub struct ObjectStorageCacheConfig {
    /// Cache path.
    ///
    /// NOTE(review): presumably a local directory used for cached objects —
    /// confirm against the code that builds the cache layer.
    pub cache_path: Option<String>,
    /// Cache capacity.
    ///
    /// NOTE(review): the fallback applied when this is `None` is not visible
    /// here (possibly `DEFAULT_OBJECT_STORE_CACHE_SIZE`) — confirm.
    pub cache_capacity: Option<ReadableSize>,
}
130
/// HTTP client settings embedded in each object store configuration.
///
/// Keys missing from the input take the values of
/// [`HttpClientConfig::default`]. The three durations are (de)serialized
/// through `humantime_serde`, i.e. as human-readable strings such as `"30s"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct HttpClientConfig {
    /// Defaults to `1024`.
    ///
    /// NOTE(review): by its name, the maximum number of idle pooled
    /// connections kept per host — confirm against the HTTP client builder.
    pub(crate) pool_max_idle_per_host: u32,

    /// Defaults to 30 seconds.
    ///
    /// NOTE(review): presumably the timeout of the connect phase only — confirm.
    #[serde(with = "humantime_serde")]
    pub(crate) connect_timeout: Duration,

    /// Defaults to 30 seconds.
    ///
    /// NOTE(review): presumably the overall per-request timeout — confirm.
    #[serde(with = "humantime_serde")]
    pub(crate) timeout: Duration,

    /// Defaults to 90 seconds.
    ///
    /// NOTE(review): presumably how long an idle pooled connection is kept
    /// alive — confirm.
    #[serde(with = "humantime_serde")]
    pub(crate) pool_idle_timeout: Duration,
}
151
152impl Default for HttpClientConfig {
153 fn default() -> Self {
154 Self {
155 pool_max_idle_per_host: 1024,
156 connect_timeout: Duration::from_secs(30),
157 timeout: Duration::from_secs(30),
158 pool_idle_timeout: Duration::from_secs(90),
159 }
160 }
161}
162
/// Settings of the `S3` backend (payload of [`ObjectStoreConfig::S3`]).
///
/// Keys missing from the input take the values of [`S3Config::default`].
/// `PartialEq` is implemented by hand (not derived) and compares the secret
/// fields through `expose_secret()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct S3Config {
    /// Configuration name reported by `ObjectStoreConfig::config_name`; when
    /// blank, the provider name `"S3"` is reported instead.
    pub name: String,
    /// Bucket name.
    pub bucket: String,
    /// Root path.
    ///
    /// NOTE(review): presumably the key prefix used inside the bucket — confirm.
    pub root: String,
    /// Access key id. Wrapped in a `SecretString` and never written when
    /// serializing (`skip_serializing`); it is still read when deserializing.
    #[serde(skip_serializing)]
    pub access_key_id: SecretString,
    /// Secret access key. Wrapped in a `SecretString` and never written when
    /// serializing (`skip_serializing`); it is still read when deserializing.
    #[serde(skip_serializing)]
    pub secret_access_key: SecretString,
    /// Optional endpoint; `None` by default.
    pub endpoint: Option<String>,
    /// Optional region; `None` by default.
    pub region: Option<String>,
    /// Defaults to `false`.
    ///
    /// NOTE(review): by its name, switches requests from path-style to
    /// virtual-host-style bucket addressing — confirm against the code that
    /// builds the S3 client.
    pub enable_virtual_host_style: bool,
    /// Cache settings, flattened so that `cache_path` / `cache_capacity` are
    /// direct keys of this configuration.
    #[serde(flatten)]
    pub cache: ObjectStorageCacheConfig,
    /// HTTP client settings, read from a nested `http_client` table.
    pub http_client: HttpClientConfig,
}
183
184impl PartialEq for S3Config {
185 fn eq(&self, other: &Self) -> bool {
186 self.name == other.name
187 && self.bucket == other.bucket
188 && self.root == other.root
189 && self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
190 && self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
191 && self.endpoint == other.endpoint
192 && self.region == other.region
193 && self.enable_virtual_host_style == other.enable_virtual_host_style
194 && self.cache == other.cache
195 && self.http_client == other.http_client
196 }
197}
198
/// Settings of the `Oss` backend (payload of [`ObjectStoreConfig::Oss`]).
///
/// Keys missing from the input take the values of [`OssConfig::default`].
/// `PartialEq` is implemented by hand (not derived) and compares the secret
/// fields through `expose_secret()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct OssConfig {
    /// Configuration name reported by `ObjectStoreConfig::config_name`; when
    /// blank, the provider name `"Oss"` is reported instead.
    pub name: String,
    /// Bucket name.
    pub bucket: String,
    /// Root path.
    ///
    /// NOTE(review): presumably the key prefix used inside the bucket — confirm.
    pub root: String,
    /// Access key id. Wrapped in a `SecretString` and never written when
    /// serializing (`skip_serializing`); it is still read when deserializing.
    #[serde(skip_serializing)]
    pub access_key_id: SecretString,
    /// Access key secret. Wrapped in a `SecretString` and never written when
    /// serializing (`skip_serializing`); it is still read when deserializing.
    #[serde(skip_serializing)]
    pub access_key_secret: SecretString,
    /// Endpoint; empty by default.
    pub endpoint: String,
    /// Cache settings, flattened so that `cache_path` / `cache_capacity` are
    /// direct keys of this configuration.
    #[serde(flatten)]
    pub cache: ObjectStorageCacheConfig,
    /// HTTP client settings, read from a nested `http_client` table.
    pub http_client: HttpClientConfig,
}
214
215impl PartialEq for OssConfig {
216 fn eq(&self, other: &Self) -> bool {
217 self.name == other.name
218 && self.bucket == other.bucket
219 && self.root == other.root
220 && self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
221 && self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
222 && self.endpoint == other.endpoint
223 && self.cache == other.cache
224 && self.http_client == other.http_client
225 }
226}
227
/// Settings of the `Azblob` backend (payload of [`ObjectStoreConfig::Azblob`]).
///
/// Keys missing from the input take the values of [`AzblobConfig::default`].
/// `PartialEq` is implemented by hand (not derived) and compares the secret
/// fields through `expose_secret()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct AzblobConfig {
    /// Configuration name reported by `ObjectStoreConfig::config_name`; when
    /// blank, the provider name `"Azblob"` is reported instead.
    pub name: String,
    /// Container name.
    pub container: String,
    /// Root path.
    ///
    /// NOTE(review): presumably the key prefix used inside the container —
    /// confirm.
    pub root: String,
    /// Account name. Wrapped in a `SecretString` and never written when
    /// serializing (`skip_serializing`); it is still read when deserializing.
    #[serde(skip_serializing)]
    pub account_name: SecretString,
    /// Account key. Wrapped in a `SecretString` and never written when
    /// serializing (`skip_serializing`); it is still read when deserializing.
    #[serde(skip_serializing)]
    pub account_key: SecretString,
    /// Endpoint; empty by default.
    pub endpoint: String,
    /// Optional SAS token; `None` by default.
    ///
    /// NOTE(review): unlike `account_name` / `account_key` this is a plain
    /// `String` without `skip_serializing`, so it is included in serialized
    /// output and in `Debug` output — confirm that this is intended.
    pub sas_token: Option<String>,
    /// Cache settings, flattened so that `cache_path` / `cache_capacity` are
    /// direct keys of this configuration.
    #[serde(flatten)]
    pub cache: ObjectStorageCacheConfig,
    /// HTTP client settings, read from a nested `http_client` table.
    pub http_client: HttpClientConfig,
}
244
245impl PartialEq for AzblobConfig {
246 fn eq(&self, other: &Self) -> bool {
247 self.name == other.name
248 && self.container == other.container
249 && self.root == other.root
250 && self.account_name.expose_secret() == other.account_name.expose_secret()
251 && self.account_key.expose_secret() == other.account_key.expose_secret()
252 && self.endpoint == other.endpoint
253 && self.sas_token == other.sas_token
254 && self.cache == other.cache
255 && self.http_client == other.http_client
256 }
257}
258
/// Settings of the `Gcs` backend (payload of [`ObjectStoreConfig::Gcs`]).
///
/// Keys missing from the input take the values of [`GcsConfig::default`].
/// `PartialEq` is implemented by hand (not derived) and compares the secret
/// fields through `expose_secret()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct GcsConfig {
    /// Configuration name reported by `ObjectStoreConfig::config_name`; when
    /// blank, the provider name `"Gcs"` is reported instead.
    pub name: String,
    /// Root path.
    ///
    /// NOTE(review): presumably the key prefix used inside the bucket — confirm.
    pub root: String,
    /// Bucket name.
    pub bucket: String,
    /// Scope; empty by default.
    ///
    /// NOTE(review): presumably the OAuth scope requested for GCS access —
    /// confirm.
    pub scope: String,
    /// Credential path. Wrapped in a `SecretString` and never written when
    /// serializing (`skip_serializing`); it is still read when deserializing.
    #[serde(skip_serializing)]
    pub credential_path: SecretString,
    /// Credential. Wrapped in a `SecretString` and never written when
    /// serializing (`skip_serializing`); it is still read when deserializing.
    #[serde(skip_serializing)]
    pub credential: SecretString,
    /// Endpoint; empty by default.
    pub endpoint: String,
    /// Cache settings, flattened so that `cache_path` / `cache_capacity` are
    /// direct keys of this configuration.
    #[serde(flatten)]
    pub cache: ObjectStorageCacheConfig,
    /// HTTP client settings, read from a nested `http_client` table.
    pub http_client: HttpClientConfig,
}
275
276impl PartialEq for GcsConfig {
277 fn eq(&self, other: &Self) -> bool {
278 self.name == other.name
279 && self.root == other.root
280 && self.bucket == other.bucket
281 && self.scope == other.scope
282 && self.credential_path.expose_secret() == other.credential_path.expose_secret()
283 && self.credential.expose_secret() == other.credential.expose_secret()
284 && self.endpoint == other.endpoint
285 && self.cache == other.cache
286 && self.http_client == other.http_client
287 }
288}
289
290impl Default for S3Config {
291 fn default() -> Self {
292 Self {
293 name: String::default(),
294 bucket: String::default(),
295 root: String::default(),
296 access_key_id: SecretString::from(String::default()),
297 secret_access_key: SecretString::from(String::default()),
298 enable_virtual_host_style: false,
299 endpoint: Option::default(),
300 region: Option::default(),
301 cache: ObjectStorageCacheConfig::default(),
302 http_client: HttpClientConfig::default(),
303 }
304 }
305}
306
307impl Default for OssConfig {
308 fn default() -> Self {
309 Self {
310 name: String::default(),
311 bucket: String::default(),
312 root: String::default(),
313 access_key_id: SecretString::from(String::default()),
314 access_key_secret: SecretString::from(String::default()),
315 endpoint: String::default(),
316 cache: ObjectStorageCacheConfig::default(),
317 http_client: HttpClientConfig::default(),
318 }
319 }
320}
321
322impl Default for AzblobConfig {
323 fn default() -> Self {
324 Self {
325 name: String::default(),
326 container: String::default(),
327 root: String::default(),
328 account_name: SecretString::from(String::default()),
329 account_key: SecretString::from(String::default()),
330 endpoint: String::default(),
331 sas_token: Option::default(),
332 cache: ObjectStorageCacheConfig::default(),
333 http_client: HttpClientConfig::default(),
334 }
335 }
336}
337
338impl Default for GcsConfig {
339 fn default() -> Self {
340 Self {
341 name: String::default(),
342 root: String::default(),
343 bucket: String::default(),
344 scope: String::default(),
345 credential_path: SecretString::from(String::default()),
346 credential: SecretString::from(String::default()),
347 endpoint: String::default(),
348 cache: ObjectStorageCacheConfig::default(),
349 http_client: HttpClientConfig::default(),
350 }
351 }
352}
353
354impl Default for ObjectStoreConfig {
355 fn default() -> Self {
356 ObjectStoreConfig::File(FileConfig {})
357 }
358}
359
/// Top-level options of a datanode.
///
/// Because of `#[serde(default)]`, keys missing from the input take the values
/// of [`DatanodeOptions::default`]; the defaults quoted on the fields below are
/// the ones set there.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct DatanodeOptions {
    /// Node id; `None` by default.
    pub node_id: Option<u64>,
    /// Workload types; defaults to `[DatanodeWorkloadType::Hybrid]` and is
    /// normalized by [`DatanodeOptions::sanitize`].
    pub workload_types: Vec<DatanodeWorkloadType>,
    /// Defaults to `false`.
    ///
    /// NOTE(review): by its name, makes startup wait for a lease first —
    /// confirm against the startup code.
    pub require_lease_before_startup: bool,
    /// Defaults to `false`.
    ///
    /// NOTE(review): by its name, opens regions in the background instead of
    /// blocking startup — confirm.
    pub init_regions_in_background: bool,
    /// Defaults to `16`.
    ///
    /// NOTE(review): presumably the number of regions initialized
    /// concurrently — confirm.
    pub init_regions_parallelism: usize,
    /// gRPC options; the default binds to `127.0.0.1:3001`.
    pub grpc: GrpcOptions,
    /// Heartbeat options; defaults to `HeartbeatOptions::datanode_default()`.
    pub heartbeat: HeartbeatOptions,
    /// HTTP options.
    pub http: HttpOptions,
    /// Meta client options; `None` by default.
    pub meta_client: Option<MetaClientOptions>,
    /// WAL options.
    pub wal: DatanodeWalConfig,
    /// Storage options, see [`StorageConfig`].
    pub storage: StorageConfig,
    /// Defaults to `0`.
    ///
    /// NOTE(review): presumably `0` means "no limit" — confirm where this
    /// value is consumed.
    pub max_concurrent_queries: usize,
    /// Region engine configurations; defaults to one `Mito` and one `File`
    /// entry, each with its own default settings.
    pub region_engine: Vec<RegionEngineConfig>,
    /// Logging options.
    pub logging: LoggingOptions,
    /// Defaults to `true`.
    pub enable_telemetry: bool,
    /// Metrics export options.
    pub export_metrics: ExportMetricsOption,
    /// Tracing options.
    pub tracing: TracingOptions,
    /// Query options.
    pub query: QueryOptions,

    // Deprecated options: still accepted as input keys and `None` by default;
    // each one names its replacement under `grpc` in its deprecation note.
    #[deprecated(note = "Please use `grpc.addr` instead.")]
    pub rpc_addr: Option<String>,
    #[deprecated(note = "Please use `grpc.hostname` instead.")]
    pub rpc_hostname: Option<String>,
    #[deprecated(note = "Please use `grpc.runtime_size` instead.")]
    pub rpc_runtime_size: Option<usize>,
    #[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
    pub rpc_max_recv_message_size: Option<ReadableSize>,
    #[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
    pub rpc_max_send_message_size: Option<ReadableSize>,
}
395
396impl DatanodeOptions {
397 pub fn sanitize(&mut self) {
399 sanitize_workload_types(&mut self.workload_types);
400 }
401}
402
403impl Default for DatanodeOptions {
404 #[allow(deprecated)]
405 fn default() -> Self {
406 Self {
407 node_id: None,
408 workload_types: vec![DatanodeWorkloadType::Hybrid],
409 require_lease_before_startup: false,
410 init_regions_in_background: false,
411 init_regions_parallelism: 16,
412 grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3001"),
413 http: HttpOptions::default(),
414 meta_client: None,
415 wal: DatanodeWalConfig::default(),
416 storage: StorageConfig::default(),
417 max_concurrent_queries: 0,
418 region_engine: vec![
419 RegionEngineConfig::Mito(MitoConfig::default()),
420 RegionEngineConfig::File(FileEngineConfig::default()),
421 ],
422 logging: LoggingOptions::default(),
423 heartbeat: HeartbeatOptions::datanode_default(),
424 enable_telemetry: true,
425 export_metrics: ExportMetricsOption::default(),
426 tracing: TracingOptions::default(),
427 query: QueryOptions::default(),
428
429 rpc_addr: None,
431 rpc_hostname: None,
432 rpc_runtime_size: None,
433 rpc_max_recv_message_size: None,
434 rpc_max_send_message_size: None,
435 }
436 }
437}
438
439impl Configurable for DatanodeOptions {
440 fn env_list_keys() -> Option<&'static [&'static str]> {
441 Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
442 }
443}
444
445#[allow(clippy::large_enum_variant)]
446#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
447pub enum RegionEngineConfig {
448 #[serde(rename = "mito")]
449 Mito(MitoConfig),
450 #[serde(rename = "file")]
451 File(FileEngineConfig),
452 #[serde(rename = "metric")]
453 Metric(MetricEngineConfig),
454}
455
#[cfg(test)]
mod tests {
    use common_base::secrets::ExposeSecret;

    use super::*;

    /// The default options serialize to TOML and parse back without error.
    #[test]
    fn test_toml() {
        let serialized = toml::to_string(&DatanodeOptions::default()).unwrap();
        let _round_tripped: DatanodeOptions = toml::from_str(&serialized).unwrap();
    }

    /// `config_name` falls back to the provider name unless a name is set.
    #[test]
    fn test_config_name() {
        let file_store = ObjectStoreConfig::default();
        assert_eq!("File", file_store.config_name());

        let unnamed_s3 = ObjectStoreConfig::S3(S3Config::default());
        assert_eq!("S3", unnamed_s3.config_name());
        assert_eq!("S3", unnamed_s3.provider_name());

        let named_s3 = ObjectStoreConfig::S3(S3Config {
            name: String::from("test"),
            ..S3Config::default()
        });
        assert_eq!("test", named_s3.config_name());
        assert_eq!("S3", named_s3.provider_name());
    }

    /// Only the `File` backend is not an object storage.
    #[test]
    fn test_is_object_storage() {
        assert!(!ObjectStoreConfig::default().is_object_storage());

        let remote_stores = [
            ObjectStoreConfig::S3(S3Config::default()),
            ObjectStoreConfig::Oss(OssConfig::default()),
            ObjectStoreConfig::Gcs(GcsConfig::default()),
            ObjectStoreConfig::Azblob(AzblobConfig::default()),
        ];
        for store in &remote_stores {
            assert!(store.is_object_storage());
        }
    }

    /// Secrets are redacted in `Debug` output but still readable through
    /// `expose_secret`.
    #[test]
    fn test_secstr() {
        let toml_str = r#"
            [storage]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"
        "#;
        let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();

        if let ObjectStoreConfig::S3(cfg) = &opts.storage.store {
            assert_eq!(
                "SecretBox<alloc::string::String>([REDACTED])",
                format!("{:?}", cfg.access_key_id)
            );
            assert_eq!("access_key_id", cfg.access_key_id.expose_secret());
        } else {
            unreachable!()
        }
    }
}
520}