use std::fmt::Debug;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::{fs, path};

use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_schema::InformationExtensionRef;
use catalog::kvbackend::{CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder};
use catalog::process_manager::ProcessManager;
use clap::Parser;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{Configurable, metadata_store_dir};
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_provider::{WalProviderRef, build_wal_provider};
use common_procedure::ProcedureManagerRef;
use common_query::prelude::set_default_prefix;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, verbose_version};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use flow::{
    FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
    GrpcQueryHandlerWithBoxedError,
};
use frontend::frontend::Frontend;
use frontend::instance::StandaloneDatanodeManager;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use plugins::frontend::context::{
    CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext,
};
use plugins::standalone::context::DdlManagerConfigureContext;
use servers::tls::{TlsMode, TlsOption, merge_tls_option};
use snafu::ResultExt;
use standalone::options::StandaloneOptions;
use standalone::{StandaloneInformationExtension, StandaloneRepartitionProcedureFactory};
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};

pub const APP_NAME: &str = "greptime-standalone";

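/// Command line entry of GreptimeDB standalone mode.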
#[derive(Parser)]
pub struct Command {
    #[clap(subcommand)]
    subcmd: SubCommand,
}

impl Command {
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        self.subcmd.build(opts).await
    }

    pub fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        self.subcmd.load_options(global_options)
    }
}

#[derive(Parser)]
enum SubCommand {
    Start(StartCommand),
}

impl SubCommand {
    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        match self {
            SubCommand::Start(cmd) => cmd.build(opts).await,
        }
    }

    fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        match self {
            SubCommand::Start(cmd) => cmd.load_options(global_options),
        }
    }
}

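/// A standalone instance that bundles a datanode, a frontend and a flownode in a single process.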
pub struct Instance {
    datanode: Datanode,
    frontend: Frontend,
    flownode: FlownodeInstance,
    procedure_manager: ProcedureManagerRef,
    wal_provider: WalProviderRef,
    _guard: Vec<WorkerGuard>,
}

impl Instance {
    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
        self.frontend.server_handlers().addr(name)
    }
}

#[async_trait]
impl App for Instance {
    fn name(&self) -> &str {
        APP_NAME
    }

    async fn start(&mut self) -> Result<()> {
        self.datanode.start_telemetry();

        self.procedure_manager
            .start()
            .await
            .context(error::StartProcedureManagerSnafu)?;

        self.wal_provider
            .start()
            .await
            .context(error::StartWalProviderSnafu)?;

        plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
            .await
            .context(error::StartFrontendSnafu)?;

        self.frontend
            .start()
            .await
            .context(error::StartFrontendSnafu)?;

        self.flownode.start().await.context(StartFlownodeSnafu)?;

        Ok(())
    }

    async fn stop(&mut self) -> Result<()> {
        self.frontend
            .shutdown()
            .await
            .context(error::ShutdownFrontendSnafu)?;

        self.procedure_manager
            .stop()
            .await
            .context(error::StopProcedureManagerSnafu)?;

        self.datanode
            .shutdown()
            .await
            .context(error::ShutdownDatanodeSnafu)?;

        self.flownode
            .shutdown()
            .await
            .context(error::ShutdownFlownodeSnafu)?;

        info!("Standalone instance stopped.");

        Ok(())
    }
}

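/// Options of the standalone `start` subcommand; CLI flags override values loaded from the
/// configuration file.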
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    #[clap(long)]
    http_addr: Option<String>,
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    #[clap(long)]
    mysql_addr: Option<String>,
    #[clap(long)]
    postgres_addr: Option<String>,
    #[clap(short, long)]
    influxdb_enable: bool,
    #[clap(short, long)]
    pub config_file: Option<String>,
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    #[clap(long)]
    tls_watch: bool,
    #[clap(long)]
    user_provider: Option<String>,
    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
    pub env_prefix: String,
    #[clap(long)]
    data_home: Option<String>,
}

impl StartCommand {
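    /// Loads layered options from the configuration file and environment variables,
    /// then applies CLI overrides and sanitizes the result.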
    pub fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
            self.config_file.as_deref(),
            self.env_prefix.as_ref(),
        )
        .context(error::LoadLayeredConfigSnafu)?;

        self.merge_with_cli_options(global_options, &mut opts.component)?;
        opts.component.sanitize();

        Ok(opts)
    }

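    /// Merges command line flags and global options into the loaded [`StandaloneOptions`].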
    pub fn merge_with_cli_options(
        &self,
        global_options: &GlobalOptions,
        opts: &mut StandaloneOptions,
    ) -> Result<()> {
        if let Some(dir) = &global_options.log_dir {
            opts.logging.dir.clone_from(dir);
        }

        if global_options.log_level.is_some() {
            opts.logging.level.clone_from(&global_options.log_level);
        }

        opts.tracing = TracingOptions {
            #[cfg(feature = "tokio-console")]
            tokio_console_addr: global_options.tokio_console_addr.clone(),
        };

        let tls_opts = TlsOption::new(
            self.tls_mode,
            self.tls_cert_path.clone(),
            self.tls_key_path.clone(),
            self.tls_watch,
        );

        if let Some(addr) = &self.http_addr {
            opts.http.addr.clone_from(addr);
        }

        if let Some(data_home) = &self.data_home {
            opts.storage.data_home.clone_from(data_home);
        }

        if opts.logging.dir.is_empty() {
            opts.logging.dir = Path::new(&opts.storage.data_home)
                .join(DEFAULT_LOGGING_DIR)
                .to_string_lossy()
                .to_string();
        }

        if let Some(addr) = &self.rpc_bind_addr {
            let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
            if addr.eq(&datanode_grpc_addr) {
                return error::IllegalConfigSnafu {
                    msg: format!(
                        "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
                    ),
                }.fail();
            }
            opts.grpc.bind_addr.clone_from(addr);
            opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
        }

        if let Some(addr) = &self.mysql_addr {
            opts.mysql.enable = true;
            opts.mysql.addr.clone_from(addr);
            opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
        }

        if let Some(addr) = &self.postgres_addr {
            opts.postgres.enable = true;
            opts.postgres.addr.clone_from(addr);
            opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
        }

        if self.influxdb_enable {
            opts.influxdb.enable = self.influxdb_enable;
        }

        if let Some(user_provider) = &self.user_provider {
            opts.user_provider = Some(user_provider.clone());
        }

        Ok(())
    }

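    /// Builds a standalone [`Instance`], wiring up the metadata store, datanode,
    /// flownode and frontend.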
    #[allow(unreachable_code)]
    #[allow(unused_variables)]
    #[allow(clippy::diverging_sub_expression)]
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            None,
            Some(&opts.component.slow_query),
        );

        log_versions(verbose_version(), short_version(), APP_NAME);
        maybe_activate_heap_profile(&opts.component.memory);
        create_resource_limit_metrics(APP_NAME);

        info!("Standalone start command: {:#?}", self);
        info!("Standalone options: {opts:#?}");

        let mut plugins = Plugins::new();
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        set_default_prefix(opts.default_column_prefix.as_deref())
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;

        opts.grpc.detect_server_addr();
        let fe_opts = opts.frontend_options();
        let dn_opts = opts.datanode_options();

        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
            .await
            .context(error::StartFrontendSnafu)?;

        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
            .await
            .context(error::StartDatanodeSnafu)?;

        set_default_timezone(fe_opts.default_timezone.as_deref())
            .context(error::InitTimezoneSnafu)?;

        let data_home = &dn_opts.storage.data_home;
        fs::create_dir_all(path::Path::new(data_home))
            .context(error::CreateDirSnafu { dir: data_home })?;

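        // Build the metadata KV backend and the procedure manager on top of it.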
        let metadata_dir = metadata_store_dir(data_home);
        let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store)
            .context(error::BuildMetadataKvbackendSnafu)?;
        let procedure_manager =
            standalone::build_procedure_manager(kv_backend.clone(), opts.procedure);

        plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone())
            .await
            .context(error::SetupStandalonePluginsSnafu)?;

        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

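        // Build the datanode, sharing the KV backend and the layered cache registry.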
        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry.clone());
        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

        let information_extension = Arc::new(StandaloneInformationExtension::new(
            datanode.region_server(),
            procedure_manager.clone(),
        ));

        plugins.insert::<InformationExtensionRef>(information_extension.clone());

        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));

        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler(opts.query.clone());
        let frontend_client = Arc::new(frontend_client);

        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_procedure_manager(procedure_manager.clone())
        .with_process_manager(process_manager.clone());
        let builder = if let Some(configurator) =
            plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
        {
            let ctx = StandaloneCatalogManagerConfigureContext {
                fe_client: frontend_client.clone(),
            };
            let ctx = CatalogManagerConfigureContext::Standalone(ctx);
            configurator
                .configure(builder, ctx)
                .await
                .context(OtherSnafu)?
        } else {
            builder
        };
        let catalog_manager = builder.build();

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

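        // Build the flownode, which reaches the frontend through the shared frontend client.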
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flownode_options = FlownodeOptions {
            flow: opts.flow.clone(),
            ..Default::default()
        };

        let flow_builder = FlownodeBuilder::new(
            flownode_options,
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            frontend_client.clone(),
        );
        let flownode = flow_builder
            .build()
            .await
            .map_err(BoxedError::new)
            .context(error::OtherSnafu)?;

        {
            let flow_streaming_engine = flownode.flow_engine().streaming_engine();
            information_extension
                .set_flow_streaming_engine(flow_streaming_engine)
                .await;
        }

        let node_manager = Arc::new(StandaloneDatanodeManager {
            region_server: datanode.region_server(),
            flow_server: flownode.flow_engine(),
        });

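        // Sequences for table/flow id allocation and the WAL provider backing table metadata allocation.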
        let table_id_allocator = Arc::new(
            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_TABLE_ID as u64)
                .step(10)
                .build(),
        );
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        let kafka_options = opts
            .wal
            .clone()
            .try_into()
            .context(error::InvalidWalProviderSnafu)?;
        let wal_provider = build_wal_provider(&kafka_options, kv_backend.clone())
            .await
            .context(error::BuildWalProviderSnafu)?;
        let wal_provider = Arc::new(wal_provider);
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_allocator,
            wal_provider.clone(),
        ));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

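        // Assemble the DDL context and manager; DDL procedures are executed locally in standalone mode.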
        let ddl_context = DdlContext {
            node_manager: node_manager.clone(),
            cache_invalidator: layered_cache_registry.clone(),
            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            table_metadata_manager: table_metadata_manager.clone(),
            table_metadata_allocator: table_metadata_allocator.clone(),
            flow_metadata_manager: flow_metadata_manager.clone(),
            flow_metadata_allocator: flow_metadata_allocator.clone(),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        };

        let ddl_manager = DdlManager::try_new(
            ddl_context,
            procedure_manager.clone(),
            Arc::new(StandaloneRepartitionProcedureFactory),
            true,
        )
        .context(error::InitDdlManagerSnafu)?;

        let ddl_manager = if let Some(configurator) =
            plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()
        {
            let ctx = DdlManagerConfigureContext {
                kv_backend: kv_backend.clone(),
                fe_client: frontend_client.clone(),
                catalog_manager: catalog_manager.clone(),
            };
            configurator
                .configure(ddl_manager, ctx)
                .await
                .context(OtherSnafu)?
        } else {
            ddl_manager
        };

        let procedure_executor = Arc::new(LocalProcedureExecutor::new(
            Arc::new(ddl_manager),
            procedure_manager.clone(),
        ));

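        // Build the frontend instance and register it as the gRPC handler behind the flownode's frontend client.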
        let fe_instance = FrontendBuilder::new(
            fe_opts.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            procedure_executor.clone(),
            process_manager,
        )
        .with_plugin(plugins.clone())
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let fe_instance = Arc::new(fe_instance);

        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .set_handler(weak_grpc_handler)
            .await;

        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        let invoker = FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            procedure_executor,
            node_manager,
        )
        .await
        .context(StartFlownodeSnafu)?;
        flow_streaming_engine.set_frontend_invoker(invoker).await;

        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance: fe_instance,
            servers,
            heartbeat_task: None,
        };

        Ok(Instance {
            datanode,
            frontend,
            flownode,
            procedure_manager,
            wal_provider,
            _guard: guard,
        })
    }

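    /// Creates a [`TableMetadataManager`] on the given KV backend and initializes its metadata.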
    pub async fn create_table_metadata_manager(
        kv_backend: KvBackendRef,
    ) -> Result<TableMetadataManagerRef> {
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));

        table_metadata_manager
            .init()
            .await
            .context(error::InitMetadataSnafu)?;

        Ok(table_metadata_manager)
    }
}

#[cfg(test)]
mod tests {
    use std::default::Default;
    use std::io::Write;
    use std::time::Duration;

    use auth::{Identity, Password, UserProviderRef};
    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use common_wal::config::DatanodeWalConfig;
    use frontend::frontend::FrontendOptions;
    use object_store::config::{FileConfig, GcsConfig};
    use servers::grpc::GrpcOptions;

    use super::*;
    use crate::options::GlobalOptions;

    #[tokio::test]
    async fn test_try_from_start_command_to_anymap() {
        let fe_opts = FrontendOptions {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
            .await
            .unwrap();

        let provider = plugins.get::<UserProviderRef>().unwrap();
        let result = provider
            .authenticate(
                Identity::UserId("test", None),
                Password::PlainText("test".to_string().into()),
            )
            .await;
        let _ = result.unwrap();
    }

    #[test]
    fn test_toml() {
        let opts = StandaloneOptions::default();
        let toml_string = toml::to_string(&opts).unwrap();
        let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
    }

    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = true

            [wal]
            provider = "raft_engine"
            dir = "./greptimedb_data/test/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"

            [storage.compaction]
            max_inflight_tasks = 3
            max_files_in_level0 = 7
            max_purge_tasks = 32

            [storage.manifest]
            checkpoint_margin = 9
            gc_duration = '7s'

            [http]
            addr = "127.0.0.1:4000"
            timeout = "33s"
            body_limit = "128MB"

            [opentsdb]
            enable = true

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();
        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;
        let fe_opts = options.frontend_options();
        let dn_opts = options.datanode_options();
        let logging_opts = options.logging;
        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
        assert!(fe_opts.mysql.enable);
        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
        assert_eq!(2, fe_opts.mysql.runtime_size);
        assert_eq!(None, fe_opts.mysql.reject_no_database);
        assert!(fe_opts.influxdb.enable);
        assert!(fe_opts.opentsdb.enable);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
            unreachable!()
        };
        assert_eq!(
            "./greptimedb_data/test/wal",
            raft_engine_config.dir.unwrap()
        );

        assert!(matches!(
            &dn_opts.storage.store,
            object_store::config::ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(dn_opts.storage.providers.len(), 2);
        assert!(matches!(
            dn_opts.storage.providers[0],
            object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        match &dn_opts.storage.providers[1] {
            object_store::config::ObjectStoreConfig::S3(s3_config) => {
                assert_eq!(
                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
                    format!("{:?}", s3_config.connection.access_key_id)
                );
            }
            _ => {
                unreachable!()
            }
        }

        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
    }

    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            mysql_addr: Some("127.0.0.1:4002".to_string()),
            postgres_addr: Some("127.0.0.1:4003".to_string()),
            ..Default::default()
        };

        let opts = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
        assert_eq!("debug", opts.logging.level.unwrap());
    }

    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"

            [logging]
            level = "debug"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "STANDALONE_UT";
        temp_env::with_vars(
            [
                (
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "dir".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("/other/log/dir"),
                ),
                (
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "level".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("info"),
                ),
                (
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // `logging.dir` is only set via the environment variable.
                assert_eq!(opts.logging.dir, "/other/log/dir");

                // The config file value wins over the environment variable.
                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");

                let fe_opts = opts.frontend_options();
                // The CLI flag wins over both the config file and the environment variable.
                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);

                // Values not set anywhere fall back to the defaults.
                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
            },
        );
    }

    #[test]
    fn test_load_default_standalone_options() {
        let options =
            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
        let default_options = StandaloneOptions::default();
        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
        assert_eq!(options.http, default_options.http);
        assert_eq!(options.grpc, default_options.grpc);
        assert_eq!(options.mysql, default_options.mysql);
        assert_eq!(options.postgres, default_options.postgres);
        assert_eq!(options.opentsdb, default_options.opentsdb);
        assert_eq!(options.influxdb, default_options.influxdb);
        assert_eq!(options.prom_store, default_options.prom_store);
        assert_eq!(options.wal, default_options.wal);
        assert_eq!(options.metadata_store, default_options.metadata_store);
        assert_eq!(options.procedure, default_options.procedure);
        assert_eq!(options.logging, default_options.logging);
        assert_eq!(options.region_engine, default_options.region_engine);
    }

    #[test]
    fn test_cache_config() {
        let toml_str = r#"
            [storage]
            data_home = "test_data_home"
            type = "S3"
            [storage.cache_config]
            enable_read_cache = true
        "#;
        let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
        opts.sanitize();
        assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
        assert_eq!(
            opts.storage.store.cache_config().unwrap().cache_path,
            "test_data_home"
        );
    }
}