use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::{fs, path};

use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::kvbackend::KvBackendCatalogManagerBuilder;
use catalog::process_manager::ProcessManager;
use clap::Parser;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{Configurable, metadata_store_dir};
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, verbose_version};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use flow::{
    FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
    GrpcQueryHandlerWithBoxedError,
};
use frontend::frontend::Frontend;
use frontend::instance::StandaloneDatanodeManager;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use servers::export_metrics::ExportMetricsTask;
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
use standalone::StandaloneInformationExtension;
use standalone::options::StandaloneOptions;
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{Result, StartFlownodeSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};

pub const APP_NAME: &str = "greptime-standalone";

#[derive(Parser)]
pub struct Command {
    #[clap(subcommand)]
    subcmd: SubCommand,
}

impl Command {
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        self.subcmd.build(opts).await
    }

    pub fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        self.subcmd.load_options(global_options)
    }
}

#[derive(Parser)]
enum SubCommand {
    Start(StartCommand),
}

impl SubCommand {
    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        match self {
            SubCommand::Start(cmd) => cmd.build(opts).await,
        }
    }

    fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        match self {
            SubCommand::Start(cmd) => cmd.load_options(global_options),
        }
    }
}

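/// A standalone GreptimeDB instance: a datanode, a frontend, and a flownode
/// running in a single process, together with the procedure manager and WAL
/// options allocator they share.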
pub struct Instance {
    datanode: Datanode,
    frontend: Frontend,
    flownode: FlownodeInstance,
    procedure_manager: ProcedureManagerRef,
    wal_options_allocator: WalOptionsAllocatorRef,

    #[cfg(feature = "enterprise")]
    components: Components,

    _guard: Vec<WorkerGuard>,
}

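/// Building blocks of the standalone instance that remain accessible when the
/// `enterprise` feature is enabled.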
#[cfg(feature = "enterprise")]
pub struct Components {
    pub plugins: Plugins,
    pub kv_backend: KvBackendRef,
    pub frontend_client: Arc<FrontendClient>,
    pub catalog_manager: catalog::CatalogManagerRef,
}

impl Instance {
    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
        self.frontend.server_handlers().addr(name)
    }

    #[cfg(feature = "enterprise")]
    pub fn components(&self) -> &Components {
        &self.components
    }
}

#[async_trait]
impl App for Instance {
    fn name(&self) -> &str {
        APP_NAME
    }

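    // Bring the components up in dependency order: datanode telemetry, the
    // procedure manager and WAL options allocator, frontend plugins, the
    // frontend servers, and finally the flownode.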
    async fn start(&mut self) -> Result<()> {
        self.datanode.start_telemetry();

        self.procedure_manager
            .start()
            .await
            .context(error::StartProcedureManagerSnafu)?;

        self.wal_options_allocator
            .start()
            .await
            .context(error::StartWalOptionsAllocatorSnafu)?;

        plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
            .await
            .context(error::StartFrontendSnafu)?;

        self.frontend
            .start()
            .await
            .context(error::StartFrontendSnafu)?;

        self.flownode.start().await.context(StartFlownodeSnafu)?;

        Ok(())
    }

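    // Shut down roughly in reverse: the frontend stops accepting requests first,
    // then the procedure manager, the datanode, and finally the flownode.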
    async fn stop(&mut self) -> Result<()> {
        self.frontend
            .shutdown()
            .await
            .context(error::ShutdownFrontendSnafu)?;

        self.procedure_manager
            .stop()
            .await
            .context(error::StopProcedureManagerSnafu)?;

        self.datanode
            .shutdown()
            .await
            .context(error::ShutdownDatanodeSnafu)?;

        self.flownode
            .shutdown()
            .await
            .context(error::ShutdownFlownodeSnafu)?;

        info!("Standalone instance stopped.");

        Ok(())
    }
}

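/// Command-line arguments for the standalone `start` subcommand. Values given
/// here override the corresponding entries loaded from the configuration file
/// and environment variables.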
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    #[clap(long)]
    http_addr: Option<String>,
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    #[clap(long)]
    mysql_addr: Option<String>,
    #[clap(long)]
    postgres_addr: Option<String>,
    #[clap(short, long)]
    influxdb_enable: bool,
    #[clap(short, long)]
    pub config_file: Option<String>,
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    #[clap(long)]
    user_provider: Option<String>,
    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
    pub env_prefix: String,
    #[clap(long)]
    data_home: Option<String>,
}

impl StartCommand {
    pub fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
            self.config_file.as_deref(),
            self.env_prefix.as_ref(),
        )
        .context(error::LoadLayeredConfigSnafu)?;

        self.merge_with_cli_options(global_options, &mut opts.component)?;
        opts.component.sanitize();

        Ok(opts)
    }

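    /// Applies the global options and CLI flags on top of the loaded
    /// `StandaloneOptions`, and derives the default logging directory from
    /// `data_home` when none is configured.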
    pub fn merge_with_cli_options(
        &self,
        global_options: &GlobalOptions,
        opts: &mut StandaloneOptions,
    ) -> Result<()> {
        if let Some(dir) = &global_options.log_dir {
            opts.logging.dir.clone_from(dir);
        }

        if global_options.log_level.is_some() {
            opts.logging.level.clone_from(&global_options.log_level);
        }

        opts.tracing = TracingOptions {
            #[cfg(feature = "tokio-console")]
            tokio_console_addr: global_options.tokio_console_addr.clone(),
        };

        let tls_opts = TlsOption::new(
            self.tls_mode.clone(),
            self.tls_cert_path.clone(),
            self.tls_key_path.clone(),
        );

        if let Some(addr) = &self.http_addr {
            opts.http.addr.clone_from(addr);
        }

        if let Some(data_home) = &self.data_home {
            opts.storage.data_home.clone_from(data_home);
        }

        if opts.logging.dir.is_empty() {
            opts.logging.dir = Path::new(&opts.storage.data_home)
                .join(DEFAULT_LOGGING_DIR)
                .to_string_lossy()
                .to_string();
        }

        if let Some(addr) = &self.rpc_bind_addr {
            let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
            if addr.eq(&datanode_grpc_addr) {
                return error::IllegalConfigSnafu {
                    msg: format!(
                        "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
                    ),
                }
                .fail();
            }
            opts.grpc.bind_addr.clone_from(addr)
        }

        if let Some(addr) = &self.mysql_addr {
            opts.mysql.enable = true;
            opts.mysql.addr.clone_from(addr);
            opts.mysql.tls = tls_opts.clone();
        }

        if let Some(addr) = &self.postgres_addr {
            opts.postgres.enable = true;
            opts.postgres.addr.clone_from(addr);
            opts.postgres.tls = tls_opts;
        }

        if self.influxdb_enable {
            opts.influxdb.enable = self.influxdb_enable;
        }

        if let Some(user_provider) = &self.user_provider {
            opts.user_provider = Some(user_provider.clone());
        }

        Ok(())
    }

    #[allow(unreachable_code)]
    #[allow(unused_variables)]
    #[allow(clippy::diverging_sub_expression)]
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            None,
            Some(&opts.component.slow_query),
        );

        log_versions(verbose_version(), short_version(), APP_NAME);
        maybe_activate_heap_profile(&opts.component.memory);
        create_resource_limit_metrics(APP_NAME);

        info!("Standalone start command: {:#?}", self);
        info!("Standalone options: {opts:#?}");

        let mut plugins = Plugins::new();
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        opts.grpc.detect_server_addr();
        let fe_opts = opts.frontend_options();
        let dn_opts = opts.datanode_options();

        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
            .await
            .context(error::StartFrontendSnafu)?;

        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
            .await
            .context(error::StartDatanodeSnafu)?;

        set_default_timezone(fe_opts.default_timezone.as_deref())
            .context(error::InitTimezoneSnafu)?;

        let data_home = &dn_opts.storage.data_home;
        fs::create_dir_all(path::Path::new(data_home))
            .context(error::CreateDirSnafu { dir: data_home })?;

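        // Build the metadata kv-backend and the procedure manager on top of it;
        // they are shared by the datanode, frontend, and flownode below.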
        let metadata_dir = metadata_store_dir(data_home);
        let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store)
            .context(error::BuildMetadataKvbackendSnafu)?;
        let procedure_manager =
            standalone::build_procedure_manager(kv_backend.clone(), opts.procedure);

        plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone())
            .await
            .context(error::SetupStandalonePluginsSnafu)?;

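        // Assemble the layered cache registry used for metadata lookups.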
        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

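        // Build the datanode (the embedded region server) against the shared
        // kv-backend and cache registry.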
        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry.clone());
        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

        let information_extension = Arc::new(StandaloneInformationExtension::new(
            datanode.region_server(),
            procedure_manager.clone(),
        ));

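        // The catalog manager reads schema metadata from the kv-backend through
        // the layered cache.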
        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_procedure_manager(procedure_manager.clone())
        .with_process_manager(process_manager.clone());
        #[cfg(feature = "enterprise")]
        let builder = if let Some(factories) = plugins.get() {
            builder.with_extra_information_table_factories(factories)
        } else {
            builder
        };
        let catalog_manager = builder.build();

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flownode_options = FlownodeOptions {
            flow: opts.flow.clone(),
            ..Default::default()
        };

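        // Create a frontend client with an empty gRPC handler slot; the real
        // frontend instance is plugged into it further below, once it is built.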
        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler(opts.query.clone());
        let frontend_client = Arc::new(frontend_client);
        let flow_builder = FlownodeBuilder::new(
            flownode_options,
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            frontend_client.clone(),
        );
        let flownode = flow_builder
            .build()
            .await
            .map_err(BoxedError::new)
            .context(error::OtherSnafu)?;

        {
            let flow_streaming_engine = flownode.flow_engine().streaming_engine();
            information_extension
                .set_flow_streaming_engine(flow_streaming_engine)
                .await;
        }

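        // In standalone mode the node manager simply points at the local region
        // server and flow engine.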
        let node_manager = Arc::new(StandaloneDatanodeManager {
            region_server: datanode.region_server(),
            flow_server: flownode.flow_engine(),
        });

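        // Id sequences for tables and flows, and the WAL options allocator,
        // backed by the metadata kv-backend.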
        let table_id_sequence = Arc::new(
            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_TABLE_ID as u64)
                .step(10)
                .build(),
        );
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        let kafka_options = opts.wal.clone().into();
        let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
            .await
            .context(error::BuildWalOptionsAllocatorSnafu)?;
        let wal_options_allocator = Arc::new(wal_options_allocator);
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_sequence,
            wal_options_allocator.clone(),
        ));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

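        // Assemble the DDL context; region failure detection is a no-op in
        // standalone mode.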
        let ddl_context = DdlContext {
            node_manager: node_manager.clone(),
            cache_invalidator: layered_cache_registry.clone(),
            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            table_metadata_manager: table_metadata_manager.clone(),
            table_metadata_allocator: table_metadata_allocator.clone(),
            flow_metadata_manager: flow_metadata_manager.clone(),
            flow_metadata_allocator: flow_metadata_allocator.clone(),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        };

        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
            .context(error::InitDdlManagerSnafu)?;
        #[cfg(feature = "enterprise")]
        let ddl_manager = {
            let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
                plugins.get();
            ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
        };

        let procedure_executor = Arc::new(LocalProcedureExecutor::new(
            Arc::new(ddl_manager),
            procedure_manager.clone(),
        ));

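        // Build the frontend instance on top of the shared metadata, caches,
        // catalog, and the local node manager.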
        let fe_instance = FrontendBuilder::new(
            fe_opts.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            procedure_executor.clone(),
            process_manager,
        )
        .with_plugin(plugins.clone())
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let fe_instance = Arc::new(fe_instance);

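        // Hand the frontend's gRPC query handler to the frontend client created
        // earlier, stored as a weak reference.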
        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .lock()
            .unwrap()
            .replace(weak_grpc_handler);

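        // Give the flow streaming engine an invoker so it can call back into
        // the frontend.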
        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        let invoker = FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            procedure_executor,
            node_manager,
        )
        .await
        .context(StartFlownodeSnafu)?;
        flow_streaming_engine.set_frontend_invoker(invoker).await;

        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
            .context(error::ServersSnafu)?;

        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance: fe_instance,
            servers,
            heartbeat_task: None,
            export_metrics_task,
        };

        #[cfg(feature = "enterprise")]
        let components = Components {
            plugins,
            kv_backend,
            frontend_client,
            catalog_manager,
        };

        Ok(Instance {
            datanode,
            frontend,
            flownode,
            procedure_manager,
            wal_options_allocator,
            #[cfg(feature = "enterprise")]
            components,
            _guard: guard,
        })
    }

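    /// Creates the table metadata manager on the given kv-backend and
    /// initializes it.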
    pub async fn create_table_metadata_manager(
        kv_backend: KvBackendRef,
    ) -> Result<TableMetadataManagerRef> {
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));

        table_metadata_manager
            .init()
            .await
            .context(error::InitMetadataSnafu)?;

        Ok(table_metadata_manager)
    }
}

#[cfg(test)]
mod tests {
    use std::default::Default;
    use std::io::Write;
    use std::time::Duration;

    use auth::{Identity, Password, UserProviderRef};
    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use common_wal::config::DatanodeWalConfig;
    use frontend::frontend::FrontendOptions;
    use object_store::config::{FileConfig, GcsConfig};
    use servers::grpc::GrpcOptions;

    use super::*;
    use crate::options::GlobalOptions;

    #[tokio::test]
    async fn test_try_from_start_command_to_anymap() {
        let fe_opts = FrontendOptions {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
            .await
            .unwrap();

        let provider = plugins.get::<UserProviderRef>().unwrap();
        let result = provider
            .authenticate(
                Identity::UserId("test", None),
                Password::PlainText("test".to_string().into()),
            )
            .await;
        let _ = result.unwrap();
    }

    #[test]
    fn test_toml() {
        let opts = StandaloneOptions::default();
        let toml_string = toml::to_string(&opts).unwrap();
        let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
    }

    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = true

            [wal]
            provider = "raft_engine"
            dir = "./greptimedb_data/test/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"

            [storage.compaction]
            max_inflight_tasks = 3
            max_files_in_level0 = 7
            max_purge_tasks = 32

            [storage.manifest]
            checkpoint_margin = 9
            gc_duration = '7s'

            [http]
            addr = "127.0.0.1:4000"
            timeout = "33s"
            body_limit = "128MB"

            [opentsdb]
            enable = true

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();
        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;
        let fe_opts = options.frontend_options();
        let dn_opts = options.datanode_options();
        let logging_opts = options.logging;
        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
        assert!(fe_opts.mysql.enable);
        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
        assert_eq!(2, fe_opts.mysql.runtime_size);
        assert_eq!(None, fe_opts.mysql.reject_no_database);
        assert!(fe_opts.influxdb.enable);
        assert!(fe_opts.opentsdb.enable);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
            unreachable!()
        };
        assert_eq!(
            "./greptimedb_data/test/wal",
            raft_engine_config.dir.unwrap()
        );

        assert!(matches!(
            &dn_opts.storage.store,
            object_store::config::ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(dn_opts.storage.providers.len(), 2);
        assert!(matches!(
            dn_opts.storage.providers[0],
            object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        match &dn_opts.storage.providers[1] {
            object_store::config::ObjectStoreConfig::S3(s3_config) => {
                assert_eq!(
                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
                    format!("{:?}", s3_config.connection.access_key_id)
                );
            }
            _ => {
                unreachable!()
            }
        }

        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
    }

    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let opts = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
        assert_eq!("debug", opts.logging.level.unwrap());
    }

    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"

            [logging]
            level = "debug"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "STANDALONE_UT";
        temp_env::with_vars(
            [
                (
                    // Set `logging.dir` via the environment only.
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "dir".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("/other/log/dir"),
                ),
                (
                    // Set `logging.level` via the environment; the config file also sets it.
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "level".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("info"),
                ),
                (
                    // Set `http.addr` via the environment; the config file and CLI also set it.
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Only the environment variable sets `logging.dir`, so it wins.
                assert_eq!(opts.logging.dir, "/other/log/dir");

                // The config file takes precedence over the environment variable.
                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");

                // The CLI argument takes precedence over both the config file
                // and the environment variable.
                let fe_opts = opts.frontend_options();
                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);

                // Options that are set nowhere fall back to their defaults.
                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
            },
        );
    }

    #[test]
    fn test_load_default_standalone_options() {
        let options =
            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
        let default_options = StandaloneOptions::default();
        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
        assert_eq!(options.http, default_options.http);
        assert_eq!(options.grpc, default_options.grpc);
        assert_eq!(options.mysql, default_options.mysql);
        assert_eq!(options.postgres, default_options.postgres);
        assert_eq!(options.opentsdb, default_options.opentsdb);
        assert_eq!(options.influxdb, default_options.influxdb);
        assert_eq!(options.prom_store, default_options.prom_store);
        assert_eq!(options.wal, default_options.wal);
        assert_eq!(options.metadata_store, default_options.metadata_store);
        assert_eq!(options.procedure, default_options.procedure);
        assert_eq!(options.logging, default_options.logging);
        assert_eq!(options.region_engine, default_options.region_engine);
    }

    #[test]
    fn test_cache_config() {
        let toml_str = r#"
            [storage]
            data_home = "test_data_home"
            type = "S3"
            [storage.cache_config]
            enable_read_cache = true
        "#;
        let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
        opts.sanitize();
        assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
        assert_eq!(
            opts.storage.store.cache_config().unwrap().cache_path,
            "test_data_home"
        );
    }
}