1use std::net::SocketAddr;
16use std::path::Path;
17use std::sync::Arc;
18use std::{fs, path};
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::information_schema::InformationExtension;
23use catalog::kvbackend::KvBackendCatalogManagerBuilder;
24use catalog::process_manager::ProcessManager;
25use clap::Parser;
26use client::api::v1::meta::RegionRole;
27use common_base::readable_size::ReadableSize;
28use common_base::Plugins;
29use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
30use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
31use common_error::ext::BoxedError;
32use common_meta::cache::LayeredCacheRegistryBuilder;
33use common_meta::cluster::{NodeInfo, NodeStatus};
34use common_meta::datanode::RegionStat;
35use common_meta::ddl::flow_meta::FlowMetadataAllocator;
36use common_meta::ddl::table_meta::TableMetadataAllocator;
37use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
38use common_meta::ddl_manager::DdlManager;
39use common_meta::key::flow::flow_state::FlowStat;
40use common_meta::key::flow::FlowMetadataManager;
41use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
42use common_meta::kv_backend::KvBackendRef;
43use common_meta::peer::Peer;
44use common_meta::procedure_executor::LocalProcedureExecutor;
45use common_meta::region_keeper::MemoryRegionKeeper;
46use common_meta::region_registry::LeaderRegionRegistry;
47use common_meta::sequence::SequenceBuilder;
48use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
49use common_options::memory::MemoryOptions;
50use common_procedure::{ProcedureInfo, ProcedureManagerRef};
51use common_telemetry::info;
52use common_telemetry::logging::{
53 LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
54};
55use common_time::timezone::set_default_timezone;
56use common_version::{short_version, verbose_version};
57use common_wal::config::DatanodeWalConfig;
58use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
59use datanode::datanode::{Datanode, DatanodeBuilder};
60use datanode::region_server::RegionServer;
61use file_engine::config::EngineConfig as FileEngineConfig;
62use flow::{
63 FlowConfig, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient,
64 FrontendInvoker, GrpcQueryHandlerWithBoxedError, StreamingEngine,
65};
66use frontend::frontend::{Frontend, FrontendOptions};
67use frontend::instance::builder::FrontendBuilder;
68use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager};
69use frontend::server::Services;
70use frontend::service_config::{
71 InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions,
72 PromStoreOptions,
73};
74use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
75use mito2::config::MitoConfig;
76use query::options::QueryOptions;
77use serde::{Deserialize, Serialize};
78use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
79use servers::grpc::GrpcOptions;
80use servers::http::HttpOptions;
81use servers::tls::{TlsMode, TlsOption};
82use snafu::ResultExt;
83use tokio::sync::RwLock;
84use tracing_appender::non_blocking::WorkerGuard;
85
86use crate::error::{Result, StartFlownodeSnafu};
87use crate::options::{GlobalOptions, GreptimeOptions};
88use crate::{create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile, App};
89
90pub const APP_NAME: &str = "greptime-standalone";
91
92#[derive(Parser)]
93pub struct Command {
94 #[clap(subcommand)]
95 subcmd: SubCommand,
96}
97
98impl Command {
99 pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
100 self.subcmd.build(opts).await
101 }
102
103 pub fn load_options(
104 &self,
105 global_options: &GlobalOptions,
106 ) -> Result<GreptimeOptions<StandaloneOptions>> {
107 self.subcmd.load_options(global_options)
108 }
109}
110
111#[derive(Parser)]
112enum SubCommand {
113 Start(StartCommand),
114}
115
116impl SubCommand {
117 async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
118 match self {
119 SubCommand::Start(cmd) => cmd.build(opts).await,
120 }
121 }
122
123 fn load_options(
124 &self,
125 global_options: &GlobalOptions,
126 ) -> Result<GreptimeOptions<StandaloneOptions>> {
127 match self {
128 SubCommand::Start(cmd) => cmd.load_options(global_options),
129 }
130 }
131}
132
133#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
134#[serde(default)]
135pub struct StandaloneOptions {
136 pub enable_telemetry: bool,
137 pub default_timezone: Option<String>,
138 pub http: HttpOptions,
139 pub grpc: GrpcOptions,
140 pub mysql: MysqlOptions,
141 pub postgres: PostgresOptions,
142 pub opentsdb: OpentsdbOptions,
143 pub influxdb: InfluxdbOptions,
144 pub jaeger: JaegerOptions,
145 pub prom_store: PromStoreOptions,
146 pub wal: DatanodeWalConfig,
147 pub storage: StorageConfig,
148 pub metadata_store: KvBackendConfig,
149 pub procedure: ProcedureConfig,
150 pub flow: FlowConfig,
151 pub logging: LoggingOptions,
152 pub user_provider: Option<String>,
153 pub region_engine: Vec<RegionEngineConfig>,
155 pub export_metrics: ExportMetricsOption,
156 pub tracing: TracingOptions,
157 pub init_regions_in_background: bool,
158 pub init_regions_parallelism: usize,
159 pub max_in_flight_write_bytes: Option<ReadableSize>,
160 pub slow_query: SlowQueryOptions,
161 pub query: QueryOptions,
162 pub memory: MemoryOptions,
163}
164
165impl Default for StandaloneOptions {
166 fn default() -> Self {
167 Self {
168 enable_telemetry: true,
169 default_timezone: None,
170 http: HttpOptions::default(),
171 grpc: GrpcOptions::default(),
172 mysql: MysqlOptions::default(),
173 postgres: PostgresOptions::default(),
174 opentsdb: OpentsdbOptions::default(),
175 influxdb: InfluxdbOptions::default(),
176 jaeger: JaegerOptions::default(),
177 prom_store: PromStoreOptions::default(),
178 wal: DatanodeWalConfig::default(),
179 storage: StorageConfig::default(),
180 metadata_store: KvBackendConfig::default(),
181 procedure: ProcedureConfig::default(),
182 flow: FlowConfig::default(),
183 logging: LoggingOptions::default(),
184 export_metrics: ExportMetricsOption::default(),
185 user_provider: None,
186 region_engine: vec![
187 RegionEngineConfig::Mito(MitoConfig::default()),
188 RegionEngineConfig::File(FileEngineConfig::default()),
189 ],
190 tracing: TracingOptions::default(),
191 init_regions_in_background: false,
192 init_regions_parallelism: 16,
193 max_in_flight_write_bytes: None,
194 slow_query: SlowQueryOptions::default(),
195 query: QueryOptions::default(),
196 memory: MemoryOptions::default(),
197 }
198 }
199}
200
201impl Configurable for StandaloneOptions {
202 fn env_list_keys() -> Option<&'static [&'static str]> {
203 Some(&["wal.broker_endpoints"])
204 }
205}
206
207#[allow(clippy::from_over_into)]
211impl Into<FrontendOptions> for StandaloneOptions {
212 fn into(self) -> FrontendOptions {
213 self.frontend_options()
214 }
215}
216
217impl StandaloneOptions {
218 pub fn frontend_options(&self) -> FrontendOptions {
219 let cloned_opts = self.clone();
220 FrontendOptions {
221 default_timezone: cloned_opts.default_timezone,
222 http: cloned_opts.http,
223 grpc: cloned_opts.grpc,
224 mysql: cloned_opts.mysql,
225 postgres: cloned_opts.postgres,
226 opentsdb: cloned_opts.opentsdb,
227 influxdb: cloned_opts.influxdb,
228 jaeger: cloned_opts.jaeger,
229 prom_store: cloned_opts.prom_store,
230 meta_client: None,
231 logging: cloned_opts.logging,
232 user_provider: cloned_opts.user_provider,
233 export_metrics: cloned_opts.export_metrics,
235 max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
236 slow_query: cloned_opts.slow_query,
237 ..Default::default()
238 }
239 }
240
241 pub fn datanode_options(&self) -> DatanodeOptions {
242 let cloned_opts = self.clone();
243 DatanodeOptions {
244 node_id: Some(0),
245 enable_telemetry: cloned_opts.enable_telemetry,
246 wal: cloned_opts.wal,
247 storage: cloned_opts.storage,
248 region_engine: cloned_opts.region_engine,
249 grpc: cloned_opts.grpc,
250 init_regions_in_background: cloned_opts.init_regions_in_background,
251 init_regions_parallelism: cloned_opts.init_regions_parallelism,
252 query: cloned_opts.query,
253 ..Default::default()
254 }
255 }
256}
257
258pub struct Instance {
259 datanode: Datanode,
260 frontend: Frontend,
261 flownode: FlownodeInstance,
262 procedure_manager: ProcedureManagerRef,
263 wal_options_allocator: WalOptionsAllocatorRef,
264
265 #[cfg(feature = "enterprise")]
268 components: Components,
269
270 _guard: Vec<WorkerGuard>,
272}
273
274#[cfg(feature = "enterprise")]
275pub struct Components {
276 pub plugins: Plugins,
277 pub kv_backend: KvBackendRef,
278 pub frontend_client: Arc<FrontendClient>,
279 pub catalog_manager: catalog::CatalogManagerRef,
280}
281
282impl Instance {
283 pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
285 self.frontend.server_handlers().addr(name)
286 }
287
288 #[cfg(feature = "enterprise")]
289 pub fn components(&self) -> &Components {
290 &self.components
291 }
292}
293
294#[async_trait]
295impl App for Instance {
296 fn name(&self) -> &str {
297 APP_NAME
298 }
299
300 async fn start(&mut self) -> Result<()> {
301 self.datanode.start_telemetry();
302
303 self.procedure_manager
304 .start()
305 .await
306 .context(error::StartProcedureManagerSnafu)?;
307
308 self.wal_options_allocator
309 .start()
310 .await
311 .context(error::StartWalOptionsAllocatorSnafu)?;
312
313 plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
314 .await
315 .context(error::StartFrontendSnafu)?;
316
317 self.frontend
318 .start()
319 .await
320 .context(error::StartFrontendSnafu)?;
321
322 self.flownode.start().await.context(StartFlownodeSnafu)?;
323
324 Ok(())
325 }
326
327 async fn stop(&mut self) -> Result<()> {
328 self.frontend
329 .shutdown()
330 .await
331 .context(error::ShutdownFrontendSnafu)?;
332
333 self.procedure_manager
334 .stop()
335 .await
336 .context(error::StopProcedureManagerSnafu)?;
337
338 self.datanode
339 .shutdown()
340 .await
341 .context(error::ShutdownDatanodeSnafu)?;
342
343 self.flownode
344 .shutdown()
345 .await
346 .context(error::ShutdownFlownodeSnafu)?;
347
348 info!("Datanode instance stopped.");
349
350 Ok(())
351 }
352}
353
354#[derive(Debug, Default, Parser)]
355pub struct StartCommand {
356 #[clap(long)]
357 http_addr: Option<String>,
358 #[clap(long, alias = "rpc-addr")]
359 rpc_bind_addr: Option<String>,
360 #[clap(long)]
361 mysql_addr: Option<String>,
362 #[clap(long)]
363 postgres_addr: Option<String>,
364 #[clap(short, long)]
365 influxdb_enable: bool,
366 #[clap(short, long)]
367 pub config_file: Option<String>,
368 #[clap(long)]
369 tls_mode: Option<TlsMode>,
370 #[clap(long)]
371 tls_cert_path: Option<String>,
372 #[clap(long)]
373 tls_key_path: Option<String>,
374 #[clap(long)]
375 user_provider: Option<String>,
376 #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
377 pub env_prefix: String,
378 #[clap(long)]
380 data_home: Option<String>,
381}
382
383impl StartCommand {
384 pub fn load_options(
386 &self,
387 global_options: &GlobalOptions,
388 ) -> Result<GreptimeOptions<StandaloneOptions>> {
389 let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
390 self.config_file.as_deref(),
391 self.env_prefix.as_ref(),
392 )
393 .context(error::LoadLayeredConfigSnafu)?;
394
395 self.merge_with_cli_options(global_options, &mut opts.component)?;
396
397 Ok(opts)
398 }
399
400 pub fn merge_with_cli_options(
402 &self,
403 global_options: &GlobalOptions,
404 opts: &mut StandaloneOptions,
405 ) -> Result<()> {
406 if let Some(dir) = &global_options.log_dir {
407 opts.logging.dir.clone_from(dir);
408 }
409
410 if global_options.log_level.is_some() {
411 opts.logging.level.clone_from(&global_options.log_level);
412 }
413
414 opts.tracing = TracingOptions {
415 #[cfg(feature = "tokio-console")]
416 tokio_console_addr: global_options.tokio_console_addr.clone(),
417 };
418
419 let tls_opts = TlsOption::new(
420 self.tls_mode.clone(),
421 self.tls_cert_path.clone(),
422 self.tls_key_path.clone(),
423 );
424
425 if let Some(addr) = &self.http_addr {
426 opts.http.addr.clone_from(addr);
427 }
428
429 if let Some(data_home) = &self.data_home {
430 opts.storage.data_home.clone_from(data_home);
431 }
432
433 if opts.logging.dir.is_empty() {
435 opts.logging.dir = Path::new(&opts.storage.data_home)
436 .join(DEFAULT_LOGGING_DIR)
437 .to_string_lossy()
438 .to_string();
439 }
440
441 if let Some(addr) = &self.rpc_bind_addr {
442 let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
444 if addr.eq(&datanode_grpc_addr) {
445 return error::IllegalConfigSnafu {
446 msg: format!(
447 "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
448 ),
449 }.fail();
450 }
451 opts.grpc.bind_addr.clone_from(addr)
452 }
453
454 if let Some(addr) = &self.mysql_addr {
455 opts.mysql.enable = true;
456 opts.mysql.addr.clone_from(addr);
457 opts.mysql.tls = tls_opts.clone();
458 }
459
460 if let Some(addr) = &self.postgres_addr {
461 opts.postgres.enable = true;
462 opts.postgres.addr.clone_from(addr);
463 opts.postgres.tls = tls_opts;
464 }
465
466 if self.influxdb_enable {
467 opts.influxdb.enable = self.influxdb_enable;
468 }
469
470 if let Some(user_provider) = &self.user_provider {
471 opts.user_provider = Some(user_provider.clone());
472 }
473
474 Ok(())
475 }
476
477 #[allow(unreachable_code)]
478 #[allow(unused_variables)]
479 #[allow(clippy::diverging_sub_expression)]
480 pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
482 common_runtime::init_global_runtimes(&opts.runtime);
483
484 let guard = common_telemetry::init_global_logging(
485 APP_NAME,
486 &opts.component.logging,
487 &opts.component.tracing,
488 None,
489 Some(&opts.component.slow_query),
490 );
491
492 log_versions(verbose_version(), short_version(), APP_NAME);
493 maybe_activate_heap_profile(&opts.component.memory);
494 create_resource_limit_metrics(APP_NAME);
495
496 info!("Standalone start command: {:#?}", self);
497 info!("Standalone options: {opts:#?}");
498
499 let mut plugins = Plugins::new();
500 let plugin_opts = opts.plugins;
501 let mut opts = opts.component;
502 opts.grpc.detect_server_addr();
503 let fe_opts = opts.frontend_options();
504 let dn_opts = opts.datanode_options();
505
506 plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
507 .await
508 .context(error::StartFrontendSnafu)?;
509
510 plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
511 .await
512 .context(error::StartDatanodeSnafu)?;
513
514 set_default_timezone(fe_opts.default_timezone.as_deref())
515 .context(error::InitTimezoneSnafu)?;
516
517 let data_home = &dn_opts.storage.data_home;
518 fs::create_dir_all(path::Path::new(data_home))
520 .context(error::CreateDirSnafu { dir: data_home })?;
521
522 let metadata_dir = metadata_store_dir(data_home);
523 let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
524 metadata_dir,
525 opts.metadata_store,
526 opts.procedure,
527 )
528 .await
529 .context(error::StartFrontendSnafu)?;
530
531 let layered_cache_builder = LayeredCacheRegistryBuilder::default();
533 let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
534 let layered_cache_registry = Arc::new(
535 with_default_composite_cache_registry(
536 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
537 )
538 .context(error::BuildCacheRegistrySnafu)?
539 .build(),
540 );
541
542 let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
543 builder.with_cache_registry(layered_cache_registry.clone());
544 let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;
545
546 let information_extension = Arc::new(StandaloneInformationExtension::new(
547 datanode.region_server(),
548 procedure_manager.clone(),
549 ));
550
551 let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
552 let builder = KvBackendCatalogManagerBuilder::new(
553 information_extension.clone(),
554 kv_backend.clone(),
555 layered_cache_registry.clone(),
556 )
557 .with_procedure_manager(procedure_manager.clone())
558 .with_process_manager(process_manager.clone());
559 #[cfg(feature = "enterprise")]
560 let builder = if let Some(factories) = plugins.get() {
561 builder.with_extra_information_table_factories(factories)
562 } else {
563 builder
564 };
565 let catalog_manager = builder.build();
566
567 let table_metadata_manager =
568 Self::create_table_metadata_manager(kv_backend.clone()).await?;
569
570 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
571 let flownode_options = FlownodeOptions {
572 flow: opts.flow.clone(),
573 ..Default::default()
574 };
575
576 let (frontend_client, frontend_instance_handler) =
579 FrontendClient::from_empty_grpc_handler(opts.query.clone());
580 let frontend_client = Arc::new(frontend_client);
581 let flow_builder = FlownodeBuilder::new(
582 flownode_options,
583 plugins.clone(),
584 table_metadata_manager.clone(),
585 catalog_manager.clone(),
586 flow_metadata_manager.clone(),
587 frontend_client.clone(),
588 );
589 let flownode = flow_builder
590 .build()
591 .await
592 .map_err(BoxedError::new)
593 .context(error::OtherSnafu)?;
594
595 {
597 let flow_streaming_engine = flownode.flow_engine().streaming_engine();
598 information_extension
599 .set_flow_streaming_engine(flow_streaming_engine)
600 .await;
601 }
602
603 let node_manager = Arc::new(StandaloneDatanodeManager {
604 region_server: datanode.region_server(),
605 flow_server: flownode.flow_engine(),
606 });
607
608 let table_id_sequence = Arc::new(
609 SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
610 .initial(MIN_USER_TABLE_ID as u64)
611 .step(10)
612 .build(),
613 );
614 let flow_id_sequence = Arc::new(
615 SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
616 .initial(MIN_USER_FLOW_ID as u64)
617 .step(10)
618 .build(),
619 );
620 let kafka_options = opts.wal.clone().into();
621 let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
622 .await
623 .context(error::BuildWalOptionsAllocatorSnafu)?;
624 let wal_options_allocator = Arc::new(wal_options_allocator);
625 let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
626 table_id_sequence,
627 wal_options_allocator.clone(),
628 ));
629 let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
630 flow_id_sequence,
631 ));
632
633 let ddl_context = DdlContext {
634 node_manager: node_manager.clone(),
635 cache_invalidator: layered_cache_registry.clone(),
636 memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
637 leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
638 table_metadata_manager: table_metadata_manager.clone(),
639 table_metadata_allocator: table_metadata_allocator.clone(),
640 flow_metadata_manager: flow_metadata_manager.clone(),
641 flow_metadata_allocator: flow_metadata_allocator.clone(),
642 region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
643 };
644
645 let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
646 .context(error::InitDdlManagerSnafu)?;
647 #[cfg(feature = "enterprise")]
648 let ddl_manager = {
649 let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
650 plugins.get();
651 ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
652 };
653
654 let procedure_executor = Arc::new(LocalProcedureExecutor::new(
655 Arc::new(ddl_manager),
656 procedure_manager.clone(),
657 ));
658
659 let fe_instance = FrontendBuilder::new(
660 fe_opts.clone(),
661 kv_backend.clone(),
662 layered_cache_registry.clone(),
663 catalog_manager.clone(),
664 node_manager.clone(),
665 procedure_executor.clone(),
666 process_manager,
667 )
668 .with_plugin(plugins.clone())
669 .try_build()
670 .await
671 .context(error::StartFrontendSnafu)?;
672 let fe_instance = Arc::new(fe_instance);
673
674 let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
676 let weak_grpc_handler = Arc::downgrade(&grpc_handler);
677 frontend_instance_handler
678 .lock()
679 .unwrap()
680 .replace(weak_grpc_handler);
681
682 let flow_streaming_engine = flownode.flow_engine().streaming_engine();
684 let invoker = FrontendInvoker::build_from(
686 flow_streaming_engine.clone(),
687 catalog_manager.clone(),
688 kv_backend.clone(),
689 layered_cache_registry.clone(),
690 procedure_executor,
691 node_manager,
692 )
693 .await
694 .context(StartFlownodeSnafu)?;
695 flow_streaming_engine.set_frontend_invoker(invoker).await;
696
697 let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
698 .context(error::ServersSnafu)?;
699
700 let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
701 .build()
702 .context(error::StartFrontendSnafu)?;
703
704 let frontend = Frontend {
705 instance: fe_instance,
706 servers,
707 heartbeat_task: None,
708 export_metrics_task,
709 };
710
711 #[cfg(feature = "enterprise")]
712 let components = Components {
713 plugins,
714 kv_backend,
715 frontend_client,
716 catalog_manager,
717 };
718
719 Ok(Instance {
720 datanode,
721 frontend,
722 flownode,
723 procedure_manager,
724 wal_options_allocator,
725 #[cfg(feature = "enterprise")]
726 components,
727 _guard: guard,
728 })
729 }
730
731 pub async fn create_table_metadata_manager(
732 kv_backend: KvBackendRef,
733 ) -> Result<TableMetadataManagerRef> {
734 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
735
736 table_metadata_manager
737 .init()
738 .await
739 .context(error::InitMetadataSnafu)?;
740
741 Ok(table_metadata_manager)
742 }
743}
744
745pub struct StandaloneInformationExtension {
746 region_server: RegionServer,
747 procedure_manager: ProcedureManagerRef,
748 start_time_ms: u64,
749 flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
750}
751
752impl StandaloneInformationExtension {
753 pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
754 Self {
755 region_server,
756 procedure_manager,
757 start_time_ms: common_time::util::current_time_millis() as u64,
758 flow_streaming_engine: RwLock::new(None),
759 }
760 }
761
762 pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
764 let mut guard = self.flow_streaming_engine.write().await;
765 *guard = Some(flow_streaming_engine);
766 }
767}
768
769#[async_trait::async_trait]
770impl InformationExtension for StandaloneInformationExtension {
771 type Error = catalog::error::Error;
772
773 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
774 let build_info = common_version::build_info();
775 let node_info = NodeInfo {
776 peer: Peer {
780 id: 0,
781 addr: "".to_string(),
782 },
783 last_activity_ts: -1,
784 status: NodeStatus::Standalone,
785 version: build_info.version.to_string(),
786 git_commit: build_info.commit_short.to_string(),
787 start_time_ms: self.start_time_ms,
790 };
791 Ok(vec![node_info])
792 }
793
794 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
795 self.procedure_manager
796 .list_procedures()
797 .await
798 .map_err(BoxedError::new)
799 .map(|procedures| {
800 procedures
801 .into_iter()
802 .map(|procedure| {
803 let status = procedure.state.as_str_name().to_string();
804 (status, procedure)
805 })
806 .collect::<Vec<_>>()
807 })
808 .context(catalog::error::ListProceduresSnafu)
809 }
810
811 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
812 let stats = self
813 .region_server
814 .reportable_regions()
815 .into_iter()
816 .map(|stat| {
817 let region_stat = self
818 .region_server
819 .region_statistic(stat.region_id)
820 .unwrap_or_default();
821 RegionStat {
822 id: stat.region_id,
823 rcus: 0,
824 wcus: 0,
825 approximate_bytes: region_stat.estimated_disk_size(),
826 engine: stat.engine,
827 role: RegionRole::from(stat.role).into(),
828 num_rows: region_stat.num_rows,
829 memtable_size: region_stat.memtable_size,
830 manifest_size: region_stat.manifest_size,
831 sst_size: region_stat.sst_size,
832 sst_num: region_stat.sst_num,
833 index_size: region_stat.index_size,
834 region_manifest: region_stat.manifest.into(),
835 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
836 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
837 written_bytes: region_stat.written_bytes,
838 }
839 })
840 .collect::<Vec<_>>();
841 Ok(stats)
842 }
843
844 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
845 Ok(Some(
846 self.flow_streaming_engine
847 .read()
848 .await
849 .as_ref()
850 .unwrap()
851 .gen_state_report()
852 .await,
853 ))
854 }
855}
856
857#[cfg(test)]
858mod tests {
859 use std::default::Default;
860 use std::io::Write;
861 use std::time::Duration;
862
863 use auth::{Identity, Password, UserProviderRef};
864 use common_base::readable_size::ReadableSize;
865 use common_config::ENV_VAR_SEP;
866 use common_test_util::temp_dir::create_named_temp_file;
867 use common_wal::config::DatanodeWalConfig;
868 use object_store::config::{FileConfig, GcsConfig};
869
870 use super::*;
871 use crate::options::GlobalOptions;
872
873 #[tokio::test]
874 async fn test_try_from_start_command_to_anymap() {
875 let fe_opts = FrontendOptions {
876 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
877 ..Default::default()
878 };
879
880 let mut plugins = Plugins::new();
881 plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
882 .await
883 .unwrap();
884
885 let provider = plugins.get::<UserProviderRef>().unwrap();
886 let result = provider
887 .authenticate(
888 Identity::UserId("test", None),
889 Password::PlainText("test".to_string().into()),
890 )
891 .await;
892 let _ = result.unwrap();
893 }
894
895 #[test]
896 fn test_toml() {
897 let opts = StandaloneOptions::default();
898 let toml_string = toml::to_string(&opts).unwrap();
899 let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
900 }
901
902 #[test]
903 fn test_read_from_config_file() {
904 let mut file = create_named_temp_file();
905 let toml_str = r#"
906 enable_memory_catalog = true
907
908 [wal]
909 provider = "raft_engine"
910 dir = "./greptimedb_data/test/wal"
911 file_size = "1GB"
912 purge_threshold = "50GB"
913 purge_interval = "10m"
914 read_batch_size = 128
915 sync_write = false
916
917 [storage]
918 data_home = "./greptimedb_data/"
919 type = "File"
920
921 [[storage.providers]]
922 type = "Gcs"
923 bucket = "foo"
924 endpoint = "bar"
925
926 [[storage.providers]]
927 type = "S3"
928 access_key_id = "access_key_id"
929 secret_access_key = "secret_access_key"
930
931 [storage.compaction]
932 max_inflight_tasks = 3
933 max_files_in_level0 = 7
934 max_purge_tasks = 32
935
936 [storage.manifest]
937 checkpoint_margin = 9
938 gc_duration = '7s'
939
940 [http]
941 addr = "127.0.0.1:4000"
942 timeout = "33s"
943 body_limit = "128MB"
944
945 [opentsdb]
946 enable = true
947
948 [logging]
949 level = "debug"
950 dir = "./greptimedb_data/test/logs"
951 "#;
952 write!(file, "{}", toml_str).unwrap();
953 let cmd = StartCommand {
954 config_file: Some(file.path().to_str().unwrap().to_string()),
955 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
956 ..Default::default()
957 };
958
959 let options = cmd
960 .load_options(&GlobalOptions::default())
961 .unwrap()
962 .component;
963 let fe_opts = options.frontend_options();
964 let dn_opts = options.datanode_options();
965 let logging_opts = options.logging;
966 assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
967 assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
968 assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
969 assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
970 assert!(fe_opts.mysql.enable);
971 assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
972 assert_eq!(2, fe_opts.mysql.runtime_size);
973 assert_eq!(None, fe_opts.mysql.reject_no_database);
974 assert!(fe_opts.influxdb.enable);
975 assert!(fe_opts.opentsdb.enable);
976
977 let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
978 unreachable!()
979 };
980 assert_eq!(
981 "./greptimedb_data/test/wal",
982 raft_engine_config.dir.unwrap()
983 );
984
985 assert!(matches!(
986 &dn_opts.storage.store,
987 object_store::config::ObjectStoreConfig::File(FileConfig { .. })
988 ));
989 assert_eq!(dn_opts.storage.providers.len(), 2);
990 assert!(matches!(
991 dn_opts.storage.providers[0],
992 object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
993 ));
994 match &dn_opts.storage.providers[1] {
995 object_store::config::ObjectStoreConfig::S3(s3_config) => {
996 assert_eq!(
997 "SecretBox<alloc::string::String>([REDACTED])".to_string(),
998 format!("{:?}", s3_config.access_key_id)
999 );
1000 }
1001 _ => {
1002 unreachable!()
1003 }
1004 }
1005
1006 assert_eq!("debug", logging_opts.level.as_ref().unwrap());
1007 assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
1008 }
1009
1010 #[test]
1011 fn test_load_log_options_from_cli() {
1012 let cmd = StartCommand {
1013 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
1014 ..Default::default()
1015 };
1016
1017 let opts = cmd
1018 .load_options(&GlobalOptions {
1019 log_dir: Some("./greptimedb_data/test/logs".to_string()),
1020 log_level: Some("debug".to_string()),
1021
1022 #[cfg(feature = "tokio-console")]
1023 tokio_console_addr: None,
1024 })
1025 .unwrap()
1026 .component;
1027
1028 assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
1029 assert_eq!("debug", opts.logging.level.unwrap());
1030 }
1031
1032 #[test]
1033 fn test_config_precedence_order() {
1034 let mut file = create_named_temp_file();
1035 let toml_str = r#"
1036 [http]
1037 addr = "127.0.0.1:4000"
1038
1039 [logging]
1040 level = "debug"
1041 "#;
1042 write!(file, "{}", toml_str).unwrap();
1043
1044 let env_prefix = "STANDALONE_UT";
1045 temp_env::with_vars(
1046 [
1047 (
1048 [
1050 env_prefix.to_string(),
1051 "logging".to_uppercase(),
1052 "dir".to_uppercase(),
1053 ]
1054 .join(ENV_VAR_SEP),
1055 Some("/other/log/dir"),
1056 ),
1057 (
1058 [
1060 env_prefix.to_string(),
1061 "logging".to_uppercase(),
1062 "level".to_uppercase(),
1063 ]
1064 .join(ENV_VAR_SEP),
1065 Some("info"),
1066 ),
1067 (
1068 [
1070 env_prefix.to_string(),
1071 "http".to_uppercase(),
1072 "addr".to_uppercase(),
1073 ]
1074 .join(ENV_VAR_SEP),
1075 Some("127.0.0.1:24000"),
1076 ),
1077 ],
1078 || {
1079 let command = StartCommand {
1080 config_file: Some(file.path().to_str().unwrap().to_string()),
1081 http_addr: Some("127.0.0.1:14000".to_string()),
1082 env_prefix: env_prefix.to_string(),
1083 ..Default::default()
1084 };
1085
1086 let opts = command.load_options(&Default::default()).unwrap().component;
1087
1088 assert_eq!(opts.logging.dir, "/other/log/dir");
1090
1091 assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
1093
1094 let fe_opts = opts.frontend_options();
1096 assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
1097 assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);
1098
1099 assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
1101 },
1102 );
1103 }
1104
1105 #[test]
1106 fn test_load_default_standalone_options() {
1107 let options =
1108 StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
1109 let default_options = StandaloneOptions::default();
1110 assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
1111 assert_eq!(options.http, default_options.http);
1112 assert_eq!(options.grpc, default_options.grpc);
1113 assert_eq!(options.mysql, default_options.mysql);
1114 assert_eq!(options.postgres, default_options.postgres);
1115 assert_eq!(options.opentsdb, default_options.opentsdb);
1116 assert_eq!(options.influxdb, default_options.influxdb);
1117 assert_eq!(options.prom_store, default_options.prom_store);
1118 assert_eq!(options.wal, default_options.wal);
1119 assert_eq!(options.metadata_store, default_options.metadata_store);
1120 assert_eq!(options.procedure, default_options.procedure);
1121 assert_eq!(options.logging, default_options.logging);
1122 assert_eq!(options.region_engine, default_options.region_engine);
1123 }
1124}