1use std::fmt::Debug;
16use std::net::SocketAddr;
17use std::path::Path;
18use std::sync::Arc;
19use std::{fs, path};
20
21use async_trait::async_trait;
22use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
23use catalog::information_schema::InformationExtensionRef;
24use catalog::kvbackend::{CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder};
25use catalog::process_manager::ProcessManager;
26use clap::Parser;
27use common_base::Plugins;
28use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
29use common_config::{Configurable, metadata_store_dir};
30use common_error::ext::BoxedError;
31use common_meta::cache::LayeredCacheRegistryBuilder;
32use common_meta::ddl::flow_meta::FlowMetadataAllocator;
33use common_meta::ddl::table_meta::TableMetadataAllocator;
34use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
35use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerRef};
36use common_meta::key::flow::FlowMetadataManager;
37use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
38use common_meta::kv_backend::KvBackendRef;
39use common_meta::node_manager::{FlownodeRef, NodeManagerRef};
40use common_meta::procedure_executor::{LocalProcedureExecutor, ProcedureExecutorRef};
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::{Sequence, SequenceBuilder};
44use common_meta::wal_provider::{WalProviderRef, build_wal_provider};
45use common_procedure::ProcedureManagerRef;
46use common_query::prelude::set_default_prefix;
47use common_telemetry::info;
48use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
49use common_time::timezone::set_default_timezone;
50use common_version::{short_version, verbose_version};
51use datanode::config::DatanodeOptions;
52use datanode::datanode::{Datanode, DatanodeBuilder};
53use datanode::region_server::RegionServer;
54use flow::{
55 FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
56 GrpcQueryHandlerWithBoxedError,
57};
58use frontend::frontend::Frontend;
59use frontend::instance::StandaloneDatanodeManager;
60use frontend::instance::builder::FrontendBuilder;
61use frontend::server::Services;
62use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
63use plugins::PluginOptions;
64use plugins::frontend::context::{
65 CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext,
66};
67use plugins::standalone::context::DdlManagerConfigureContext;
68use servers::tls::{TlsMode, TlsOption, merge_tls_option};
69use snafu::ResultExt;
70use standalone::options::StandaloneOptions;
71use standalone::{StandaloneInformationExtension, StandaloneRepartitionProcedureFactory};
72use tracing_appender::non_blocking::WorkerGuard;
73
74use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};
75use crate::options::{GlobalOptions, GreptimeOptions};
76use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
77
/// Application name used for logging, metrics, and version reporting.
pub const APP_NAME: &str = "greptime-standalone";
79
// Top-level CLI entry for running GreptimeDB in standalone mode.
// NOTE: plain `//` comments are used on purpose — `///` doc comments on a
// `clap::Parser` type/field become user-visible CLI help text.
#[derive(Parser)]
pub struct Command {
    // The selected subcommand (currently only `start`).
    #[clap(subcommand)]
    subcmd: SubCommand,
}
85
impl Command {
    /// Builds a standalone [`Instance`] from the resolved options by
    /// delegating to the selected subcommand.
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        self.subcmd.build(opts).await
    }

    /// Loads layered configuration (config file + environment + CLI flags)
    /// for the selected subcommand, applying `global_options` on top.
    pub fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        self.subcmd.load_options(global_options)
    }
}
98
// Subcommands of the standalone binary. `//` comments keep clap's generated
// help output unchanged.
#[derive(Parser)]
enum SubCommand {
    // Starts datanode, frontend and flownode inside a single process.
    Start(StartCommand),
}
103
impl SubCommand {
    /// Dispatches instance construction to the concrete subcommand.
    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        match self {
            SubCommand::Start(cmd) => cmd.build(opts).await,
        }
    }

    /// Dispatches option loading to the concrete subcommand.
    fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        match self {
            SubCommand::Start(cmd) => cmd.load_options(global_options),
        }
    }
}
120
/// A fully assembled standalone deployment: datanode, frontend and flownode
/// running in one process and sharing a single metadata store.
pub struct Instance {
    datanode: Datanode,
    frontend: Frontend,
    flownode: FlownodeInstance,
    // Drives DDL procedures; started/stopped with the instance lifecycle.
    procedure_manager: ProcedureManagerRef,
    wal_provider: WalProviderRef,
    // Keeps non-blocking log appender worker threads alive for the lifetime
    // of the instance.
    _guard: Vec<WorkerGuard>,
}
130
impl Instance {
    /// Returns the bound address of the named frontend server, or `None` if
    /// no server with that name is registered.
    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
        self.frontend.server_handlers().addr(name)
    }

    /// Mutable access to the embedded frontend.
    pub fn mut_frontend(&mut self) -> &mut Frontend {
        &mut self.frontend
    }

    /// Shared access to the embedded datanode.
    pub fn datanode(&self) -> &Datanode {
        &self.datanode
    }
}
149
150#[async_trait]
151impl App for Instance {
152 fn name(&self) -> &str {
153 APP_NAME
154 }
155
156 async fn start(&mut self) -> Result<()> {
157 self.datanode.start_telemetry();
158
159 self.procedure_manager
160 .start()
161 .await
162 .context(error::StartProcedureManagerSnafu)?;
163
164 self.wal_provider
165 .start()
166 .await
167 .context(error::StartWalProviderSnafu)?;
168
169 plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
170 .await
171 .context(error::StartFrontendSnafu)?;
172
173 self.frontend
174 .start()
175 .await
176 .context(error::StartFrontendSnafu)?;
177
178 self.flownode.start().await.context(StartFlownodeSnafu)?;
179
180 Ok(())
181 }
182
183 async fn stop(&mut self) -> Result<()> {
184 self.frontend
185 .shutdown()
186 .await
187 .context(error::ShutdownFrontendSnafu)?;
188
189 self.procedure_manager
190 .stop()
191 .await
192 .context(error::StopProcedureManagerSnafu)?;
193
194 self.datanode
195 .shutdown()
196 .await
197 .context(error::ShutdownDatanodeSnafu)?;
198
199 self.flownode
200 .shutdown()
201 .await
202 .context(error::ShutdownFlownodeSnafu)?;
203
204 info!("Datanode instance stopped.");
205
206 Ok(())
207 }
208}
209
// CLI flags for `standalone start`. Plain `//` comments are used so clap's
// generated help text stays unchanged (`///` would become help text).
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    #[clap(long)]
    http_addr: Option<String>,
    // `--rpc-addr` kept as an alias for backward compatibility.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    #[clap(long)]
    mysql_addr: Option<String>,
    #[clap(long)]
    postgres_addr: Option<String>,
    #[clap(short, long)]
    influxdb_enable: bool,
    // Path to a TOML configuration file, layered with env vars and CLI flags.
    #[clap(short, long)]
    pub config_file: Option<String>,
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    #[clap(long)]
    tls_watch: bool,
    #[clap(long)]
    user_provider: Option<String>,
    // Prefix for environment-variable configuration overrides.
    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
    pub env_prefix: String,
    // Root directory for all persisted data.
    #[clap(long)]
    data_home: Option<String>,
}
240
241impl StartCommand {
    /// Loads options from the config file and environment variables
    /// (layered), overlays CLI flags via `merge_with_cli_options`, then
    /// sanitizes the result.
    pub fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
            self.config_file.as_deref(),
            self.env_prefix.as_ref(),
        )
        .context(error::LoadLayeredConfigSnafu)?;

        self.merge_with_cli_options(global_options, &mut opts.component)?;
        opts.component.sanitize();

        Ok(opts)
    }
258
259 pub fn merge_with_cli_options(
261 &self,
262 global_options: &GlobalOptions,
263 opts: &mut StandaloneOptions,
264 ) -> Result<()> {
265 if let Some(dir) = &global_options.log_dir {
266 opts.logging.dir.clone_from(dir);
267 }
268
269 if global_options.log_level.is_some() {
270 opts.logging.level.clone_from(&global_options.log_level);
271 }
272
273 opts.tracing = TracingOptions {
274 #[cfg(feature = "tokio-console")]
275 tokio_console_addr: global_options.tokio_console_addr.clone(),
276 };
277
278 let tls_opts = TlsOption::new(
279 self.tls_mode,
280 self.tls_cert_path.clone(),
281 self.tls_key_path.clone(),
282 self.tls_watch,
283 );
284
285 if let Some(addr) = &self.http_addr {
286 opts.http.addr.clone_from(addr);
287 }
288
289 if let Some(data_home) = &self.data_home {
290 opts.storage.data_home.clone_from(data_home);
291 }
292
293 if opts.logging.dir.is_empty() {
295 opts.logging.dir = Path::new(&opts.storage.data_home)
296 .join(DEFAULT_LOGGING_DIR)
297 .to_string_lossy()
298 .to_string();
299 }
300
301 if let Some(addr) = &self.rpc_bind_addr {
302 let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
304 if addr.eq(&datanode_grpc_addr) {
305 return error::IllegalConfigSnafu {
306 msg: format!(
307 "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
308 ),
309 }.fail();
310 }
311 opts.grpc.bind_addr.clone_from(addr);
312 opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
313 }
314
315 if let Some(addr) = &self.mysql_addr {
316 opts.mysql.enable = true;
317 opts.mysql.addr.clone_from(addr);
318 opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
319 }
320
321 if let Some(addr) = &self.postgres_addr {
322 opts.postgres.enable = true;
323 opts.postgres.addr.clone_from(addr);
324 opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
325 }
326
327 if self.influxdb_enable {
328 opts.influxdb.enable = self.influxdb_enable;
329 }
330
331 if let Some(user_provider) = &self.user_provider {
332 opts.user_provider = Some(user_provider.clone());
333 }
334
335 Ok(())
336 }
337
    /// Initializes process-wide state (runtimes, logging, metrics), logs the
    /// effective configuration, then builds the standalone [`Instance`] with
    /// the default component creators.
    #[allow(unreachable_code)]
    #[allow(unused_variables)]
    #[allow(clippy::diverging_sub_expression)]
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        // The returned guards must outlive the instance: dropping them shuts
        // down the non-blocking log appender workers.
        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            None,
            Some(&opts.component.slow_query),
        );

        log_versions(verbose_version(), short_version(), APP_NAME);
        maybe_activate_heap_profile(&opts.component.memory);
        create_resource_limit_metrics(APP_NAME);

        info!("Standalone start command: {:#?}", self);
        info!("Standalone options: {opts:#?}");

        let (mut instance, _) =
            Self::build_with(opts.component, opts.plugins, InstanceCreator::default()).await?;
        // Attach the logging guards so they live as long as the instance.
        instance._guard.extend(guard);
        Ok(instance)
    }
365
    /// Assembles the standalone [`Instance`] — datanode, frontend and
    /// flownode wired together over one metadata kv-backend — using the
    /// injectable component factories in `creator`.
    ///
    /// Also returns an [`InstanceCreatorResult`] exposing shared internals
    /// (kv-backend, node manager, table-id allocator) to the caller.
    ///
    /// The construction order below is significant: plugins are set up before
    /// the components that consume them, the datanode exists before anything
    /// that routes to its region server, and the frontend gRPC handler is
    /// installed into the (initially empty) `FrontendClient` only after the
    /// frontend instance is built.
    pub async fn build_with(
        mut opts: StandaloneOptions,
        plugin_opts: Vec<PluginOptions>,
        creator: InstanceCreator,
    ) -> Result<(Instance, InstanceCreatorResult)> {
        let mut plugins = Plugins::new();
        set_default_prefix(opts.default_column_prefix.as_deref())
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;

        opts.grpc.detect_server_addr();
        // Derive per-role option views from the standalone options.
        let fe_opts = opts.frontend_options();
        let dn_opts = opts.datanode_options();

        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
            .await
            .context(error::StartFrontendSnafu)?;

        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
            .await
            .context(error::StartDatanodeSnafu)?;

        set_default_timezone(fe_opts.default_timezone.as_deref())
            .context(error::InitTimezoneSnafu)?;

        // Ensure the data directory exists before opening any store under it.
        let data_home = &dn_opts.storage.data_home;
        fs::create_dir_all(path::Path::new(data_home))
            .context(error::CreateDirSnafu { dir: data_home })?;

        // Single-node metadata store + procedure manager backed by it.
        let metadata_dir = metadata_store_dir(data_home);
        let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store)
            .context(error::BuildMetadataKvbackendSnafu)?;
        let procedure_manager =
            standalone::build_procedure_manager(kv_backend.clone(), opts.procedure);

        plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone())
            .await
            .context(error::SetupStandalonePluginsSnafu)?;

        // Layered cache registry shared by datanode, catalog and DDL layers.
        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry.clone());
        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

        let information_extension = Arc::new(StandaloneInformationExtension::new(
            datanode.region_server(),
            procedure_manager.clone(),
        ));

        plugins.insert::<InformationExtensionRef>(information_extension.clone());

        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));

        // The client starts with an empty gRPC handler; the real frontend
        // handler is installed further below once the frontend is built.
        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler(opts.query.clone());
        let frontend_client = Arc::new(frontend_client);

        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_procedure_manager(procedure_manager.clone())
        .with_process_manager(process_manager.clone());
        // Let an optional plugin configurator customize the catalog manager.
        let builder = if let Some(configurator) =
            plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
        {
            let ctx = StandaloneCatalogManagerConfigureContext {
                fe_client: frontend_client.clone(),
            };
            let ctx = CatalogManagerConfigureContext::Standalone(ctx);
            configurator
                .configure(builder, ctx)
                .await
                .context(OtherSnafu)?
        } else {
            builder
        };
        let catalog_manager = builder.build();

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flownode_options = FlownodeOptions {
            flow: opts.flow.clone(),
            ..Default::default()
        };

        let flow_builder = FlownodeBuilder::new(
            flownode_options,
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            frontend_client.clone(),
        );
        let flownode = flow_builder
            .build()
            .await
            .map_err(BoxedError::new)
            .context(error::OtherSnafu)?;

        // Expose the flow engine through the information schema extension.
        {
            information_extension
                .set_flow_engine(flownode.flow_engine())
                .await;
        }

        let node_manager = creator
            .node_manager_creator
            .create(
                &kv_backend,
                datanode.region_server(),
                flownode.flow_engine(),
            )
            .await?;

        let table_id_allocator = creator.table_id_allocator_creator.create(&kv_backend);
        // Flow ids are allocated from a kv-backed sequence starting at the
        // first user-visible flow id, reserving ids in steps of 10.
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        let kafka_options = opts
            .wal
            .clone()
            .try_into()
            .context(error::InvalidWalProviderSnafu)?;
        let wal_provider = build_wal_provider(&kafka_options, kv_backend.clone())
            .await
            .context(error::BuildWalProviderSnafu)?;
        let wal_provider = Arc::new(wal_provider);
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_allocator.clone(),
            wal_provider.clone(),
        ));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

        // Standalone has no region failover, hence the noop failure detector.
        let ddl_context = DdlContext {
            node_manager: node_manager.clone(),
            cache_invalidator: layered_cache_registry.clone(),
            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            table_metadata_manager: table_metadata_manager.clone(),
            table_metadata_allocator: table_metadata_allocator.clone(),
            flow_metadata_manager: flow_metadata_manager.clone(),
            flow_metadata_allocator: flow_metadata_allocator.clone(),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        };

        let ddl_manager = DdlManager::try_new(
            ddl_context,
            procedure_manager.clone(),
            Arc::new(StandaloneRepartitionProcedureFactory),
            true,
        )
        .context(error::InitDdlManagerSnafu)?;

        // Let an optional plugin configurator customize the DDL manager.
        let ddl_manager = if let Some(configurator) =
            plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()
        {
            let ctx = DdlManagerConfigureContext {
                kv_backend: kv_backend.clone(),
                fe_client: frontend_client.clone(),
                catalog_manager: catalog_manager.clone(),
            };
            configurator
                .configure(ddl_manager, ctx)
                .await
                .context(OtherSnafu)?
        } else {
            ddl_manager
        };

        let procedure_executor = creator
            .procedure_executor_creator
            .create(Arc::new(ddl_manager), procedure_manager.clone())
            .await?;

        let fe_instance = FrontendBuilder::new(
            fe_opts.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            procedure_executor.clone(),
            process_manager,
        )
        .with_plugin(plugins.clone())
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let fe_instance = Arc::new(fe_instance);

        // Now that the frontend exists, install its gRPC handler into the
        // previously-empty FrontendClient. A weak reference is used to avoid
        // a reference cycle between frontend and flownode.
        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .set_handler(weak_grpc_handler)
            .await;

        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        let invoker = FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            procedure_executor,
            node_manager.clone(),
        )
        .await
        .context(StartFlownodeSnafu)?;
        flow_streaming_engine.set_frontend_invoker(invoker).await;

        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance: fe_instance,
            servers,
            heartbeat_task: None,
        };

        let instance = Instance {
            datanode,
            frontend,
            flownode,
            procedure_manager,
            wal_provider,
            _guard: vec![],
        };
        let result = InstanceCreatorResult {
            kv_backend,
            node_manager,
            table_id_allocator,
        };
        Ok((instance, result))
    }
625
626 pub async fn create_table_metadata_manager(
627 kv_backend: KvBackendRef,
628 ) -> Result<TableMetadataManagerRef> {
629 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
630
631 table_metadata_manager
632 .init()
633 .await
634 .context(error::InitMetadataSnafu)?;
635
636 Ok(table_metadata_manager)
637 }
638}
639
/// Factory for the [`NodeManagerRef`] used to route region and flow requests.
/// Injectable via [`InstanceCreator`] so embedders can substitute their own
/// node management.
#[async_trait]
pub trait NodeManagerCreator {
    async fn create(
        &self,
        kv_backend: &KvBackendRef,
        region_server: RegionServer,
        flow_server: FlownodeRef,
    ) -> Result<NodeManagerRef>;
}
649
/// Default [`NodeManagerCreator`]: routes requests directly to the
/// in-process datanode region server and flownode.
pub struct DefaultNodeManagerCreator;

#[async_trait]
impl NodeManagerCreator for DefaultNodeManagerCreator {
    async fn create(
        &self,
        // The kv-backend is unused by the in-process default.
        _: &KvBackendRef,
        region_server: RegionServer,
        flow_server: FlownodeRef,
    ) -> Result<NodeManagerRef> {
        Ok(Arc::new(StandaloneDatanodeManager {
            region_server,
            flow_server,
        }))
    }
}
666
/// Factory for the sequence that allocates table ids.
pub trait TableIdAllocatorCreator {
    fn create(&self, kv_backend: &KvBackendRef) -> Arc<Sequence>;
}
670
671struct DefaultTableIdAllocatorCreator;
672
673impl TableIdAllocatorCreator for DefaultTableIdAllocatorCreator {
674 fn create(&self, kv_backend: &KvBackendRef) -> Arc<Sequence> {
675 Arc::new(
676 SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
677 .initial(MIN_USER_TABLE_ID as u64)
678 .step(10)
679 .build(),
680 )
681 }
682}
683
/// Factory for the [`ProcedureExecutorRef`] that executes DDL procedures.
#[async_trait]
pub trait ProcedureExecutorCreator {
    async fn create(
        &self,
        ddl_manager: DdlManagerRef,
        procedure_manager: ProcedureManagerRef,
    ) -> Result<ProcedureExecutorRef>;
}
692
/// Default [`ProcedureExecutorCreator`]: executes procedures locally within
/// the standalone process.
pub struct DefaultProcedureExecutorCreator;

#[async_trait]
impl ProcedureExecutorCreator for DefaultProcedureExecutorCreator {
    async fn create(
        &self,
        ddl_manager: DdlManagerRef,
        procedure_manager: ProcedureManagerRef,
    ) -> Result<ProcedureExecutorRef> {
        Ok(Arc::new(LocalProcedureExecutor::new(
            ddl_manager,
            procedure_manager,
        )))
    }
}
708
/// Bundle of injectable component factories used by
/// [`StartCommand::build_with`] to customize instance construction.
pub struct InstanceCreator {
    node_manager_creator: Box<dyn NodeManagerCreator>,
    table_id_allocator_creator: Box<dyn TableIdAllocatorCreator>,
    procedure_executor_creator: Box<dyn ProcedureExecutorCreator>,
}
716
impl InstanceCreator {
    /// Creates an [`InstanceCreator`] from explicit component factories.
    pub fn new(
        node_manager_creator: Box<dyn NodeManagerCreator>,
        table_id_allocator_creator: Box<dyn TableIdAllocatorCreator>,
        procedure_executor_creator: Box<dyn ProcedureExecutorCreator>,
    ) -> Self {
        Self {
            node_manager_creator,
            table_id_allocator_creator,
            procedure_executor_creator,
        }
    }
}
730
impl Default for InstanceCreator {
    /// Uses the in-process default factories suitable for standalone mode.
    fn default() -> Self {
        Self {
            node_manager_creator: Box::new(DefaultNodeManagerCreator),
            table_id_allocator_creator: Box::new(DefaultTableIdAllocatorCreator),
            procedure_executor_creator: Box::new(DefaultProcedureExecutorCreator),
        }
    }
}
740
/// Shared internals produced while building an [`Instance`], exposed to
/// callers of [`StartCommand::build_with`].
pub struct InstanceCreatorResult {
    pub kv_backend: KvBackendRef,
    pub node_manager: NodeManagerRef,
    pub table_id_allocator: Arc<Sequence>,
}
748
749#[cfg(test)]
750mod tests {
751 use std::default::Default;
752 use std::io::Write;
753 use std::time::Duration;
754
755 use auth::{Identity, Password, UserProviderRef};
756 use common_base::readable_size::ReadableSize;
757 use common_config::ENV_VAR_SEP;
758 use common_test_util::temp_dir::create_named_temp_file;
759 use common_wal::config::DatanodeWalConfig;
760 use frontend::frontend::FrontendOptions;
761 use object_store::config::{FileConfig, GcsConfig};
762 use servers::grpc::GrpcOptions;
763
764 use super::*;
765 use crate::options::GlobalOptions;
766
    /// Verifies that `setup_frontend_plugins` installs a static user provider
    /// from frontend options and that it authenticates the configured user.
    #[tokio::test]
    async fn test_try_from_start_command_to_anymap() {
        let fe_opts = FrontendOptions {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
            .await
            .unwrap();

        let provider = plugins.get::<UserProviderRef>().unwrap();
        let result = provider
            .authenticate(
                Identity::UserId("test", None),
                Password::PlainText("test".to_string().into()),
            )
            .await;
        let _ = result.unwrap();
    }
788
789 #[test]
790 fn test_toml() {
791 let opts = StandaloneOptions::default();
792 let toml_string = toml::to_string(&opts).unwrap();
793 let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
794 }
795
    /// Loads a full TOML config file and checks that frontend, datanode and
    /// logging options are populated from it, with secrets redacted in
    /// `Debug` output.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = true

            [wal]
            provider = "raft_engine"
            dir = "./greptimedb_data/test/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"

            [storage.compaction]
            max_inflight_tasks = 3
            max_files_in_level0 = 7
            max_purge_tasks = 32

            [storage.manifest]
            checkpoint_margin = 9
            gc_duration = '7s'

            [http]
            addr = "127.0.0.1:4000"
            timeout = "33s"
            body_limit = "128MB"

            [opentsdb]
            enable = true

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();
        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;
        let fe_opts = options.frontend_options();
        let dn_opts = options.datanode_options();
        let logging_opts = options.logging;
        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
        // Values not set in the file fall back to defaults.
        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
        assert!(fe_opts.mysql.enable);
        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
        assert_eq!(2, fe_opts.mysql.runtime_size);
        assert_eq!(None, fe_opts.mysql.reject_no_database);
        assert!(fe_opts.influxdb.enable);
        assert!(fe_opts.opentsdb.enable);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
            unreachable!()
        };
        assert_eq!(
            "./greptimedb_data/test/wal",
            raft_engine_config.dir.unwrap()
        );

        assert!(matches!(
            &dn_opts.storage.store,
            object_store::config::ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(dn_opts.storage.providers.len(), 2);
        assert!(matches!(
            dn_opts.storage.providers[0],
            object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        match &dn_opts.storage.providers[1] {
            object_store::config::ObjectStoreConfig::S3(s3_config) => {
                // Secrets must not leak through Debug formatting.
                assert_eq!(
                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
                    format!("{:?}", s3_config.connection.access_key_id)
                );
            }
            _ => {
                unreachable!()
            }
        }

        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
    }
903
    /// Checks that the global `--log-dir` / `--log-level` options override
    /// the logging defaults.
    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            mysql_addr: Some("127.0.0.1:4002".to_string()),
            postgres_addr: Some("127.0.0.1:4003".to_string()),
            ..Default::default()
        };

        let opts = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
        assert_eq!("debug", opts.logging.level.unwrap());
    }
927
    /// Verifies configuration layering precedence: CLI flags win over the
    /// config file, which wins over environment variables; env vars only fill
    /// values absent from both.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"

            [logging]
            level = "debug"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "STANDALONE_UT";
        temp_env::with_vars(
            [
                (
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "dir".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("/other/log/dir"),
                ),
                (
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "level".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("info"),
                ),
                (
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Only the env var sets the log dir.
                assert_eq!(opts.logging.dir, "/other/log/dir");

                // The file value ("debug") wins over the env var ("info").
                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");

                let fe_opts = opts.frontend_options();
                // The CLI flag wins over both the file and the env var.
                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);

                // Untouched values fall back to defaults.
                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
            },
        );
    }
1000
    /// Loading with no config file and no env overrides must reproduce the
    /// defaults field-by-field.
    #[test]
    fn test_load_default_standalone_options() {
        let options =
            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
        let default_options = StandaloneOptions::default();
        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
        assert_eq!(options.http, default_options.http);
        assert_eq!(options.grpc, default_options.grpc);
        assert_eq!(options.mysql, default_options.mysql);
        assert_eq!(options.postgres, default_options.postgres);
        assert_eq!(options.opentsdb, default_options.opentsdb);
        assert_eq!(options.influxdb, default_options.influxdb);
        assert_eq!(options.prom_store, default_options.prom_store);
        assert_eq!(options.wal, default_options.wal);
        assert_eq!(options.metadata_store, default_options.metadata_store);
        assert_eq!(options.procedure, default_options.procedure);
        assert_eq!(options.logging, default_options.logging);
        assert_eq!(options.region_engine, default_options.region_engine);
    }
1020
    /// After `sanitize`, the read cache is enabled as configured and its
    /// cache path is populated from the storage `data_home`.
    #[test]
    fn test_cache_config() {
        let toml_str = r#"
            [storage]
            data_home = "test_data_home"
            type = "S3"
            [storage.cache_config]
            enable_read_cache = true
        "#;
        let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
        opts.sanitize();
        assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
        assert_eq!(
            opts.storage.store.cache_config().unwrap().cache_path,
            "test_data_home"
        );
    }
1038}