use std::net::SocketAddr;
use std::sync::Arc;
use std::{fs, path};

use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_schema::InformationExtension;
use catalog::kvbackend::KvBackendCatalogManager;
use clap::Parser;
use client::api::v1::meta::RegionRole;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::cluster::{NodeInfo, NodeStatus};
use common_meta::datanode::RegionStat;
use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::node_manager::NodeManagerRef;
use common_meta::peer::Peer;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_telemetry::info;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, version};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::{
    FlowConfig, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient,
    FrontendInvoker, GrpcQueryHandlerWithBoxedError, StreamingEngine,
};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager};
use frontend::server::Services;
use frontend::service_config::{
    InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions,
    PromStoreOptions,
};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
use query::stats::StatementStatistics;
use serde::{Deserialize, Serialize};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
use tokio::sync::RwLock;
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{Result, StartFlownodeSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{error, log_versions, App};

pub const APP_NAME: &str = "greptime-standalone";

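/// The `standalone` command: runs a datanode, a frontend and a flownode together
/// in a single GreptimeDB process.
///
/// A minimal sketch of how the command is typically driven (assuming clap's
/// derived `parse()` and an async context; the actual CLI wiring may differ):
///
/// ```ignore
/// let cmd = Command::parse();
/// let opts = cmd.load_options(&GlobalOptions::default())?;
/// let mut instance = cmd.build(opts).await?;
/// instance.start().await?;
/// ```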
#[derive(Parser)]
pub struct Command {
    #[clap(subcommand)]
    subcmd: SubCommand,
}

impl Command {
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        self.subcmd.build(opts).await
    }

    pub fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        self.subcmd.load_options(global_options)
    }
}

#[derive(Parser)]
enum SubCommand {
    Start(StartCommand),
}

impl SubCommand {
    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        match self {
            SubCommand::Start(cmd) => cmd.build(opts).await,
        }
    }

    fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        match self {
            SubCommand::Start(cmd) => cmd.load_options(global_options),
        }
    }
}

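/// Configuration of a standalone deployment: the frontend-, datanode- and
/// flownode-related options gathered in one structure so that a single
/// configuration source drives the whole process. Use `frontend_options()` and
/// `datanode_options()` to project it onto the per-component option types.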
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StandaloneOptions {
    pub enable_telemetry: bool,
    pub default_timezone: Option<String>,
    pub http: HttpOptions,
    pub grpc: GrpcOptions,
    pub mysql: MysqlOptions,
    pub postgres: PostgresOptions,
    pub opentsdb: OpentsdbOptions,
    pub influxdb: InfluxdbOptions,
    pub jaeger: JaegerOptions,
    pub prom_store: PromStoreOptions,
    pub wal: DatanodeWalConfig,
    pub storage: StorageConfig,
    pub metadata_store: KvBackendConfig,
    pub procedure: ProcedureConfig,
    pub flow: FlowConfig,
    pub logging: LoggingOptions,
    pub user_provider: Option<String>,
    pub region_engine: Vec<RegionEngineConfig>,
    pub export_metrics: ExportMetricsOption,
    pub tracing: TracingOptions,
    pub init_regions_in_background: bool,
    pub init_regions_parallelism: usize,
    pub max_in_flight_write_bytes: Option<ReadableSize>,
}

impl Default for StandaloneOptions {
    fn default() -> Self {
        Self {
            enable_telemetry: true,
            default_timezone: None,
            http: HttpOptions::default(),
            grpc: GrpcOptions::default(),
            mysql: MysqlOptions::default(),
            postgres: PostgresOptions::default(),
            opentsdb: OpentsdbOptions::default(),
            influxdb: InfluxdbOptions::default(),
            jaeger: JaegerOptions::default(),
            prom_store: PromStoreOptions::default(),
            wal: DatanodeWalConfig::default(),
            storage: StorageConfig::default(),
            metadata_store: KvBackendConfig::default(),
            procedure: ProcedureConfig::default(),
            flow: FlowConfig::default(),
            logging: LoggingOptions::default(),
            export_metrics: ExportMetricsOption::default(),
            user_provider: None,
            region_engine: vec![
                RegionEngineConfig::Mito(MitoConfig::default()),
                RegionEngineConfig::File(FileEngineConfig::default()),
            ],
            tracing: TracingOptions::default(),
            init_regions_in_background: false,
            init_regions_parallelism: 16,
            max_in_flight_write_bytes: None,
        }
    }
}

impl Configurable for StandaloneOptions {
    fn env_list_keys() -> Option<&'static [&'static str]> {
        Some(&["wal.broker_endpoints"])
    }
}

#[allow(clippy::from_over_into)]
impl Into<FrontendOptions> for StandaloneOptions {
    fn into(self) -> FrontendOptions {
        self.frontend_options()
    }
}

impl StandaloneOptions {
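    /// Builds the `FrontendOptions` of the embedded frontend from this
    /// standalone configuration; `meta_client` stays `None` because standalone
    /// mode runs without a metasrv.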
    pub fn frontend_options(&self) -> FrontendOptions {
        let cloned_opts = self.clone();
        FrontendOptions {
            default_timezone: cloned_opts.default_timezone,
            http: cloned_opts.http,
            grpc: cloned_opts.grpc,
            mysql: cloned_opts.mysql,
            postgres: cloned_opts.postgres,
            opentsdb: cloned_opts.opentsdb,
            influxdb: cloned_opts.influxdb,
            jaeger: cloned_opts.jaeger,
            prom_store: cloned_opts.prom_store,
            meta_client: None,
            logging: cloned_opts.logging,
            user_provider: cloned_opts.user_provider,
            export_metrics: cloned_opts.export_metrics,
            max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
            ..Default::default()
        }
    }

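    /// Builds the `DatanodeOptions` of the embedded datanode; the node id is
    /// fixed to `0` since a standalone deployment has exactly one datanode.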
    pub fn datanode_options(&self) -> DatanodeOptions {
        let cloned_opts = self.clone();
        DatanodeOptions {
            node_id: Some(0),
            enable_telemetry: cloned_opts.enable_telemetry,
            wal: cloned_opts.wal,
            storage: cloned_opts.storage,
            region_engine: cloned_opts.region_engine,
            grpc: cloned_opts.grpc,
            init_regions_in_background: cloned_opts.init_regions_in_background,
            init_regions_parallelism: cloned_opts.init_regions_parallelism,
            ..Default::default()
        }
    }
}

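/// A running standalone instance: the embedded datanode, frontend and flownode,
/// plus the procedure manager and WAL options allocator they share.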
pub struct Instance {
    datanode: Datanode,
    frontend: Frontend,
    flownode: FlownodeInstance,
    procedure_manager: ProcedureManagerRef,
    wal_options_allocator: WalOptionsAllocatorRef,
    _guard: Vec<WorkerGuard>,
}

impl Instance {
    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
        self.frontend.server_handlers().addr(name)
    }
}

#[async_trait]
impl App for Instance {
    fn name(&self) -> &str {
        APP_NAME
    }

    async fn start(&mut self) -> Result<()> {
        self.datanode.start_telemetry();

        self.procedure_manager
            .start()
            .await
            .context(error::StartProcedureManagerSnafu)?;

        self.wal_options_allocator
            .start()
            .await
            .context(error::StartWalOptionsAllocatorSnafu)?;

        plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
            .await
            .context(error::StartFrontendSnafu)?;

        self.frontend
            .start()
            .await
            .context(error::StartFrontendSnafu)?;

        self.flownode.start().await.context(StartFlownodeSnafu)?;

        Ok(())
    }

    async fn stop(&mut self) -> Result<()> {
        self.frontend
            .shutdown()
            .await
            .context(error::ShutdownFrontendSnafu)?;

        self.procedure_manager
            .stop()
            .await
            .context(error::StopProcedureManagerSnafu)?;

        self.datanode
            .shutdown()
            .await
            .context(error::ShutdownDatanodeSnafu)?;

        self.flownode
            .shutdown()
            .await
            .context(error::ShutdownFlownodeSnafu)?;

317 info!("Datanode instance stopped.");
318
319 Ok(())
320 }
321}
322
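/// Command line arguments of `standalone start`. Values supplied here override
/// the corresponding entries loaded from the configuration file and environment.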
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    #[clap(long)]
    http_addr: Option<String>,
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    #[clap(long)]
    mysql_addr: Option<String>,
    #[clap(long)]
    postgres_addr: Option<String>,
    #[clap(short, long)]
    influxdb_enable: bool,
    #[clap(short, long)]
    pub config_file: Option<String>,
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    #[clap(long)]
    user_provider: Option<String>,
    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
    pub env_prefix: String,
    #[clap(long)]
    data_home: Option<String>,
}

impl StartCommand {
    pub fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
            self.config_file.as_deref(),
            self.env_prefix.as_ref(),
        )
        .context(error::LoadLayeredConfigSnafu)?;

        self.merge_with_cli_options(global_options, &mut opts.component)?;

        Ok(opts)
    }

    pub fn merge_with_cli_options(
        &self,
        global_options: &GlobalOptions,
        opts: &mut StandaloneOptions,
    ) -> Result<()> {
        if let Some(dir) = &global_options.log_dir {
            opts.logging.dir.clone_from(dir);
        }

        if global_options.log_level.is_some() {
            opts.logging.level.clone_from(&global_options.log_level);
        }

        opts.tracing = TracingOptions {
            #[cfg(feature = "tokio-console")]
            tokio_console_addr: global_options.tokio_console_addr.clone(),
        };

        let tls_opts = TlsOption::new(
            self.tls_mode.clone(),
            self.tls_cert_path.clone(),
            self.tls_key_path.clone(),
        );

        if let Some(addr) = &self.http_addr {
            opts.http.addr.clone_from(addr);
        }

        if let Some(data_home) = &self.data_home {
            opts.storage.data_home.clone_from(data_home);
        }

        if let Some(addr) = &self.rpc_bind_addr {
            let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
            if addr.eq(&datanode_grpc_addr) {
                return error::IllegalConfigSnafu {
                    msg: format!(
                        "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
                    ),
                }.fail();
            }
            opts.grpc.bind_addr.clone_from(addr)
        }

        if let Some(addr) = &self.mysql_addr {
            opts.mysql.enable = true;
            opts.mysql.addr.clone_from(addr);
            opts.mysql.tls = tls_opts.clone();
        }

        if let Some(addr) = &self.postgres_addr {
            opts.postgres.enable = true;
            opts.postgres.addr.clone_from(addr);
            opts.postgres.tls = tls_opts;
        }

        if self.influxdb_enable {
            opts.influxdb.enable = self.influxdb_enable;
        }

        if let Some(user_provider) = &self.user_provider {
            opts.user_provider = Some(user_provider.clone());
        }

        Ok(())
    }

    #[allow(unreachable_code)]
    #[allow(unused_variables)]
    #[allow(clippy::diverging_sub_expression)]
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            None,
        );
        log_versions(version(), short_version(), APP_NAME);

        info!("Standalone start command: {:#?}", self);
        info!("Standalone options: {opts:#?}");

        let mut plugins = Plugins::new();
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        opts.grpc.detect_server_addr();
        let fe_opts = opts.frontend_options();
        let dn_opts = opts.datanode_options();

        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
            .await
            .context(error::StartFrontendSnafu)?;

        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
            .await
            .context(error::StartDatanodeSnafu)?;

        set_default_timezone(fe_opts.default_timezone.as_deref())
            .context(error::InitTimezoneSnafu)?;

        let data_home = &dn_opts.storage.data_home;
        fs::create_dir_all(path::Path::new(data_home))
            .context(error::CreateDirSnafu { dir: data_home })?;

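        // Build the local metadata kv backend and the procedure manager on top of it.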
        let metadata_dir = metadata_store_dir(data_home);
        let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
            metadata_dir,
            opts.metadata_store,
            opts.procedure,
        )
        .await
        .context(error::StartFrontendSnafu)?;

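        // Assemble the layered cache registry shared by the datanode, catalog manager,
        // DDL executor and frontend built below.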
        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

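        // Build the embedded datanode against the shared kv backend and cache registry.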
        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry.clone());
        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

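        // The information extension feeds `information_schema` with data gathered from
        // this process; the catalog manager serves catalog metadata from the kv backend.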
        let information_extension = Arc::new(StandaloneInformationExtension::new(
            datanode.region_server(),
            procedure_manager.clone(),
        ));
        let catalog_manager = KvBackendCatalogManager::new(
            information_extension.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            Some(procedure_manager.clone()),
        );

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flownode_options = FlownodeOptions {
            flow: opts.flow.clone(),
            ..Default::default()
        };

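        // The frontend does not exist yet, so start the flownode's frontend client from
        // an empty gRPC handler; the real handler is installed once the frontend is built.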
        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler();
        let flow_builder = FlownodeBuilder::new(
            flownode_options,
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            Arc::new(frontend_client.clone()),
        );
        let flownode = flow_builder
            .build()
            .await
            .map_err(BoxedError::new)
            .context(error::OtherSnafu)?;

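        // Register the streaming engine with the information extension so that
        // `flow_stats` can report its state.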
        {
            let flow_streaming_engine = flownode.flow_engine().streaming_engine();
            information_extension
                .set_flow_streaming_engine(flow_streaming_engine)
                .await;
        }
        let node_manager = Arc::new(StandaloneDatanodeManager {
            region_server: datanode.region_server(),
            flow_server: flownode.flow_engine(),
        });

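        // Sequences for allocating table and flow ids, plus the WAL options allocator,
        // which back the table/flow metadata allocators used by DDL.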
        let table_id_sequence = Arc::new(
            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_TABLE_ID as u64)
                .step(10)
                .build(),
        );
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        let kafka_options = opts.wal.clone().into();
        let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
            .await
            .context(error::BuildWalOptionsAllocatorSnafu)?;
        let wal_options_allocator = Arc::new(wal_options_allocator);
        let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_sequence,
            wal_options_allocator.clone(),
        ));
        let flow_meta_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

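        // The DDL executor runs DDL procedures against the embedded datanode and flownode.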
        let ddl_task_executor = Self::create_ddl_task_executor(
            procedure_manager.clone(),
            node_manager.clone(),
            layered_cache_registry.clone(),
            table_metadata_manager,
            table_meta_allocator,
            flow_metadata_manager,
            flow_meta_allocator,
        )
        .await?;

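        // Build the frontend instance on top of the shared metadata, caches and DDL executor.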
        let fe_instance = FrontendBuilder::new(
            fe_opts.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            ddl_task_executor.clone(),
            StatementStatistics::new(opts.logging.slow_query.clone()),
        )
        .with_plugin(plugins.clone())
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let fe_instance = Arc::new(fe_instance);

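        // Install the frontend (as a weak reference) into the gRPC handler slot of the
        // frontend client created earlier for the flownode.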
        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .lock()
            .unwrap()
            .replace(weak_grpc_handler);

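        // Give the streaming engine a frontend invoker so the flow engine can issue
        // requests (e.g. writing flow results) through the frontend.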
        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        let invoker = FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            ddl_task_executor.clone(),
            node_manager,
        )
        .await
        .context(error::StartFlownodeSnafu)?;
        flow_streaming_engine.set_frontend_invoker(invoker).await;

        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
            .context(error::ServersSnafu)?;

        let servers = Services::new(opts, fe_instance.clone(), plugins)
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance: fe_instance,
            servers,
            heartbeat_task: None,
            export_metrics_task,
        };

        Ok(Instance {
            datanode,
            frontend,
            flownode,
            procedure_manager,
            wal_options_allocator,
            _guard: guard,
        })
    }

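    /// Creates the procedure executor (a `DdlManager`) that runs DDL procedures in
    /// standalone mode.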
    pub async fn create_ddl_task_executor(
        procedure_manager: ProcedureManagerRef,
        node_manager: NodeManagerRef,
        cache_invalidator: CacheInvalidatorRef,
        table_metadata_manager: TableMetadataManagerRef,
        table_metadata_allocator: TableMetadataAllocatorRef,
        flow_metadata_manager: FlowMetadataManagerRef,
        flow_metadata_allocator: FlowMetadataAllocatorRef,
    ) -> Result<ProcedureExecutorRef> {
        let procedure_executor: ProcedureExecutorRef = Arc::new(
            DdlManager::try_new(
                DdlContext {
                    node_manager,
                    cache_invalidator,
                    memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
                    leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
                    table_metadata_manager,
                    table_metadata_allocator,
                    flow_metadata_manager,
                    flow_metadata_allocator,
                    region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
                },
                procedure_manager,
                true,
            )
            .context(error::InitDdlManagerSnafu)?,
        );

        Ok(procedure_executor)
    }

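    /// Creates and initializes the table metadata manager on the given kv backend.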
    pub async fn create_table_metadata_manager(
        kv_backend: KvBackendRef,
    ) -> Result<TableMetadataManagerRef> {
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));

        table_metadata_manager
            .init()
            .await
            .context(error::InitMetadataSnafu)?;

        Ok(table_metadata_manager)
    }
}

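/// `InformationExtension` implementation for standalone mode: node, procedure,
/// region and flow statistics are collected directly from the embedded components
/// rather than fetched from a metasrv.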
pub struct StandaloneInformationExtension {
    region_server: RegionServer,
    procedure_manager: ProcedureManagerRef,
    start_time_ms: u64,
    flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
}

impl StandaloneInformationExtension {
    pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
        Self {
            region_server,
            procedure_manager,
            start_time_ms: common_time::util::current_time_millis() as u64,
            flow_streaming_engine: RwLock::new(None),
        }
    }

    pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
        let mut guard = self.flow_streaming_engine.write().await;
        *guard = Some(flow_streaming_engine);
    }
}

#[async_trait::async_trait]
impl InformationExtension for StandaloneInformationExtension {
    type Error = catalog::error::Error;

    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
        let build_info = common_version::build_info();
        let node_info = NodeInfo {
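            // Standalone has exactly one node; report a fixed peer id and an empty address.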
            peer: Peer {
                id: 0,
                addr: "".to_string(),
            },
            last_activity_ts: -1,
            status: NodeStatus::Standalone,
            version: build_info.version.to_string(),
            git_commit: build_info.commit_short.to_string(),
            start_time_ms: self.start_time_ms,
        };
        Ok(vec![node_info])
    }

    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
        self.procedure_manager
            .list_procedures()
            .await
            .map_err(BoxedError::new)
            .map(|procedures| {
                procedures
                    .into_iter()
                    .map(|procedure| {
                        let status = procedure.state.as_str_name().to_string();
                        (status, procedure)
                    })
                    .collect::<Vec<_>>()
            })
            .context(catalog::error::ListProceduresSnafu)
    }

    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
        let stats = self
            .region_server
            .reportable_regions()
            .into_iter()
            .map(|stat| {
                let region_stat = self
                    .region_server
                    .region_statistic(stat.region_id)
                    .unwrap_or_default();
                RegionStat {
                    id: stat.region_id,
                    rcus: 0,
                    wcus: 0,
                    approximate_bytes: region_stat.estimated_disk_size(),
                    engine: stat.engine,
                    role: RegionRole::from(stat.role).into(),
                    num_rows: region_stat.num_rows,
                    memtable_size: region_stat.memtable_size,
                    manifest_size: region_stat.manifest_size,
                    sst_size: region_stat.sst_size,
                    index_size: region_stat.index_size,
                    region_manifest: region_stat.manifest.into(),
                    data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
                    metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
                }
            })
            .collect::<Vec<_>>();
        Ok(stats)
    }

    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
        Ok(Some(
            self.flow_streaming_engine
                .read()
                .await
                .as_ref()
                .unwrap()
                .gen_state_report()
                .await,
        ))
    }
}

#[cfg(test)]
mod tests {
    use std::default::Default;
    use std::io::Write;
    use std::time::Duration;

    use auth::{Identity, Password, UserProviderRef};
    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use common_wal::config::DatanodeWalConfig;
    use datanode::config::{FileConfig, GcsConfig};

    use super::*;
    use crate::options::GlobalOptions;

    #[tokio::test]
    async fn test_try_from_start_command_to_anymap() {
        let fe_opts = FrontendOptions {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
            .await
            .unwrap();

        let provider = plugins.get::<UserProviderRef>().unwrap();
        let result = provider
            .authenticate(
                Identity::UserId("test", None),
                Password::PlainText("test".to_string().into()),
            )
            .await;
        let _ = result.unwrap();
    }

    #[test]
    fn test_toml() {
        let opts = StandaloneOptions::default();
        let toml_string = toml::to_string(&opts).unwrap();
        let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
    }

    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = true

            [wal]
            provider = "raft_engine"
            dir = "./greptimedb_data/test/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"

            [storage.compaction]
            max_inflight_tasks = 3
            max_files_in_level0 = 7
            max_purge_tasks = 32

            [storage.manifest]
            checkpoint_margin = 9
            gc_duration = '7s'

            [http]
            addr = "127.0.0.1:4000"
            timeout = "33s"
            body_limit = "128MB"

            [opentsdb]
            enable = true

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();
        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;
        let fe_opts = options.frontend_options();
        let dn_opts = options.datanode_options();
        let logging_opts = options.logging;
        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
        assert!(fe_opts.mysql.enable);
        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
        assert_eq!(2, fe_opts.mysql.runtime_size);
        assert_eq!(None, fe_opts.mysql.reject_no_database);
        assert!(fe_opts.influxdb.enable);
        assert!(fe_opts.opentsdb.enable);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
            unreachable!()
        };
        assert_eq!(
            "./greptimedb_data/test/wal",
            raft_engine_config.dir.unwrap()
        );

        assert!(matches!(
            &dn_opts.storage.store,
            datanode::config::ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(dn_opts.storage.providers.len(), 2);
        assert!(matches!(
            dn_opts.storage.providers[0],
            datanode::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        match &dn_opts.storage.providers[1] {
            datanode::config::ObjectStoreConfig::S3(s3_config) => {
                assert_eq!(
                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
                    format!("{:?}", s3_config.access_key_id)
                );
            }
            _ => {
                unreachable!()
            }
        }

        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
    }

    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let opts = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
        assert_eq!("debug", opts.logging.level.unwrap());
    }

    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"

            [logging]
            level = "debug"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "STANDALONE_UT";
        temp_env::with_vars(
            [
                (
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "dir".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("/other/log/dir"),
                ),
                (
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "level".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("info"),
                ),
                (
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

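                // `logging.dir` is set only through the environment variable, which overrides the default.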
                assert_eq!(opts.logging.dir, "/other/log/dir");

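                // `logging.level` is "debug" in the config file and "info" in the env:
                // the config file takes precedence over the environment.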
                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");

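                // `http.addr` is set on the CLI, in the file and in the env: the CLI wins;
                // `body_limit` is set nowhere and falls back to its default.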
                let fe_opts = opts.frontend_options();
                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);

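                // Not set anywhere, so the default value applies.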
                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
            },
        );
    }

    #[test]
    fn test_load_default_standalone_options() {
        let options =
            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
        let default_options = StandaloneOptions::default();
        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
        assert_eq!(options.http, default_options.http);
        assert_eq!(options.grpc, default_options.grpc);
        assert_eq!(options.mysql, default_options.mysql);
        assert_eq!(options.postgres, default_options.postgres);
        assert_eq!(options.opentsdb, default_options.opentsdb);
        assert_eq!(options.influxdb, default_options.influxdb);
        assert_eq!(options.prom_store, default_options.prom_store);
        assert_eq!(options.wal, default_options.wal);
        assert_eq!(options.metadata_store, default_options.metadata_store);
        assert_eq!(options.procedure, default_options.procedure);
        assert_eq!(options.logging, default_options.logging);
        assert_eq!(options.region_engine, default_options.region_engine);
    }
}