1use std::net::SocketAddr;
16use std::path::Path;
17use std::sync::Arc;
18use std::{fs, path};
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::information_schema::InformationExtensionRef;
23use catalog::kvbackend::KvBackendCatalogManagerBuilder;
24use catalog::process_manager::ProcessManager;
25use clap::Parser;
26use common_base::Plugins;
27use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
28use common_config::{Configurable, metadata_store_dir};
29use common_error::ext::BoxedError;
30use common_meta::cache::LayeredCacheRegistryBuilder;
31use common_meta::ddl::flow_meta::FlowMetadataAllocator;
32use common_meta::ddl::table_meta::TableMetadataAllocator;
33use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
34use common_meta::ddl_manager::DdlManager;
35use common_meta::key::flow::FlowMetadataManager;
36use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
37use common_meta::kv_backend::KvBackendRef;
38use common_meta::procedure_executor::LocalProcedureExecutor;
39use common_meta::region_keeper::MemoryRegionKeeper;
40use common_meta::region_registry::LeaderRegionRegistry;
41use common_meta::sequence::SequenceBuilder;
42use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
43use common_procedure::ProcedureManagerRef;
44use common_telemetry::info;
45use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
46use common_time::timezone::set_default_timezone;
47use common_version::{short_version, verbose_version};
48use datanode::config::DatanodeOptions;
49use datanode::datanode::{Datanode, DatanodeBuilder};
50use flow::{
51 FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
52 GrpcQueryHandlerWithBoxedError,
53};
54use frontend::frontend::Frontend;
55use frontend::instance::StandaloneDatanodeManager;
56use frontend::instance::builder::FrontendBuilder;
57use frontend::server::Services;
58use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
59use servers::export_metrics::ExportMetricsTask;
60use servers::tls::{TlsMode, TlsOption};
61use snafu::ResultExt;
62use standalone::StandaloneInformationExtension;
63use standalone::options::StandaloneOptions;
64use tracing_appender::non_blocking::WorkerGuard;
65
66use crate::error::{Result, StartFlownodeSnafu};
67use crate::options::{GlobalOptions, GreptimeOptions};
68use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
69
/// Name under which the standalone application identifies itself.
///
/// Returned by `App::name` for [`Instance`] and passed to the logging, version-logging and
/// resource-limit-metrics initialisers in [`StartCommand::build`].
pub const APP_NAME: &str = "greptime-standalone";
71
// Top-level `standalone` command: a thin wrapper that dispatches to a `SubCommand`.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn `///` doc comments
// into help text, so adding them here would change the CLI output.
#[derive(Parser)]
pub struct Command {
    // The subcommand selected on the command line.
    #[clap(subcommand)]
    subcmd: SubCommand,
}
77
78impl Command {
79 pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
80 self.subcmd.build(opts).await
81 }
82
83 pub fn load_options(
84 &self,
85 global_options: &GlobalOptions,
86 ) -> Result<GreptimeOptions<StandaloneOptions>> {
87 self.subcmd.load_options(global_options)
88 }
89}
90
// Subcommands understood by the standalone command.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn `///` doc comments
// into help text, so adding them here would change the CLI output.
#[derive(Parser)]
enum SubCommand {
    // Start a standalone instance; arguments are described by `StartCommand`.
    Start(StartCommand),
}
95
96impl SubCommand {
97 async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
98 match self {
99 SubCommand::Start(cmd) => cmd.build(opts).await,
100 }
101 }
102
103 fn load_options(
104 &self,
105 global_options: &GlobalOptions,
106 ) -> Result<GreptimeOptions<StandaloneOptions>> {
107 match self {
108 SubCommand::Start(cmd) => cmd.load_options(global_options),
109 }
110 }
111}
112
/// A fully wired standalone instance, as produced by [`StartCommand::build`].
///
/// It bundles the datanode, frontend and flownode together with the shared services that are
/// started and stopped alongside them through the `App` implementation.
pub struct Instance {
    /// The embedded datanode.
    datanode: Datanode,
    /// The embedded frontend (instance plus its servers).
    frontend: Frontend,
    /// The embedded flownode.
    flownode: FlownodeInstance,
    /// Procedure manager; started in `App::start` and stopped in `App::stop`.
    procedure_manager: ProcedureManagerRef,
    /// WAL options allocator; started in `App::start`.
    wal_options_allocator: WalOptionsAllocatorRef,

    /// Shared building blocks exposed through [`Instance::components`] (enterprise builds only).
    #[cfg(feature = "enterprise")]
    components: Components,

    /// Guards returned by the global logging initialisation; never read, only kept alive for as
    /// long as the instance lives.
    _guard: Vec<WorkerGuard>,
}
128
/// Shared building blocks of a standalone instance, available in enterprise builds.
///
/// Populated at the end of [`StartCommand::build`] and handed out by [`Instance::components`].
#[cfg(feature = "enterprise")]
pub struct Components {
    /// The plugin registry populated while the instance was built.
    pub plugins: Plugins,
    /// The metadata key-value backend.
    pub kv_backend: KvBackendRef,
    /// The frontend client that was handed to the flownode builder.
    pub frontend_client: Arc<FrontendClient>,
    /// The catalog manager built on top of the key-value backend.
    pub catalog_manager: catalog::CatalogManagerRef,
}
136
137impl Instance {
138 pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
140 self.frontend.server_handlers().addr(name)
141 }
142
143 #[cfg(feature = "enterprise")]
144 pub fn components(&self) -> &Components {
145 &self.components
146 }
147}
148
149#[async_trait]
150impl App for Instance {
151 fn name(&self) -> &str {
152 APP_NAME
153 }
154
155 async fn start(&mut self) -> Result<()> {
156 self.datanode.start_telemetry();
157
158 self.procedure_manager
159 .start()
160 .await
161 .context(error::StartProcedureManagerSnafu)?;
162
163 self.wal_options_allocator
164 .start()
165 .await
166 .context(error::StartWalOptionsAllocatorSnafu)?;
167
168 plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
169 .await
170 .context(error::StartFrontendSnafu)?;
171
172 self.frontend
173 .start()
174 .await
175 .context(error::StartFrontendSnafu)?;
176
177 self.flownode.start().await.context(StartFlownodeSnafu)?;
178
179 Ok(())
180 }
181
182 async fn stop(&mut self) -> Result<()> {
183 self.frontend
184 .shutdown()
185 .await
186 .context(error::ShutdownFrontendSnafu)?;
187
188 self.procedure_manager
189 .stop()
190 .await
191 .context(error::StopProcedureManagerSnafu)?;
192
193 self.datanode
194 .shutdown()
195 .await
196 .context(error::ShutdownDatanodeSnafu)?;
197
198 self.flownode
199 .shutdown()
200 .await
201 .context(error::ShutdownFlownodeSnafu)?;
202
203 info!("Datanode instance stopped.");
204
205 Ok(())
206 }
207}
208
// Command-line arguments of `standalone start`.
//
// Every field that is set is layered on top of the options loaded from the config file and the
// environment; see `StartCommand::merge_with_cli_options`.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macros turn `///` doc comments
// into help text, so adding them here would change the CLI output.
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    // Overrides the HTTP server address.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides the gRPC bind address (also accepted as `--rpc-addr`). It is rejected when it
    // equals the default datanode gRPC bind address.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    // Enables the MySQL server and sets its address.
    #[clap(long)]
    mysql_addr: Option<String>,
    // Enables the PostgreSQL server and sets its address.
    #[clap(long)]
    postgres_addr: Option<String>,
    // Enables the InfluxDB protocol when set; never disables it.
    #[clap(short, long)]
    influxdb_enable: bool,
    // Path of the config file to load.
    #[clap(short, long)]
    pub config_file: Option<String>,
    // TLS mode, certificate path and key path. They are combined into one `TlsOption` that is
    // applied to the MySQL/PostgreSQL servers only when the matching address flag is given.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    // Overrides the user provider option.
    #[clap(long)]
    user_provider: Option<String>,
    // Prefix of the environment variables consulted when loading the layered options.
    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
    pub env_prefix: String,
    // Overrides the storage data home directory.
    #[clap(long)]
    data_home: Option<String>,
}
237
238impl StartCommand {
239 pub fn load_options(
241 &self,
242 global_options: &GlobalOptions,
243 ) -> Result<GreptimeOptions<StandaloneOptions>> {
244 let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
245 self.config_file.as_deref(),
246 self.env_prefix.as_ref(),
247 )
248 .context(error::LoadLayeredConfigSnafu)?;
249
250 self.merge_with_cli_options(global_options, &mut opts.component)?;
251 opts.component.sanitize();
252
253 Ok(opts)
254 }
255
256 pub fn merge_with_cli_options(
258 &self,
259 global_options: &GlobalOptions,
260 opts: &mut StandaloneOptions,
261 ) -> Result<()> {
262 if let Some(dir) = &global_options.log_dir {
263 opts.logging.dir.clone_from(dir);
264 }
265
266 if global_options.log_level.is_some() {
267 opts.logging.level.clone_from(&global_options.log_level);
268 }
269
270 opts.tracing = TracingOptions {
271 #[cfg(feature = "tokio-console")]
272 tokio_console_addr: global_options.tokio_console_addr.clone(),
273 };
274
275 let tls_opts = TlsOption::new(
276 self.tls_mode.clone(),
277 self.tls_cert_path.clone(),
278 self.tls_key_path.clone(),
279 );
280
281 if let Some(addr) = &self.http_addr {
282 opts.http.addr.clone_from(addr);
283 }
284
285 if let Some(data_home) = &self.data_home {
286 opts.storage.data_home.clone_from(data_home);
287 }
288
289 if opts.logging.dir.is_empty() {
291 opts.logging.dir = Path::new(&opts.storage.data_home)
292 .join(DEFAULT_LOGGING_DIR)
293 .to_string_lossy()
294 .to_string();
295 }
296
297 if let Some(addr) = &self.rpc_bind_addr {
298 let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
300 if addr.eq(&datanode_grpc_addr) {
301 return error::IllegalConfigSnafu {
302 msg: format!(
303 "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
304 ),
305 }.fail();
306 }
307 opts.grpc.bind_addr.clone_from(addr)
308 }
309
310 if let Some(addr) = &self.mysql_addr {
311 opts.mysql.enable = true;
312 opts.mysql.addr.clone_from(addr);
313 opts.mysql.tls = tls_opts.clone();
314 }
315
316 if let Some(addr) = &self.postgres_addr {
317 opts.postgres.enable = true;
318 opts.postgres.addr.clone_from(addr);
319 opts.postgres.tls = tls_opts;
320 }
321
322 if self.influxdb_enable {
323 opts.influxdb.enable = self.influxdb_enable;
324 }
325
326 if let Some(user_provider) = &self.user_provider {
327 opts.user_provider = Some(user_provider.clone());
328 }
329
330 Ok(())
331 }
332
    /// Builds a standalone [`Instance`] from the loaded options.
    ///
    /// In order, this initialises the global runtimes and logging, sets up the plugins, creates
    /// the data home and the metadata key-value backend, and then builds the datanode, catalog
    /// manager, flownode, DDL machinery and frontend, wiring them together in-process.
    ///
    /// # Errors
    ///
    /// Returns the first error raised by any of the setup steps.
    ///
    /// # Panics
    ///
    /// Panics if the frontend handler lock is poisoned.
    // NOTE(review): the reason for the three `allow` attributes below is not visible in this
    // function — confirm they are still needed.
    #[allow(unreachable_code)]
    #[allow(unused_variables)]
    #[allow(clippy::diverging_sub_expression)]
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        // The returned guards are stored in the `Instance` so they live as long as it does.
        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            None,
            Some(&opts.component.slow_query),
        );

        log_versions(verbose_version(), short_version(), APP_NAME);
        maybe_activate_heap_profile(&opts.component.memory);
        create_resource_limit_metrics(APP_NAME);

        info!("Standalone start command: {:#?}", self);
        info!("Standalone options: {opts:#?}");

        let mut plugins = Plugins::new();
        let plugin_opts = opts.plugins;
        // From here on `opts` refers to the standalone component options only.
        let mut opts = opts.component;
        opts.grpc.detect_server_addr();
        // Frontend and datanode options are both derived from the standalone options.
        let fe_opts = opts.frontend_options();
        let dn_opts = opts.datanode_options();

        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
            .await
            .context(error::StartFrontendSnafu)?;

        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
            .await
            .context(error::StartDatanodeSnafu)?;

        set_default_timezone(fe_opts.default_timezone.as_deref())
            .context(error::InitTimezoneSnafu)?;

        // Make sure the data home exists before anything is stored below it.
        let data_home = &dn_opts.storage.data_home;
        fs::create_dir_all(path::Path::new(data_home))
            .context(error::CreateDirSnafu { dir: data_home })?;

        // The metadata store lives in a directory derived from the data home.
        let metadata_dir = metadata_store_dir(data_home);
        let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store)
            .context(error::BuildMetadataKvbackendSnafu)?;
        let procedure_manager =
            standalone::build_procedure_manager(kv_backend.clone(), opts.procedure);

        plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone())
            .await
            .context(error::SetupStandalonePluginsSnafu)?;

        // Cache registry: the fundamental caches backed by the kv backend, plus the default
        // composite caches layered on top.
        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry.clone());
        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

        let information_extension = Arc::new(StandaloneInformationExtension::new(
            datanode.region_server(),
            procedure_manager.clone(),
        ));

        // Registered as a plugin so it can be looked up from the plugin registry.
        plugins.insert::<InformationExtensionRef>(information_extension.clone());

        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_procedure_manager(procedure_manager.clone())
        .with_process_manager(process_manager.clone());
        // Enterprise builds can contribute extra information table factories through plugins.
        #[cfg(feature = "enterprise")]
        let builder = if let Some(factories) = plugins.get() {
            builder.with_extra_information_table_factories(factories)
        } else {
            builder
        };
        let catalog_manager = builder.build();

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        // Only the `flow` section comes from the standalone options; the rest are defaults.
        let flownode_options = FlownodeOptions {
            flow: opts.flow.clone(),
            ..Default::default()
        };

        // The flownode needs a frontend client before the frontend instance exists, so the
        // client starts with an empty handler slot that is filled in once the frontend has been
        // built (see `frontend_instance_handler` further down).
        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler(opts.query.clone());
        let frontend_client = Arc::new(frontend_client);
        let flow_builder = FlownodeBuilder::new(
            flownode_options,
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            frontend_client.clone(),
        );
        let flownode = flow_builder
            .build()
            .await
            .map_err(BoxedError::new)
            .context(error::OtherSnafu)?;

        // Give the information extension access to the flow streaming engine.
        {
            let flow_streaming_engine = flownode.flow_engine().streaming_engine();
            information_extension
                .set_flow_streaming_engine(flow_streaming_engine)
                .await;
        }

        // In standalone mode the node manager is backed directly by the local region server and
        // flow engine.
        let node_manager = Arc::new(StandaloneDatanodeManager {
            region_server: datanode.region_server(),
            flow_server: flownode.flow_engine(),
        });

        // Id sequences for tables and flows, persisted in the kv backend and starting at the
        // first user-assignable id.
        let table_id_sequence = Arc::new(
            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_TABLE_ID as u64)
                .step(10)
                .build(),
        );
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        // Convert the standalone WAL options into the form the allocator builder expects.
        let kafka_options = opts
            .wal
            .clone()
            .try_into()
            .context(error::InvalidWalProviderSnafu)?;
        let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
            .await
            .context(error::BuildWalOptionsAllocatorSnafu)?;
        let wal_options_allocator = Arc::new(wal_options_allocator);
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_sequence,
            wal_options_allocator.clone(),
        ));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

        let ddl_context = DdlContext {
            node_manager: node_manager.clone(),
            cache_invalidator: layered_cache_registry.clone(),
            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            table_metadata_manager: table_metadata_manager.clone(),
            table_metadata_allocator: table_metadata_allocator.clone(),
            flow_metadata_manager: flow_metadata_manager.clone(),
            flow_metadata_allocator: flow_metadata_allocator.clone(),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        };

        // NOTE(review): the meaning of the trailing `true` argument is not visible here.
        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
            .context(error::InitDdlManagerSnafu)?;
        // Enterprise builds attach an optional trigger DDL manager taken from the plugins.
        #[cfg(feature = "enterprise")]
        let ddl_manager = {
            let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
                plugins.get();
            ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
        };

        let procedure_executor = Arc::new(LocalProcedureExecutor::new(
            Arc::new(ddl_manager),
            procedure_manager.clone(),
        ));

        let fe_instance = FrontendBuilder::new(
            fe_opts.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            procedure_executor.clone(),
            process_manager,
        )
        .with_plugin(plugins.clone())
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let fe_instance = Arc::new(fe_instance);

        // Fill the handler slot of the frontend client created earlier. Only a weak reference
        // to the frontend instance is stored.
        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .lock()
            .unwrap()
            .replace(weak_grpc_handler);

        // Build the frontend invoker and install it on the flow streaming engine.
        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        let invoker = FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            procedure_executor,
            node_manager,
        )
        .await
        .context(StartFlownodeSnafu)?;
        flow_streaming_engine.set_frontend_invoker(invoker).await;

        // Must be created before `opts` is moved into `Services::new` below.
        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
            .context(error::ServersSnafu)?;

        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
            .build()
            .context(error::StartFrontendSnafu)?;

        // No heartbeat task is configured for the standalone frontend.
        let frontend = Frontend {
            instance: fe_instance,
            servers,
            heartbeat_task: None,
            export_metrics_task,
        };

        #[cfg(feature = "enterprise")]
        let components = Components {
            plugins,
            kv_backend,
            frontend_client,
            catalog_manager,
        };

        Ok(Instance {
            datanode,
            frontend,
            flownode,
            procedure_manager,
            wal_options_allocator,
            #[cfg(feature = "enterprise")]
            components,
            _guard: guard,
        })
    }
593
594 pub async fn create_table_metadata_manager(
595 kv_backend: KvBackendRef,
596 ) -> Result<TableMetadataManagerRef> {
597 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
598
599 table_metadata_manager
600 .init()
601 .await
602 .context(error::InitMetadataSnafu)?;
603
604 Ok(table_metadata_manager)
605 }
606}
607
608#[cfg(test)]
609mod tests {
610 use std::default::Default;
611 use std::io::Write;
612 use std::time::Duration;
613
614 use auth::{Identity, Password, UserProviderRef};
615 use common_base::readable_size::ReadableSize;
616 use common_config::ENV_VAR_SEP;
617 use common_test_util::temp_dir::create_named_temp_file;
618 use common_wal::config::DatanodeWalConfig;
619 use frontend::frontend::FrontendOptions;
620 use object_store::config::{FileConfig, GcsConfig};
621 use servers::grpc::GrpcOptions;
622
623 use super::*;
624 use crate::options::GlobalOptions;
625
626 #[tokio::test]
627 async fn test_try_from_start_command_to_anymap() {
628 let fe_opts = FrontendOptions {
629 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
630 ..Default::default()
631 };
632
633 let mut plugins = Plugins::new();
634 plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
635 .await
636 .unwrap();
637
638 let provider = plugins.get::<UserProviderRef>().unwrap();
639 let result = provider
640 .authenticate(
641 Identity::UserId("test", None),
642 Password::PlainText("test".to_string().into()),
643 )
644 .await;
645 let _ = result.unwrap();
646 }
647
648 #[test]
649 fn test_toml() {
650 let opts = StandaloneOptions::default();
651 let toml_string = toml::to_string(&opts).unwrap();
652 let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
653 }
654
655 #[test]
656 fn test_read_from_config_file() {
657 let mut file = create_named_temp_file();
658 let toml_str = r#"
659 enable_memory_catalog = true
660
661 [wal]
662 provider = "raft_engine"
663 dir = "./greptimedb_data/test/wal"
664 file_size = "1GB"
665 purge_threshold = "50GB"
666 purge_interval = "10m"
667 read_batch_size = 128
668 sync_write = false
669
670 [storage]
671 data_home = "./greptimedb_data/"
672 type = "File"
673
674 [[storage.providers]]
675 type = "Gcs"
676 bucket = "foo"
677 endpoint = "bar"
678
679 [[storage.providers]]
680 type = "S3"
681 access_key_id = "access_key_id"
682 secret_access_key = "secret_access_key"
683
684 [storage.compaction]
685 max_inflight_tasks = 3
686 max_files_in_level0 = 7
687 max_purge_tasks = 32
688
689 [storage.manifest]
690 checkpoint_margin = 9
691 gc_duration = '7s'
692
693 [http]
694 addr = "127.0.0.1:4000"
695 timeout = "33s"
696 body_limit = "128MB"
697
698 [opentsdb]
699 enable = true
700
701 [logging]
702 level = "debug"
703 dir = "./greptimedb_data/test/logs"
704 "#;
705 write!(file, "{}", toml_str).unwrap();
706 let cmd = StartCommand {
707 config_file: Some(file.path().to_str().unwrap().to_string()),
708 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
709 ..Default::default()
710 };
711
712 let options = cmd
713 .load_options(&GlobalOptions::default())
714 .unwrap()
715 .component;
716 let fe_opts = options.frontend_options();
717 let dn_opts = options.datanode_options();
718 let logging_opts = options.logging;
719 assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
720 assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
721 assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
722 assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
723 assert!(fe_opts.mysql.enable);
724 assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
725 assert_eq!(2, fe_opts.mysql.runtime_size);
726 assert_eq!(None, fe_opts.mysql.reject_no_database);
727 assert!(fe_opts.influxdb.enable);
728 assert!(fe_opts.opentsdb.enable);
729
730 let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
731 unreachable!()
732 };
733 assert_eq!(
734 "./greptimedb_data/test/wal",
735 raft_engine_config.dir.unwrap()
736 );
737
738 assert!(matches!(
739 &dn_opts.storage.store,
740 object_store::config::ObjectStoreConfig::File(FileConfig { .. })
741 ));
742 assert_eq!(dn_opts.storage.providers.len(), 2);
743 assert!(matches!(
744 dn_opts.storage.providers[0],
745 object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
746 ));
747 match &dn_opts.storage.providers[1] {
748 object_store::config::ObjectStoreConfig::S3(s3_config) => {
749 assert_eq!(
750 "SecretBox<alloc::string::String>([REDACTED])".to_string(),
751 format!("{:?}", s3_config.connection.access_key_id)
752 );
753 }
754 _ => {
755 unreachable!()
756 }
757 }
758
759 assert_eq!("debug", logging_opts.level.as_ref().unwrap());
760 assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
761 }
762
763 #[test]
764 fn test_load_log_options_from_cli() {
765 let cmd = StartCommand {
766 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
767 ..Default::default()
768 };
769
770 let opts = cmd
771 .load_options(&GlobalOptions {
772 log_dir: Some("./greptimedb_data/test/logs".to_string()),
773 log_level: Some("debug".to_string()),
774
775 #[cfg(feature = "tokio-console")]
776 tokio_console_addr: None,
777 })
778 .unwrap()
779 .component;
780
781 assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
782 assert_eq!("debug", opts.logging.level.unwrap());
783 }
784
785 #[test]
786 fn test_config_precedence_order() {
787 let mut file = create_named_temp_file();
788 let toml_str = r#"
789 [http]
790 addr = "127.0.0.1:4000"
791
792 [logging]
793 level = "debug"
794 "#;
795 write!(file, "{}", toml_str).unwrap();
796
797 let env_prefix = "STANDALONE_UT";
798 temp_env::with_vars(
799 [
800 (
801 [
803 env_prefix.to_string(),
804 "logging".to_uppercase(),
805 "dir".to_uppercase(),
806 ]
807 .join(ENV_VAR_SEP),
808 Some("/other/log/dir"),
809 ),
810 (
811 [
813 env_prefix.to_string(),
814 "logging".to_uppercase(),
815 "level".to_uppercase(),
816 ]
817 .join(ENV_VAR_SEP),
818 Some("info"),
819 ),
820 (
821 [
823 env_prefix.to_string(),
824 "http".to_uppercase(),
825 "addr".to_uppercase(),
826 ]
827 .join(ENV_VAR_SEP),
828 Some("127.0.0.1:24000"),
829 ),
830 ],
831 || {
832 let command = StartCommand {
833 config_file: Some(file.path().to_str().unwrap().to_string()),
834 http_addr: Some("127.0.0.1:14000".to_string()),
835 env_prefix: env_prefix.to_string(),
836 ..Default::default()
837 };
838
839 let opts = command.load_options(&Default::default()).unwrap().component;
840
841 assert_eq!(opts.logging.dir, "/other/log/dir");
843
844 assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
846
847 let fe_opts = opts.frontend_options();
849 assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
850 assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);
851
852 assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
854 },
855 );
856 }
857
858 #[test]
859 fn test_load_default_standalone_options() {
860 let options =
861 StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
862 let default_options = StandaloneOptions::default();
863 assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
864 assert_eq!(options.http, default_options.http);
865 assert_eq!(options.grpc, default_options.grpc);
866 assert_eq!(options.mysql, default_options.mysql);
867 assert_eq!(options.postgres, default_options.postgres);
868 assert_eq!(options.opentsdb, default_options.opentsdb);
869 assert_eq!(options.influxdb, default_options.influxdb);
870 assert_eq!(options.prom_store, default_options.prom_store);
871 assert_eq!(options.wal, default_options.wal);
872 assert_eq!(options.metadata_store, default_options.metadata_store);
873 assert_eq!(options.procedure, default_options.procedure);
874 assert_eq!(options.logging, default_options.logging);
875 assert_eq!(options.region_engine, default_options.region_engine);
876 }
877
878 #[test]
879 fn test_cache_config() {
880 let toml_str = r#"
881 [storage]
882 data_home = "test_data_home"
883 type = "S3"
884 [storage.cache_config]
885 enable_read_cache = true
886 "#;
887 let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
888 opts.sanitize();
889 assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
890 assert_eq!(
891 opts.storage.store.cache_config().unwrap().cache_path,
892 "test_data_home"
893 );
894 }
895}