1use std::net::SocketAddr;
16use std::path::Path;
17use std::sync::Arc;
18use std::{fs, path};
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::information_schema::InformationExtensionRef;
23use catalog::kvbackend::KvBackendCatalogManagerBuilder;
24use catalog::process_manager::ProcessManager;
25use clap::Parser;
26use common_base::Plugins;
27use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
28use common_config::{Configurable, metadata_store_dir};
29use common_error::ext::BoxedError;
30use common_meta::cache::LayeredCacheRegistryBuilder;
31use common_meta::ddl::flow_meta::FlowMetadataAllocator;
32use common_meta::ddl::table_meta::TableMetadataAllocator;
33use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
34use common_meta::ddl_manager::DdlManager;
35use common_meta::key::flow::FlowMetadataManager;
36use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
37use common_meta::kv_backend::KvBackendRef;
38use common_meta::procedure_executor::LocalProcedureExecutor;
39use common_meta::region_keeper::MemoryRegionKeeper;
40use common_meta::region_registry::LeaderRegionRegistry;
41use common_meta::sequence::SequenceBuilder;
42use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
43use common_procedure::ProcedureManagerRef;
44use common_query::prelude::set_default_prefix;
45use common_telemetry::info;
46use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
47use common_time::timezone::set_default_timezone;
48use common_version::{short_version, verbose_version};
49use datanode::config::DatanodeOptions;
50use datanode::datanode::{Datanode, DatanodeBuilder};
51use flow::{
52 FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
53 GrpcQueryHandlerWithBoxedError,
54};
55use frontend::frontend::Frontend;
56use frontend::instance::StandaloneDatanodeManager;
57use frontend::instance::builder::FrontendBuilder;
58use frontend::server::Services;
59use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
60use servers::tls::{TlsMode, TlsOption};
61use snafu::ResultExt;
62use standalone::StandaloneInformationExtension;
63use standalone::options::StandaloneOptions;
64use tracing_appender::non_blocking::WorkerGuard;
65
66use crate::error::{Result, StartFlownodeSnafu};
67use crate::options::{GlobalOptions, GreptimeOptions};
68use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
69
/// Application name of the standalone binary.
///
/// Returned by `App::name` for [`Instance`] and passed to the logging, version-logging and
/// resource-limit-metrics setup in `StartCommand::build`.
pub const APP_NAME: &str = "greptime-standalone";
71
72#[derive(Parser)]
73pub struct Command {
74 #[clap(subcommand)]
75 subcmd: SubCommand,
76}
77
78impl Command {
79 pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
80 self.subcmd.build(opts).await
81 }
82
83 pub fn load_options(
84 &self,
85 global_options: &GlobalOptions,
86 ) -> Result<GreptimeOptions<StandaloneOptions>> {
87 self.subcmd.load_options(global_options)
88 }
89}
90
91#[derive(Parser)]
92enum SubCommand {
93 Start(StartCommand),
94}
95
96impl SubCommand {
97 async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
98 match self {
99 SubCommand::Start(cmd) => cmd.build(opts).await,
100 }
101 }
102
103 fn load_options(
104 &self,
105 global_options: &GlobalOptions,
106 ) -> Result<GreptimeOptions<StandaloneOptions>> {
107 match self {
108 SubCommand::Start(cmd) => cmd.load_options(global_options),
109 }
110 }
111}
112
113pub struct Instance {
114 datanode: Datanode,
115 frontend: Frontend,
116 flownode: FlownodeInstance,
117 procedure_manager: ProcedureManagerRef,
118 wal_options_allocator: WalOptionsAllocatorRef,
119
120 #[cfg(feature = "enterprise")]
123 components: Components,
124
125 _guard: Vec<WorkerGuard>,
127}
128
/// Internal components of a standalone [`Instance`] that are exposed when the `enterprise`
/// feature is enabled.
#[cfg(feature = "enterprise")]
pub struct Components {
    /// The plugin registry populated while building the instance.
    pub plugins: Plugins,
    /// The metadata key-value backend.
    pub kv_backend: KvBackendRef,
    /// The frontend client that is handed to the flownode builder.
    pub frontend_client: Arc<FrontendClient>,
    /// The catalog manager built on top of the key-value backend.
    pub catalog_manager: catalog::CatalogManagerRef,
}
136
137impl Instance {
138 pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
140 self.frontend.server_handlers().addr(name)
141 }
142
143 #[cfg(feature = "enterprise")]
144 pub fn components(&self) -> &Components {
145 &self.components
146 }
147}
148
149#[async_trait]
150impl App for Instance {
151 fn name(&self) -> &str {
152 APP_NAME
153 }
154
155 async fn start(&mut self) -> Result<()> {
156 self.datanode.start_telemetry();
157
158 self.procedure_manager
159 .start()
160 .await
161 .context(error::StartProcedureManagerSnafu)?;
162
163 self.wal_options_allocator
164 .start()
165 .await
166 .context(error::StartWalOptionsAllocatorSnafu)?;
167
168 plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
169 .await
170 .context(error::StartFrontendSnafu)?;
171
172 self.frontend
173 .start()
174 .await
175 .context(error::StartFrontendSnafu)?;
176
177 self.flownode.start().await.context(StartFlownodeSnafu)?;
178
179 Ok(())
180 }
181
182 async fn stop(&mut self) -> Result<()> {
183 self.frontend
184 .shutdown()
185 .await
186 .context(error::ShutdownFrontendSnafu)?;
187
188 self.procedure_manager
189 .stop()
190 .await
191 .context(error::StopProcedureManagerSnafu)?;
192
193 self.datanode
194 .shutdown()
195 .await
196 .context(error::ShutdownDatanodeSnafu)?;
197
198 self.flownode
199 .shutdown()
200 .await
201 .context(error::ShutdownFlownodeSnafu)?;
202
203 info!("Datanode instance stopped.");
204
205 Ok(())
206 }
207}
208
209#[derive(Debug, Default, Parser)]
210pub struct StartCommand {
211 #[clap(long)]
212 http_addr: Option<String>,
213 #[clap(long, alias = "rpc-addr")]
214 rpc_bind_addr: Option<String>,
215 #[clap(long)]
216 mysql_addr: Option<String>,
217 #[clap(long)]
218 postgres_addr: Option<String>,
219 #[clap(short, long)]
220 influxdb_enable: bool,
221 #[clap(short, long)]
222 pub config_file: Option<String>,
223 #[clap(long)]
224 tls_mode: Option<TlsMode>,
225 #[clap(long)]
226 tls_cert_path: Option<String>,
227 #[clap(long)]
228 tls_key_path: Option<String>,
229 #[clap(long)]
230 tls_watch: bool,
231 #[clap(long)]
232 user_provider: Option<String>,
233 #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
234 pub env_prefix: String,
235 #[clap(long)]
237 data_home: Option<String>,
238}
239
240impl StartCommand {
241 pub fn load_options(
243 &self,
244 global_options: &GlobalOptions,
245 ) -> Result<GreptimeOptions<StandaloneOptions>> {
246 let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
247 self.config_file.as_deref(),
248 self.env_prefix.as_ref(),
249 )
250 .context(error::LoadLayeredConfigSnafu)?;
251
252 self.merge_with_cli_options(global_options, &mut opts.component)?;
253 opts.component.sanitize();
254
255 Ok(opts)
256 }
257
258 pub fn merge_with_cli_options(
260 &self,
261 global_options: &GlobalOptions,
262 opts: &mut StandaloneOptions,
263 ) -> Result<()> {
264 if let Some(dir) = &global_options.log_dir {
265 opts.logging.dir.clone_from(dir);
266 }
267
268 if global_options.log_level.is_some() {
269 opts.logging.level.clone_from(&global_options.log_level);
270 }
271
272 opts.tracing = TracingOptions {
273 #[cfg(feature = "tokio-console")]
274 tokio_console_addr: global_options.tokio_console_addr.clone(),
275 };
276
277 let tls_opts = TlsOption::new(
278 self.tls_mode.clone(),
279 self.tls_cert_path.clone(),
280 self.tls_key_path.clone(),
281 self.tls_watch,
282 );
283
284 if let Some(addr) = &self.http_addr {
285 opts.http.addr.clone_from(addr);
286 }
287
288 if let Some(data_home) = &self.data_home {
289 opts.storage.data_home.clone_from(data_home);
290 }
291
292 if opts.logging.dir.is_empty() {
294 opts.logging.dir = Path::new(&opts.storage.data_home)
295 .join(DEFAULT_LOGGING_DIR)
296 .to_string_lossy()
297 .to_string();
298 }
299
300 if let Some(addr) = &self.rpc_bind_addr {
301 let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
303 if addr.eq(&datanode_grpc_addr) {
304 return error::IllegalConfigSnafu {
305 msg: format!(
306 "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
307 ),
308 }.fail();
309 }
310 opts.grpc.bind_addr.clone_from(addr)
311 }
312
313 if let Some(addr) = &self.mysql_addr {
314 opts.mysql.enable = true;
315 opts.mysql.addr.clone_from(addr);
316 opts.mysql.tls = tls_opts.clone();
317 }
318
319 if let Some(addr) = &self.postgres_addr {
320 opts.postgres.enable = true;
321 opts.postgres.addr.clone_from(addr);
322 opts.postgres.tls = tls_opts;
323 }
324
325 if self.influxdb_enable {
326 opts.influxdb.enable = self.influxdb_enable;
327 }
328
329 if let Some(user_provider) = &self.user_provider {
330 opts.user_provider = Some(user_provider.clone());
331 }
332
333 Ok(())
334 }
335
336 #[allow(unreachable_code)]
337 #[allow(unused_variables)]
338 #[allow(clippy::diverging_sub_expression)]
339 pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
341 common_runtime::init_global_runtimes(&opts.runtime);
342
343 let guard = common_telemetry::init_global_logging(
344 APP_NAME,
345 &opts.component.logging,
346 &opts.component.tracing,
347 None,
348 Some(&opts.component.slow_query),
349 );
350
351 log_versions(verbose_version(), short_version(), APP_NAME);
352 maybe_activate_heap_profile(&opts.component.memory);
353 create_resource_limit_metrics(APP_NAME);
354
355 info!("Standalone start command: {:#?}", self);
356 info!("Standalone options: {opts:#?}");
357
358 let mut plugins = Plugins::new();
359 let plugin_opts = opts.plugins;
360 let mut opts = opts.component;
361 set_default_prefix(opts.default_column_prefix.as_deref())
362 .map_err(BoxedError::new)
363 .context(error::BuildCliSnafu)?;
364
365 opts.grpc.detect_server_addr();
366 let fe_opts = opts.frontend_options();
367 let dn_opts = opts.datanode_options();
368
369 plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
370 .await
371 .context(error::StartFrontendSnafu)?;
372
373 plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
374 .await
375 .context(error::StartDatanodeSnafu)?;
376
377 set_default_timezone(fe_opts.default_timezone.as_deref())
378 .context(error::InitTimezoneSnafu)?;
379
380 let data_home = &dn_opts.storage.data_home;
381 fs::create_dir_all(path::Path::new(data_home))
383 .context(error::CreateDirSnafu { dir: data_home })?;
384
385 let metadata_dir = metadata_store_dir(data_home);
386 let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store)
387 .context(error::BuildMetadataKvbackendSnafu)?;
388 let procedure_manager =
389 standalone::build_procedure_manager(kv_backend.clone(), opts.procedure);
390
391 plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone())
392 .await
393 .context(error::SetupStandalonePluginsSnafu)?;
394
395 let layered_cache_builder = LayeredCacheRegistryBuilder::default();
397 let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
398 let layered_cache_registry = Arc::new(
399 with_default_composite_cache_registry(
400 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
401 )
402 .context(error::BuildCacheRegistrySnafu)?
403 .build(),
404 );
405
406 let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
407 builder.with_cache_registry(layered_cache_registry.clone());
408 let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;
409
410 let information_extension = Arc::new(StandaloneInformationExtension::new(
411 datanode.region_server(),
412 procedure_manager.clone(),
413 ));
414
415 plugins.insert::<InformationExtensionRef>(information_extension.clone());
416
417 let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
418 let builder = KvBackendCatalogManagerBuilder::new(
419 information_extension.clone(),
420 kv_backend.clone(),
421 layered_cache_registry.clone(),
422 )
423 .with_procedure_manager(procedure_manager.clone())
424 .with_process_manager(process_manager.clone());
425 #[cfg(feature = "enterprise")]
426 let builder = if let Some(factories) = plugins.get() {
427 builder.with_extra_information_table_factories(factories)
428 } else {
429 builder
430 };
431 let catalog_manager = builder.build();
432
433 let table_metadata_manager =
434 Self::create_table_metadata_manager(kv_backend.clone()).await?;
435
436 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
437 let flownode_options = FlownodeOptions {
438 flow: opts.flow.clone(),
439 ..Default::default()
440 };
441
442 let (frontend_client, frontend_instance_handler) =
445 FrontendClient::from_empty_grpc_handler(opts.query.clone());
446 let frontend_client = Arc::new(frontend_client);
447 let flow_builder = FlownodeBuilder::new(
448 flownode_options,
449 plugins.clone(),
450 table_metadata_manager.clone(),
451 catalog_manager.clone(),
452 flow_metadata_manager.clone(),
453 frontend_client.clone(),
454 );
455 let flownode = flow_builder
456 .build()
457 .await
458 .map_err(BoxedError::new)
459 .context(error::OtherSnafu)?;
460
461 {
463 let flow_streaming_engine = flownode.flow_engine().streaming_engine();
464 information_extension
465 .set_flow_streaming_engine(flow_streaming_engine)
466 .await;
467 }
468
469 let node_manager = Arc::new(StandaloneDatanodeManager {
470 region_server: datanode.region_server(),
471 flow_server: flownode.flow_engine(),
472 });
473
474 let table_id_sequence = Arc::new(
475 SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
476 .initial(MIN_USER_TABLE_ID as u64)
477 .step(10)
478 .build(),
479 );
480 let flow_id_sequence = Arc::new(
481 SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
482 .initial(MIN_USER_FLOW_ID as u64)
483 .step(10)
484 .build(),
485 );
486 let kafka_options = opts
487 .wal
488 .clone()
489 .try_into()
490 .context(error::InvalidWalProviderSnafu)?;
491 let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
492 .await
493 .context(error::BuildWalOptionsAllocatorSnafu)?;
494 let wal_options_allocator = Arc::new(wal_options_allocator);
495 let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
496 table_id_sequence,
497 wal_options_allocator.clone(),
498 ));
499 let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
500 flow_id_sequence,
501 ));
502
503 let ddl_context = DdlContext {
504 node_manager: node_manager.clone(),
505 cache_invalidator: layered_cache_registry.clone(),
506 memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
507 leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
508 table_metadata_manager: table_metadata_manager.clone(),
509 table_metadata_allocator: table_metadata_allocator.clone(),
510 flow_metadata_manager: flow_metadata_manager.clone(),
511 flow_metadata_allocator: flow_metadata_allocator.clone(),
512 region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
513 };
514
515 let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
516 .context(error::InitDdlManagerSnafu)?;
517 #[cfg(feature = "enterprise")]
518 let ddl_manager = {
519 let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
520 plugins.get();
521 ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
522 };
523
524 let procedure_executor = Arc::new(LocalProcedureExecutor::new(
525 Arc::new(ddl_manager),
526 procedure_manager.clone(),
527 ));
528
529 let fe_instance = FrontendBuilder::new(
530 fe_opts.clone(),
531 kv_backend.clone(),
532 layered_cache_registry.clone(),
533 catalog_manager.clone(),
534 node_manager.clone(),
535 procedure_executor.clone(),
536 process_manager,
537 )
538 .with_plugin(plugins.clone())
539 .try_build()
540 .await
541 .context(error::StartFrontendSnafu)?;
542 let fe_instance = Arc::new(fe_instance);
543
544 let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
546 let weak_grpc_handler = Arc::downgrade(&grpc_handler);
547 frontend_instance_handler
548 .lock()
549 .unwrap()
550 .replace(weak_grpc_handler);
551
552 let flow_streaming_engine = flownode.flow_engine().streaming_engine();
554 let invoker = FrontendInvoker::build_from(
556 flow_streaming_engine.clone(),
557 catalog_manager.clone(),
558 kv_backend.clone(),
559 layered_cache_registry.clone(),
560 procedure_executor,
561 node_manager,
562 )
563 .await
564 .context(StartFlownodeSnafu)?;
565 flow_streaming_engine.set_frontend_invoker(invoker).await;
566
567 let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
568 .build()
569 .context(error::StartFrontendSnafu)?;
570
571 let frontend = Frontend {
572 instance: fe_instance,
573 servers,
574 heartbeat_task: None,
575 };
576
577 #[cfg(feature = "enterprise")]
578 let components = Components {
579 plugins,
580 kv_backend,
581 frontend_client,
582 catalog_manager,
583 };
584
585 Ok(Instance {
586 datanode,
587 frontend,
588 flownode,
589 procedure_manager,
590 wal_options_allocator,
591 #[cfg(feature = "enterprise")]
592 components,
593 _guard: guard,
594 })
595 }
596
597 pub async fn create_table_metadata_manager(
598 kv_backend: KvBackendRef,
599 ) -> Result<TableMetadataManagerRef> {
600 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
601
602 table_metadata_manager
603 .init()
604 .await
605 .context(error::InitMetadataSnafu)?;
606
607 Ok(table_metadata_manager)
608 }
609}
610
611#[cfg(test)]
612mod tests {
613 use std::default::Default;
614 use std::io::Write;
615 use std::time::Duration;
616
617 use auth::{Identity, Password, UserProviderRef};
618 use common_base::readable_size::ReadableSize;
619 use common_config::ENV_VAR_SEP;
620 use common_test_util::temp_dir::create_named_temp_file;
621 use common_wal::config::DatanodeWalConfig;
622 use frontend::frontend::FrontendOptions;
623 use object_store::config::{FileConfig, GcsConfig};
624 use servers::grpc::GrpcOptions;
625
626 use super::*;
627 use crate::options::GlobalOptions;
628
629 #[tokio::test]
630 async fn test_try_from_start_command_to_anymap() {
631 let fe_opts = FrontendOptions {
632 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
633 ..Default::default()
634 };
635
636 let mut plugins = Plugins::new();
637 plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
638 .await
639 .unwrap();
640
641 let provider = plugins.get::<UserProviderRef>().unwrap();
642 let result = provider
643 .authenticate(
644 Identity::UserId("test", None),
645 Password::PlainText("test".to_string().into()),
646 )
647 .await;
648 let _ = result.unwrap();
649 }
650
651 #[test]
652 fn test_toml() {
653 let opts = StandaloneOptions::default();
654 let toml_string = toml::to_string(&opts).unwrap();
655 let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
656 }
657
658 #[test]
659 fn test_read_from_config_file() {
660 let mut file = create_named_temp_file();
661 let toml_str = r#"
662 enable_memory_catalog = true
663
664 [wal]
665 provider = "raft_engine"
666 dir = "./greptimedb_data/test/wal"
667 file_size = "1GB"
668 purge_threshold = "50GB"
669 purge_interval = "10m"
670 read_batch_size = 128
671 sync_write = false
672
673 [storage]
674 data_home = "./greptimedb_data/"
675 type = "File"
676
677 [[storage.providers]]
678 type = "Gcs"
679 bucket = "foo"
680 endpoint = "bar"
681
682 [[storage.providers]]
683 type = "S3"
684 access_key_id = "access_key_id"
685 secret_access_key = "secret_access_key"
686
687 [storage.compaction]
688 max_inflight_tasks = 3
689 max_files_in_level0 = 7
690 max_purge_tasks = 32
691
692 [storage.manifest]
693 checkpoint_margin = 9
694 gc_duration = '7s'
695
696 [http]
697 addr = "127.0.0.1:4000"
698 timeout = "33s"
699 body_limit = "128MB"
700
701 [opentsdb]
702 enable = true
703
704 [logging]
705 level = "debug"
706 dir = "./greptimedb_data/test/logs"
707 "#;
708 write!(file, "{}", toml_str).unwrap();
709 let cmd = StartCommand {
710 config_file: Some(file.path().to_str().unwrap().to_string()),
711 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
712 ..Default::default()
713 };
714
715 let options = cmd
716 .load_options(&GlobalOptions::default())
717 .unwrap()
718 .component;
719 let fe_opts = options.frontend_options();
720 let dn_opts = options.datanode_options();
721 let logging_opts = options.logging;
722 assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
723 assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
724 assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
725 assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
726 assert!(fe_opts.mysql.enable);
727 assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
728 assert_eq!(2, fe_opts.mysql.runtime_size);
729 assert_eq!(None, fe_opts.mysql.reject_no_database);
730 assert!(fe_opts.influxdb.enable);
731 assert!(fe_opts.opentsdb.enable);
732
733 let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
734 unreachable!()
735 };
736 assert_eq!(
737 "./greptimedb_data/test/wal",
738 raft_engine_config.dir.unwrap()
739 );
740
741 assert!(matches!(
742 &dn_opts.storage.store,
743 object_store::config::ObjectStoreConfig::File(FileConfig { .. })
744 ));
745 assert_eq!(dn_opts.storage.providers.len(), 2);
746 assert!(matches!(
747 dn_opts.storage.providers[0],
748 object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
749 ));
750 match &dn_opts.storage.providers[1] {
751 object_store::config::ObjectStoreConfig::S3(s3_config) => {
752 assert_eq!(
753 "SecretBox<alloc::string::String>([REDACTED])".to_string(),
754 format!("{:?}", s3_config.connection.access_key_id)
755 );
756 }
757 _ => {
758 unreachable!()
759 }
760 }
761
762 assert_eq!("debug", logging_opts.level.as_ref().unwrap());
763 assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
764 }
765
766 #[test]
767 fn test_load_log_options_from_cli() {
768 let cmd = StartCommand {
769 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
770 mysql_addr: Some("127.0.0.1:4002".to_string()),
771 postgres_addr: Some("127.0.0.1:4003".to_string()),
772 tls_watch: true,
773 ..Default::default()
774 };
775
776 let opts = cmd
777 .load_options(&GlobalOptions {
778 log_dir: Some("./greptimedb_data/test/logs".to_string()),
779 log_level: Some("debug".to_string()),
780
781 #[cfg(feature = "tokio-console")]
782 tokio_console_addr: None,
783 })
784 .unwrap()
785 .component;
786
787 assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
788 assert_eq!("debug", opts.logging.level.unwrap());
789 assert!(opts.mysql.tls.watch);
790 assert!(opts.postgres.tls.watch);
791 }
792
793 #[test]
794 fn test_config_precedence_order() {
795 let mut file = create_named_temp_file();
796 let toml_str = r#"
797 [http]
798 addr = "127.0.0.1:4000"
799
800 [logging]
801 level = "debug"
802 "#;
803 write!(file, "{}", toml_str).unwrap();
804
805 let env_prefix = "STANDALONE_UT";
806 temp_env::with_vars(
807 [
808 (
809 [
811 env_prefix.to_string(),
812 "logging".to_uppercase(),
813 "dir".to_uppercase(),
814 ]
815 .join(ENV_VAR_SEP),
816 Some("/other/log/dir"),
817 ),
818 (
819 [
821 env_prefix.to_string(),
822 "logging".to_uppercase(),
823 "level".to_uppercase(),
824 ]
825 .join(ENV_VAR_SEP),
826 Some("info"),
827 ),
828 (
829 [
831 env_prefix.to_string(),
832 "http".to_uppercase(),
833 "addr".to_uppercase(),
834 ]
835 .join(ENV_VAR_SEP),
836 Some("127.0.0.1:24000"),
837 ),
838 ],
839 || {
840 let command = StartCommand {
841 config_file: Some(file.path().to_str().unwrap().to_string()),
842 http_addr: Some("127.0.0.1:14000".to_string()),
843 env_prefix: env_prefix.to_string(),
844 ..Default::default()
845 };
846
847 let opts = command.load_options(&Default::default()).unwrap().component;
848
849 assert_eq!(opts.logging.dir, "/other/log/dir");
851
852 assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
854
855 let fe_opts = opts.frontend_options();
857 assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
858 assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);
859
860 assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
862 },
863 );
864 }
865
866 #[test]
867 fn test_load_default_standalone_options() {
868 let options =
869 StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
870 let default_options = StandaloneOptions::default();
871 assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
872 assert_eq!(options.http, default_options.http);
873 assert_eq!(options.grpc, default_options.grpc);
874 assert_eq!(options.mysql, default_options.mysql);
875 assert_eq!(options.postgres, default_options.postgres);
876 assert_eq!(options.opentsdb, default_options.opentsdb);
877 assert_eq!(options.influxdb, default_options.influxdb);
878 assert_eq!(options.prom_store, default_options.prom_store);
879 assert_eq!(options.wal, default_options.wal);
880 assert_eq!(options.metadata_store, default_options.metadata_store);
881 assert_eq!(options.procedure, default_options.procedure);
882 assert_eq!(options.logging, default_options.logging);
883 assert_eq!(options.region_engine, default_options.region_engine);
884 }
885
886 #[test]
887 fn test_cache_config() {
888 let toml_str = r#"
889 [storage]
890 data_home = "test_data_home"
891 type = "S3"
892 [storage.cache_config]
893 enable_read_cache = true
894 "#;
895 let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
896 opts.sanitize();
897 assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
898 assert_eq!(
899 opts.storage.store.cache_config().unwrap().cache_path,
900 "test_data_home"
901 );
902 }
903}