1use core::time::Duration;
18
19use common_base::readable_size::ReadableSize;
20use common_base::secrets::{ExposeSecret, SecretString};
21use common_config::Configurable;
22pub use common_procedure::options::ProcedureConfig;
23use common_telemetry::logging::{LoggingOptions, TracingOptions};
24use common_wal::config::DatanodeWalConfig;
25use file_engine::config::EngineConfig as FileEngineConfig;
26use meta_client::MetaClientOptions;
27use metric_engine::config::EngineConfig as MetricEngineConfig;
28use mito2::config::MitoConfig;
29use query::options::QueryOptions;
30use serde::{Deserialize, Serialize};
31use servers::export_metrics::ExportMetricsOption;
32use servers::grpc::GrpcOptions;
33use servers::heartbeat_options::HeartbeatOptions;
34use servers::http::HttpOptions;
35
36pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize::gb(5);
37
38const DEFAULT_DATA_HOME: &str = "./greptimedb_data";
40
41#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
43#[serde(tag = "type")]
44pub enum ObjectStoreConfig {
45 File(FileConfig),
46 S3(S3Config),
47 Oss(OssConfig),
48 Azblob(AzblobConfig),
49 Gcs(GcsConfig),
50}
51
52impl ObjectStoreConfig {
53 pub fn provider_name(&self) -> &'static str {
55 match self {
56 Self::File(_) => "File",
57 Self::S3(_) => "S3",
58 Self::Oss(_) => "Oss",
59 Self::Azblob(_) => "Azblob",
60 Self::Gcs(_) => "Gcs",
61 }
62 }
63
64 pub fn is_object_storage(&self) -> bool {
66 !matches!(self, Self::File(_))
67 }
68
69 pub fn config_name(&self) -> &str {
71 let name = match self {
72 Self::File(_) => self.provider_name(),
74 Self::S3(s3) => &s3.name,
75 Self::Oss(oss) => &oss.name,
76 Self::Azblob(az) => &az.name,
77 Self::Gcs(gcs) => &gcs.name,
78 };
79
80 if name.trim().is_empty() {
81 return self.provider_name();
82 }
83
84 name
85 }
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
90#[serde(default)]
91pub struct StorageConfig {
92 pub data_home: String,
94 #[serde(flatten)]
95 pub store: ObjectStoreConfig,
96 pub providers: Vec<ObjectStoreConfig>,
98}
99
100impl StorageConfig {
101 pub fn is_object_storage(&self) -> bool {
103 self.store.is_object_storage()
104 }
105}
106
107impl Default for StorageConfig {
108 fn default() -> Self {
109 Self {
110 data_home: DEFAULT_DATA_HOME.to_string(),
111 store: ObjectStoreConfig::default(),
112 providers: vec![],
113 }
114 }
115}
116
117#[derive(Debug, Clone, Serialize, Default, Deserialize, Eq, PartialEq)]
118#[serde(default)]
119pub struct FileConfig {}
120
121#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq)]
122#[serde(default)]
123pub struct ObjectStorageCacheConfig {
124 pub cache_path: Option<String>,
126 pub cache_capacity: Option<ReadableSize>,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
132#[serde(default)]
133pub struct HttpClientConfig {
134 pub(crate) pool_max_idle_per_host: u32,
136
137 #[serde(with = "humantime_serde")]
139 pub(crate) connect_timeout: Duration,
140
141 #[serde(with = "humantime_serde")]
144 pub(crate) timeout: Duration,
145
146 #[serde(with = "humantime_serde")]
148 pub(crate) pool_idle_timeout: Duration,
149}
150
151impl Default for HttpClientConfig {
152 fn default() -> Self {
153 Self {
154 pool_max_idle_per_host: 1024,
155 connect_timeout: Duration::from_secs(30),
156 timeout: Duration::from_secs(30),
157 pool_idle_timeout: Duration::from_secs(90),
158 }
159 }
160}
161
162#[derive(Debug, Clone, Serialize, Deserialize)]
163#[serde(default)]
164pub struct S3Config {
165 pub name: String,
166 pub bucket: String,
167 pub root: String,
168 #[serde(skip_serializing)]
169 pub access_key_id: SecretString,
170 #[serde(skip_serializing)]
171 pub secret_access_key: SecretString,
172 pub endpoint: Option<String>,
173 pub region: Option<String>,
174 pub enable_virtual_host_style: bool,
178 #[serde(flatten)]
179 pub cache: ObjectStorageCacheConfig,
180 pub http_client: HttpClientConfig,
181}
182
183impl PartialEq for S3Config {
184 fn eq(&self, other: &Self) -> bool {
185 self.name == other.name
186 && self.bucket == other.bucket
187 && self.root == other.root
188 && self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
189 && self.secret_access_key.expose_secret() == other.secret_access_key.expose_secret()
190 && self.endpoint == other.endpoint
191 && self.region == other.region
192 && self.enable_virtual_host_style == other.enable_virtual_host_style
193 && self.cache == other.cache
194 && self.http_client == other.http_client
195 }
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
199#[serde(default)]
200pub struct OssConfig {
201 pub name: String,
202 pub bucket: String,
203 pub root: String,
204 #[serde(skip_serializing)]
205 pub access_key_id: SecretString,
206 #[serde(skip_serializing)]
207 pub access_key_secret: SecretString,
208 pub endpoint: String,
209 #[serde(flatten)]
210 pub cache: ObjectStorageCacheConfig,
211 pub http_client: HttpClientConfig,
212}
213
214impl PartialEq for OssConfig {
215 fn eq(&self, other: &Self) -> bool {
216 self.name == other.name
217 && self.bucket == other.bucket
218 && self.root == other.root
219 && self.access_key_id.expose_secret() == other.access_key_id.expose_secret()
220 && self.access_key_secret.expose_secret() == other.access_key_secret.expose_secret()
221 && self.endpoint == other.endpoint
222 && self.cache == other.cache
223 && self.http_client == other.http_client
224 }
225}
226
227#[derive(Debug, Clone, Serialize, Deserialize)]
228#[serde(default)]
229pub struct AzblobConfig {
230 pub name: String,
231 pub container: String,
232 pub root: String,
233 #[serde(skip_serializing)]
234 pub account_name: SecretString,
235 #[serde(skip_serializing)]
236 pub account_key: SecretString,
237 pub endpoint: String,
238 pub sas_token: Option<String>,
239 #[serde(flatten)]
240 pub cache: ObjectStorageCacheConfig,
241 pub http_client: HttpClientConfig,
242}
243
244impl PartialEq for AzblobConfig {
245 fn eq(&self, other: &Self) -> bool {
246 self.name == other.name
247 && self.container == other.container
248 && self.root == other.root
249 && self.account_name.expose_secret() == other.account_name.expose_secret()
250 && self.account_key.expose_secret() == other.account_key.expose_secret()
251 && self.endpoint == other.endpoint
252 && self.sas_token == other.sas_token
253 && self.cache == other.cache
254 && self.http_client == other.http_client
255 }
256}
257
258#[derive(Debug, Clone, Serialize, Deserialize)]
259#[serde(default)]
260pub struct GcsConfig {
261 pub name: String,
262 pub root: String,
263 pub bucket: String,
264 pub scope: String,
265 #[serde(skip_serializing)]
266 pub credential_path: SecretString,
267 #[serde(skip_serializing)]
268 pub credential: SecretString,
269 pub endpoint: String,
270 #[serde(flatten)]
271 pub cache: ObjectStorageCacheConfig,
272 pub http_client: HttpClientConfig,
273}
274
275impl PartialEq for GcsConfig {
276 fn eq(&self, other: &Self) -> bool {
277 self.name == other.name
278 && self.root == other.root
279 && self.bucket == other.bucket
280 && self.scope == other.scope
281 && self.credential_path.expose_secret() == other.credential_path.expose_secret()
282 && self.credential.expose_secret() == other.credential.expose_secret()
283 && self.endpoint == other.endpoint
284 && self.cache == other.cache
285 && self.http_client == other.http_client
286 }
287}
288
289impl Default for S3Config {
290 fn default() -> Self {
291 Self {
292 name: String::default(),
293 bucket: String::default(),
294 root: String::default(),
295 access_key_id: SecretString::from(String::default()),
296 secret_access_key: SecretString::from(String::default()),
297 enable_virtual_host_style: false,
298 endpoint: Option::default(),
299 region: Option::default(),
300 cache: ObjectStorageCacheConfig::default(),
301 http_client: HttpClientConfig::default(),
302 }
303 }
304}
305
306impl Default for OssConfig {
307 fn default() -> Self {
308 Self {
309 name: String::default(),
310 bucket: String::default(),
311 root: String::default(),
312 access_key_id: SecretString::from(String::default()),
313 access_key_secret: SecretString::from(String::default()),
314 endpoint: String::default(),
315 cache: ObjectStorageCacheConfig::default(),
316 http_client: HttpClientConfig::default(),
317 }
318 }
319}
320
321impl Default for AzblobConfig {
322 fn default() -> Self {
323 Self {
324 name: String::default(),
325 container: String::default(),
326 root: String::default(),
327 account_name: SecretString::from(String::default()),
328 account_key: SecretString::from(String::default()),
329 endpoint: String::default(),
330 sas_token: Option::default(),
331 cache: ObjectStorageCacheConfig::default(),
332 http_client: HttpClientConfig::default(),
333 }
334 }
335}
336
337impl Default for GcsConfig {
338 fn default() -> Self {
339 Self {
340 name: String::default(),
341 root: String::default(),
342 bucket: String::default(),
343 scope: String::default(),
344 credential_path: SecretString::from(String::default()),
345 credential: SecretString::from(String::default()),
346 endpoint: String::default(),
347 cache: ObjectStorageCacheConfig::default(),
348 http_client: HttpClientConfig::default(),
349 }
350 }
351}
352
353impl Default for ObjectStoreConfig {
354 fn default() -> Self {
355 ObjectStoreConfig::File(FileConfig {})
356 }
357}
358
359#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
360#[serde(default)]
361pub struct DatanodeOptions {
362 pub node_id: Option<u64>,
363 pub require_lease_before_startup: bool,
364 pub init_regions_in_background: bool,
365 pub init_regions_parallelism: usize,
366 pub grpc: GrpcOptions,
367 pub heartbeat: HeartbeatOptions,
368 pub http: HttpOptions,
369 pub meta_client: Option<MetaClientOptions>,
370 pub wal: DatanodeWalConfig,
371 pub storage: StorageConfig,
372 pub max_concurrent_queries: usize,
373 pub region_engine: Vec<RegionEngineConfig>,
375 pub logging: LoggingOptions,
376 pub enable_telemetry: bool,
377 pub export_metrics: ExportMetricsOption,
378 pub tracing: TracingOptions,
379 pub query: QueryOptions,
380
381 #[deprecated(note = "Please use `grpc.addr` instead.")]
383 pub rpc_addr: Option<String>,
384 #[deprecated(note = "Please use `grpc.hostname` instead.")]
385 pub rpc_hostname: Option<String>,
386 #[deprecated(note = "Please use `grpc.runtime_size` instead.")]
387 pub rpc_runtime_size: Option<usize>,
388 #[deprecated(note = "Please use `grpc.max_recv_message_size` instead.")]
389 pub rpc_max_recv_message_size: Option<ReadableSize>,
390 #[deprecated(note = "Please use `grpc.max_send_message_size` instead.")]
391 pub rpc_max_send_message_size: Option<ReadableSize>,
392}
393
394impl Default for DatanodeOptions {
395 #[allow(deprecated)]
396 fn default() -> Self {
397 Self {
398 node_id: None,
399 require_lease_before_startup: false,
400 init_regions_in_background: false,
401 init_regions_parallelism: 16,
402 grpc: GrpcOptions::default().with_bind_addr("127.0.0.1:3001"),
403 http: HttpOptions::default(),
404 meta_client: None,
405 wal: DatanodeWalConfig::default(),
406 storage: StorageConfig::default(),
407 max_concurrent_queries: 0,
408 region_engine: vec![
409 RegionEngineConfig::Mito(MitoConfig::default()),
410 RegionEngineConfig::File(FileEngineConfig::default()),
411 ],
412 logging: LoggingOptions::default(),
413 heartbeat: HeartbeatOptions::datanode_default(),
414 enable_telemetry: true,
415 export_metrics: ExportMetricsOption::default(),
416 tracing: TracingOptions::default(),
417 query: QueryOptions::default(),
418
419 rpc_addr: None,
421 rpc_hostname: None,
422 rpc_runtime_size: None,
423 rpc_max_recv_message_size: None,
424 rpc_max_send_message_size: None,
425 }
426 }
427}
428
429impl Configurable for DatanodeOptions {
430 fn env_list_keys() -> Option<&'static [&'static str]> {
431 Some(&["meta_client.metasrv_addrs", "wal.broker_endpoints"])
432 }
433}
434
435#[allow(clippy::large_enum_variant)]
436#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
437pub enum RegionEngineConfig {
438 #[serde(rename = "mito")]
439 Mito(MitoConfig),
440 #[serde(rename = "file")]
441 File(FileEngineConfig),
442 #[serde(rename = "metric")]
443 Metric(MetricEngineConfig),
444}
445
446#[cfg(test)]
447mod tests {
448 use common_base::secrets::ExposeSecret;
449
450 use super::*;
451
452 #[test]
453 fn test_toml() {
454 let opts = DatanodeOptions::default();
455 let toml_string = toml::to_string(&opts).unwrap();
456 let _parsed: DatanodeOptions = toml::from_str(&toml_string).unwrap();
457 }
458
459 #[test]
460 fn test_config_name() {
461 let object_store_config = ObjectStoreConfig::default();
462 assert_eq!("File", object_store_config.config_name());
463
464 let s3_config = ObjectStoreConfig::S3(S3Config::default());
465 assert_eq!("S3", s3_config.config_name());
466 assert_eq!("S3", s3_config.provider_name());
467
468 let s3_config = ObjectStoreConfig::S3(S3Config {
469 name: "test".to_string(),
470 ..Default::default()
471 });
472 assert_eq!("test", s3_config.config_name());
473 assert_eq!("S3", s3_config.provider_name());
474 }
475
476 #[test]
477 fn test_is_object_storage() {
478 let store = ObjectStoreConfig::default();
479 assert!(!store.is_object_storage());
480 let s3_config = ObjectStoreConfig::S3(S3Config::default());
481 assert!(s3_config.is_object_storage());
482 let oss_config = ObjectStoreConfig::Oss(OssConfig::default());
483 assert!(oss_config.is_object_storage());
484 let gcs_config = ObjectStoreConfig::Gcs(GcsConfig::default());
485 assert!(gcs_config.is_object_storage());
486 let azblob_config = ObjectStoreConfig::Azblob(AzblobConfig::default());
487 assert!(azblob_config.is_object_storage());
488 }
489
490 #[test]
491 fn test_secstr() {
492 let toml_str = r#"
493 [storage]
494 type = "S3"
495 access_key_id = "access_key_id"
496 secret_access_key = "secret_access_key"
497 "#;
498 let opts: DatanodeOptions = toml::from_str(toml_str).unwrap();
499 match &opts.storage.store {
500 ObjectStoreConfig::S3(cfg) => {
501 assert_eq!(
502 "SecretBox<alloc::string::String>([REDACTED])".to_string(),
503 format!("{:?}", cfg.access_key_id)
504 );
505 assert_eq!("access_key_id", cfg.access_key_id.expose_secret());
506 }
507 _ => unreachable!(),
508 }
509 }
510}