1use std::net::SocketAddr;
16use std::path::Path;
17use std::sync::Arc;
18use std::{fs, path};
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::information_schema::InformationExtension;
23use catalog::kvbackend::KvBackendCatalogManager;
24use catalog::process_manager::ProcessManager;
25use clap::Parser;
26use client::api::v1::meta::RegionRole;
27use common_base::readable_size::ReadableSize;
28use common_base::Plugins;
29use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
30use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
31use common_error::ext::BoxedError;
32use common_meta::cache::LayeredCacheRegistryBuilder;
33use common_meta::cluster::{NodeInfo, NodeStatus};
34use common_meta::datanode::RegionStat;
35use common_meta::ddl::flow_meta::FlowMetadataAllocator;
36use common_meta::ddl::table_meta::TableMetadataAllocator;
37use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
38use common_meta::ddl_manager::DdlManager;
39use common_meta::key::flow::flow_state::FlowStat;
40use common_meta::key::flow::FlowMetadataManager;
41use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
42use common_meta::kv_backend::KvBackendRef;
43use common_meta::peer::Peer;
44use common_meta::region_keeper::MemoryRegionKeeper;
45use common_meta::region_registry::LeaderRegionRegistry;
46use common_meta::sequence::SequenceBuilder;
47use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
48use common_procedure::{ProcedureInfo, ProcedureManagerRef};
49use common_telemetry::info;
50use common_telemetry::logging::{
51 LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
52};
53use common_time::timezone::set_default_timezone;
54use common_version::{short_version, version};
55use common_wal::config::DatanodeWalConfig;
56use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
57use datanode::datanode::{Datanode, DatanodeBuilder};
58use datanode::region_server::RegionServer;
59use file_engine::config::EngineConfig as FileEngineConfig;
60use flow::{
61 FlowConfig, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient,
62 FrontendInvoker, GrpcQueryHandlerWithBoxedError, StreamingEngine,
63};
64use frontend::frontend::{Frontend, FrontendOptions};
65use frontend::instance::builder::FrontendBuilder;
66use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager};
67use frontend::server::Services;
68use frontend::service_config::{
69 InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions,
70 PromStoreOptions,
71};
72use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
73use mito2::config::MitoConfig;
74use query::options::QueryOptions;
75use serde::{Deserialize, Serialize};
76use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
77use servers::grpc::GrpcOptions;
78use servers::http::HttpOptions;
79use servers::tls::{TlsMode, TlsOption};
80use snafu::ResultExt;
81use tokio::sync::RwLock;
82use tracing_appender::non_blocking::WorkerGuard;
83
84use crate::error::{Result, StartFlownodeSnafu};
85use crate::options::{GlobalOptions, GreptimeOptions};
86use crate::{create_resource_limit_metrics, error, log_versions, App};
87
/// Application name of the standalone binary.
///
/// It is returned by `Instance::name` and passed to logging initialization,
/// version logging and resource-limit metric creation in `StartCommand::build`.
pub const APP_NAME: &str = "greptime-standalone";
89
// Top-level CLI entry of the standalone mode; all work is delegated to the
// selected `SubCommand`.
//
// NOTE: regular `//` comments are used here because clap's derive macro turns
// `///` doc comments on a `Parser` type into user-visible help text.
#[derive(Parser)]
pub struct Command {
    // The chosen subcommand (currently only `Start`).
    #[clap(subcommand)]
    subcmd: SubCommand,
}
95
96impl Command {
97 pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
98 self.subcmd.build(opts).await
99 }
100
101 pub fn load_options(
102 &self,
103 global_options: &GlobalOptions,
104 ) -> Result<GreptimeOptions<StandaloneOptions>> {
105 self.subcmd.load_options(global_options)
106 }
107}
108
// Subcommands available in standalone mode.
//
// NOTE: regular `//` comments are used because clap's derive macro turns `///`
// doc comments into user-visible help text.
#[derive(Parser)]
enum SubCommand {
    // Start the standalone instance with the arguments of `StartCommand`.
    Start(StartCommand),
}
113
114impl SubCommand {
115 async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
116 match self {
117 SubCommand::Start(cmd) => cmd.build(opts).await,
118 }
119 }
120
121 fn load_options(
122 &self,
123 global_options: &GlobalOptions,
124 ) -> Result<GreptimeOptions<StandaloneOptions>> {
125 match self {
126 SubCommand::Start(cmd) => cmd.load_options(global_options),
127 }
128 }
129}
130
/// Options of the standalone mode.
///
/// The struct bundles the settings of all embedded components.
/// [`StandaloneOptions::frontend_options`] and
/// [`StandaloneOptions::datanode_options`] project the relevant subset into the
/// per-component option types. Missing fields are filled from the `Default`
/// implementation when deserializing (`#[serde(default)]`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StandaloneOptions {
    /// Forwarded to the datanode options.
    pub enable_telemetry: bool,
    /// Forwarded to the frontend options; also installed as the process default
    /// timezone during `StartCommand::build`.
    pub default_timezone: Option<String>,
    /// HTTP server options (frontend).
    pub http: HttpOptions,
    /// gRPC options; forwarded to both the frontend and the datanode options.
    pub grpc: GrpcOptions,
    /// MySQL protocol options (frontend).
    pub mysql: MysqlOptions,
    /// PostgreSQL protocol options (frontend).
    pub postgres: PostgresOptions,
    /// OpenTSDB protocol options (frontend).
    pub opentsdb: OpentsdbOptions,
    /// InfluxDB protocol options (frontend).
    pub influxdb: InfluxdbOptions,
    /// Jaeger options (frontend).
    pub jaeger: JaegerOptions,
    /// Prometheus store options (frontend).
    pub prom_store: PromStoreOptions,
    /// WAL configuration; forwarded to the datanode options and used to build the
    /// WAL options allocator.
    pub wal: DatanodeWalConfig,
    /// Storage configuration (datanode); `storage.data_home` is also the base of
    /// the metadata store directory and of the default logging directory.
    pub storage: StorageConfig,
    /// Configuration of the metadata key-value store built in `StartCommand::build`.
    pub metadata_store: KvBackendConfig,
    /// Procedure configuration, used together with `metadata_store` when building
    /// the procedure manager.
    pub procedure: ProcedureConfig,
    /// Flow configuration, copied into the flownode options.
    pub flow: FlowConfig,
    /// Logging options; forwarded to the frontend options and used to initialize
    /// global logging.
    pub logging: LoggingOptions,
    /// User provider specification (frontend).
    pub user_provider: Option<String>,
    /// Region engine configurations (datanode).
    pub region_engine: Vec<RegionEngineConfig>,
    /// Export-metrics options; forwarded to the frontend options and used to build
    /// the export-metrics task.
    pub export_metrics: ExportMetricsOption,
    /// Tracing options. Note that `StartCommand::merge_with_cli_options` replaces
    /// this value with one derived from the global CLI options.
    pub tracing: TracingOptions,
    /// Forwarded to the datanode options.
    pub init_regions_in_background: bool,
    /// Forwarded to the datanode options.
    pub init_regions_parallelism: usize,
    /// Forwarded to the frontend options.
    pub max_in_flight_write_bytes: Option<ReadableSize>,
    /// Slow query options; forwarded to the frontend options and to logging
    /// initialization.
    pub slow_query: Option<SlowQueryOptions>,
    /// Query options; forwarded to the datanode options and to the flownode's
    /// frontend client.
    pub query: QueryOptions,
}
161
162impl Default for StandaloneOptions {
163 fn default() -> Self {
164 Self {
165 enable_telemetry: true,
166 default_timezone: None,
167 http: HttpOptions::default(),
168 grpc: GrpcOptions::default(),
169 mysql: MysqlOptions::default(),
170 postgres: PostgresOptions::default(),
171 opentsdb: OpentsdbOptions::default(),
172 influxdb: InfluxdbOptions::default(),
173 jaeger: JaegerOptions::default(),
174 prom_store: PromStoreOptions::default(),
175 wal: DatanodeWalConfig::default(),
176 storage: StorageConfig::default(),
177 metadata_store: KvBackendConfig::default(),
178 procedure: ProcedureConfig::default(),
179 flow: FlowConfig::default(),
180 logging: LoggingOptions::default(),
181 export_metrics: ExportMetricsOption::default(),
182 user_provider: None,
183 region_engine: vec![
184 RegionEngineConfig::Mito(MitoConfig::default()),
185 RegionEngineConfig::File(FileEngineConfig::default()),
186 ],
187 tracing: TracingOptions::default(),
188 init_regions_in_background: false,
189 init_regions_parallelism: 16,
190 max_in_flight_write_bytes: None,
191 slow_query: Some(SlowQueryOptions::default()),
192 query: QueryOptions::default(),
193 }
194 }
195}
196
197impl Configurable for StandaloneOptions {
198 fn env_list_keys() -> Option<&'static [&'static str]> {
199 Some(&["wal.broker_endpoints"])
200 }
201}
202
203#[allow(clippy::from_over_into)]
207impl Into<FrontendOptions> for StandaloneOptions {
208 fn into(self) -> FrontendOptions {
209 self.frontend_options()
210 }
211}
212
213impl StandaloneOptions {
214 pub fn frontend_options(&self) -> FrontendOptions {
215 let cloned_opts = self.clone();
216 FrontendOptions {
217 default_timezone: cloned_opts.default_timezone,
218 http: cloned_opts.http,
219 grpc: cloned_opts.grpc,
220 mysql: cloned_opts.mysql,
221 postgres: cloned_opts.postgres,
222 opentsdb: cloned_opts.opentsdb,
223 influxdb: cloned_opts.influxdb,
224 jaeger: cloned_opts.jaeger,
225 prom_store: cloned_opts.prom_store,
226 meta_client: None,
227 logging: cloned_opts.logging,
228 user_provider: cloned_opts.user_provider,
229 export_metrics: cloned_opts.export_metrics,
231 max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
232 slow_query: cloned_opts.slow_query,
233 ..Default::default()
234 }
235 }
236
237 pub fn datanode_options(&self) -> DatanodeOptions {
238 let cloned_opts = self.clone();
239 DatanodeOptions {
240 node_id: Some(0),
241 enable_telemetry: cloned_opts.enable_telemetry,
242 wal: cloned_opts.wal,
243 storage: cloned_opts.storage,
244 region_engine: cloned_opts.region_engine,
245 grpc: cloned_opts.grpc,
246 init_regions_in_background: cloned_opts.init_regions_in_background,
247 init_regions_parallelism: cloned_opts.init_regions_parallelism,
248 query: cloned_opts.query,
249 ..Default::default()
250 }
251 }
252}
253
/// A fully wired standalone instance: datanode, frontend and flownode living in
/// one process, plus the shared services that are started and stopped with them
/// (see the `App` implementation).
pub struct Instance {
    /// The embedded datanode.
    datanode: Datanode,
    /// The embedded frontend (instance, servers and export-metrics task).
    frontend: Frontend,
    /// The embedded flownode.
    flownode: FlownodeInstance,
    /// Procedure manager; started in `start` and stopped in `stop`.
    procedure_manager: ProcedureManagerRef,
    /// WAL options allocator; started in `start`.
    wal_options_allocator: WalOptionsAllocatorRef,

    /// Extra handles exposed only in enterprise builds.
    #[cfg(feature = "enterprise")]
    components: Components,

    /// Guards returned by global logging initialization; stored here so they live
    /// as long as the instance.
    _guard: Vec<WorkerGuard>,
}
269
/// Handles to internal components of the standalone instance, available only
/// when the `enterprise` feature is enabled (see `Instance::components`).
#[cfg(feature = "enterprise")]
pub struct Components {
    /// The plugin registry populated during `StartCommand::build`.
    pub plugins: Plugins,
    /// The metadata key-value backend.
    pub kv_backend: KvBackendRef,
    /// The frontend client handed to the flownode builder.
    pub frontend_client: Arc<FrontendClient>,
    /// The catalog manager shared by the frontend and the flownode.
    pub catalog_manager: catalog::CatalogManagerRef,
}
277
278impl Instance {
279 pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
281 self.frontend.server_handlers().addr(name)
282 }
283
284 #[cfg(feature = "enterprise")]
285 pub fn components(&self) -> &Components {
286 &self.components
287 }
288}
289
290#[async_trait]
291impl App for Instance {
292 fn name(&self) -> &str {
293 APP_NAME
294 }
295
296 async fn start(&mut self) -> Result<()> {
297 self.datanode.start_telemetry();
298
299 self.procedure_manager
300 .start()
301 .await
302 .context(error::StartProcedureManagerSnafu)?;
303
304 self.wal_options_allocator
305 .start()
306 .await
307 .context(error::StartWalOptionsAllocatorSnafu)?;
308
309 plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
310 .await
311 .context(error::StartFrontendSnafu)?;
312
313 self.frontend
314 .start()
315 .await
316 .context(error::StartFrontendSnafu)?;
317
318 self.flownode.start().await.context(StartFlownodeSnafu)?;
319
320 Ok(())
321 }
322
323 async fn stop(&mut self) -> Result<()> {
324 self.frontend
325 .shutdown()
326 .await
327 .context(error::ShutdownFrontendSnafu)?;
328
329 self.procedure_manager
330 .stop()
331 .await
332 .context(error::StopProcedureManagerSnafu)?;
333
334 self.datanode
335 .shutdown()
336 .await
337 .context(error::ShutdownDatanodeSnafu)?;
338
339 self.flownode
340 .shutdown()
341 .await
342 .context(error::ShutdownFlownodeSnafu)?;
343
344 info!("Datanode instance stopped.");
345
346 Ok(())
347 }
348}
349
// Command-line arguments of the `Start` subcommand. Values given here are merged
// on top of the layered configuration by `merge_with_cli_options`.
//
// NOTE: regular `//` comments are used because clap's derive macro turns `///`
// doc comments into user-visible help text.
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    // Overrides `http.addr` when set.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `grpc.bind_addr` when set; rejected if it equals the default
    // datanode gRPC bind address. Also accepted as `--rpc-addr`.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    // Enables the MySQL server and overrides its address when set.
    #[clap(long)]
    mysql_addr: Option<String>,
    // Enables the PostgreSQL server and overrides its address when set.
    #[clap(long)]
    postgres_addr: Option<String>,
    // Enables the InfluxDB protocol when the flag is present.
    #[clap(short, long)]
    influxdb_enable: bool,
    // Path of the configuration file passed to the layered options loader.
    #[clap(short, long)]
    pub config_file: Option<String>,
    // TLS mode / certificate / key; combined into one `TlsOption` that is applied
    // to MySQL and PostgreSQL only when the respective address flag is given.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    // Overrides `user_provider` when set.
    #[clap(long)]
    user_provider: Option<String>,
    // Environment variable prefix passed to the layered options loader.
    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
    pub env_prefix: String,
    // Overrides `storage.data_home` when set.
    #[clap(long)]
    data_home: Option<String>,
}
378
379impl StartCommand {
380 pub fn load_options(
382 &self,
383 global_options: &GlobalOptions,
384 ) -> Result<GreptimeOptions<StandaloneOptions>> {
385 let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
386 self.config_file.as_deref(),
387 self.env_prefix.as_ref(),
388 )
389 .context(error::LoadLayeredConfigSnafu)?;
390
391 self.merge_with_cli_options(global_options, &mut opts.component)?;
392
393 Ok(opts)
394 }
395
396 pub fn merge_with_cli_options(
398 &self,
399 global_options: &GlobalOptions,
400 opts: &mut StandaloneOptions,
401 ) -> Result<()> {
402 if let Some(dir) = &global_options.log_dir {
403 opts.logging.dir.clone_from(dir);
404 }
405
406 if global_options.log_level.is_some() {
407 opts.logging.level.clone_from(&global_options.log_level);
408 }
409
410 opts.tracing = TracingOptions {
411 #[cfg(feature = "tokio-console")]
412 tokio_console_addr: global_options.tokio_console_addr.clone(),
413 };
414
415 let tls_opts = TlsOption::new(
416 self.tls_mode.clone(),
417 self.tls_cert_path.clone(),
418 self.tls_key_path.clone(),
419 );
420
421 if let Some(addr) = &self.http_addr {
422 opts.http.addr.clone_from(addr);
423 }
424
425 if let Some(data_home) = &self.data_home {
426 opts.storage.data_home.clone_from(data_home);
427 }
428
429 if opts.logging.dir.is_empty() {
431 opts.logging.dir = Path::new(&opts.storage.data_home)
432 .join(DEFAULT_LOGGING_DIR)
433 .to_string_lossy()
434 .to_string();
435 }
436
437 if let Some(addr) = &self.rpc_bind_addr {
438 let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
440 if addr.eq(&datanode_grpc_addr) {
441 return error::IllegalConfigSnafu {
442 msg: format!(
443 "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
444 ),
445 }.fail();
446 }
447 opts.grpc.bind_addr.clone_from(addr)
448 }
449
450 if let Some(addr) = &self.mysql_addr {
451 opts.mysql.enable = true;
452 opts.mysql.addr.clone_from(addr);
453 opts.mysql.tls = tls_opts.clone();
454 }
455
456 if let Some(addr) = &self.postgres_addr {
457 opts.postgres.enable = true;
458 opts.postgres.addr.clone_from(addr);
459 opts.postgres.tls = tls_opts;
460 }
461
462 if self.influxdb_enable {
463 opts.influxdb.enable = self.influxdb_enable;
464 }
465
466 if let Some(user_provider) = &self.user_provider {
467 opts.user_provider = Some(user_provider.clone());
468 }
469
470 Ok(())
471 }
472
    /// Builds a standalone [`Instance`] from the resolved options.
    ///
    /// The method initializes the global runtimes and logging, sets up plugins,
    /// creates the data home directory and the metadata store, and then wires the
    /// datanode, flownode and frontend together. Nothing is started here; see the
    /// `App` implementation of [`Instance`].
    ///
    /// # Errors
    ///
    /// Returns an error as soon as any of the construction steps fails.
    // NOTE(review): the reason for the lint allowances below is not visible in
    // this function — presumably specific feature combinations; confirm.
    #[allow(unreachable_code)]
    #[allow(unused_variables)]
    #[allow(clippy::diverging_sub_expression)]
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        // The returned guards are stored in `Instance::_guard`.
        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            None,
            opts.component.slow_query.as_ref(),
        );

        log_versions(version(), short_version(), APP_NAME);
        create_resource_limit_metrics(APP_NAME);

        info!("Standalone start command: {:#?}", self);
        info!("Standalone options: {opts:#?}");

        let mut plugins = Plugins::new();
        let plugin_opts = opts.plugins;
        // From here on `opts` refers to the standalone component options only.
        let mut opts = opts.component;
        // Runs before the frontend/datanode options are derived, because both
        // projections clone `opts.grpc`.
        opts.grpc.detect_server_addr();
        let fe_opts = opts.frontend_options();
        let dn_opts = opts.datanode_options();

        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
            .await
            .context(error::StartFrontendSnafu)?;

        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
            .await
            .context(error::StartDatanodeSnafu)?;

        set_default_timezone(fe_opts.default_timezone.as_deref())
            .context(error::InitTimezoneSnafu)?;

        // Make sure the data home exists before anything is created below it.
        let data_home = &dn_opts.storage.data_home;
        fs::create_dir_all(path::Path::new(data_home))
            .context(error::CreateDirSnafu { dir: data_home })?;

        // Metadata key-value backend and procedure manager, rooted at the
        // metadata store directory derived from the data home.
        let metadata_dir = metadata_store_dir(data_home);
        let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
            metadata_dir,
            opts.metadata_store,
            opts.procedure,
        )
        .await
        .context(error::StartFrontendSnafu)?;

        // Layered cache registry: fundamental caches plus the default composite caches.
        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry.clone());
        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

        // Created without a flow streaming engine; the engine is injected further
        // down, once the flownode has been built.
        let information_extension = Arc::new(StandaloneInformationExtension::new(
            datanode.region_server(),
            procedure_manager.clone(),
        ));

        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
        let catalog_manager = KvBackendCatalogManager::new(
            information_extension.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            Some(procedure_manager.clone()),
            Some(process_manager.clone()),
        );

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flownode_options = FlownodeOptions {
            flow: opts.flow.clone(),
            ..Default::default()
        };

        // The frontend client starts with an empty gRPC handler slot; the slot is
        // filled with a weak reference to the frontend instance once it exists.
        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler(opts.query.clone());
        let frontend_client = Arc::new(frontend_client);
        let flow_builder = FlownodeBuilder::new(
            flownode_options,
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            frontend_client.clone(),
        );
        let flownode = flow_builder
            .build()
            .await
            .map_err(BoxedError::new)
            .context(error::OtherSnafu)?;

        // Hand the streaming engine to the information extension created above.
        {
            let flow_streaming_engine = flownode.flow_engine().streaming_engine();
            information_extension
                .set_flow_streaming_engine(flow_streaming_engine)
                .await;
        }

        let node_manager = Arc::new(StandaloneDatanodeManager {
            region_server: datanode.region_server(),
            flow_server: flownode.flow_engine(),
        });

        // Id sequences for tables and flows, starting at the minimum user ids.
        let table_id_sequence = Arc::new(
            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_TABLE_ID as u64)
                .step(10)
                .build(),
        );
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        // Despite its name, `kafka_options` is converted from the whole `opts.wal` config.
        let kafka_options = opts.wal.clone().into();
        let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
            .await
            .context(error::BuildWalOptionsAllocatorSnafu)?;
        let wal_options_allocator = Arc::new(wal_options_allocator);
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_sequence,
            wal_options_allocator.clone(),
        ));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

        let ddl_context = DdlContext {
            node_manager: node_manager.clone(),
            cache_invalidator: layered_cache_registry.clone(),
            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            table_metadata_manager: table_metadata_manager.clone(),
            table_metadata_allocator: table_metadata_allocator.clone(),
            flow_metadata_manager: flow_metadata_manager.clone(),
            flow_metadata_allocator: flow_metadata_allocator.clone(),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        };
        let procedure_manager_c = procedure_manager.clone();

        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
            .context(error::InitDdlManagerSnafu)?;
        // Enterprise builds attach the trigger DDL manager found in the plugins (if any).
        #[cfg(feature = "enterprise")]
        let ddl_manager = {
            let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
                plugins.get();
            ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
        };
        let ddl_task_executor: ProcedureExecutorRef = Arc::new(ddl_manager);

        let fe_instance = FrontendBuilder::new(
            fe_opts.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            ddl_task_executor.clone(),
            process_manager,
        )
        .with_plugin(plugins.clone())
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let fe_instance = Arc::new(fe_instance);

        // Fill the handler slot of the frontend client with a weak reference to
        // the frontend instance.
        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .lock()
            .unwrap()
            .replace(weak_grpc_handler);

        // Give the flow streaming engine an invoker for calling into the frontend.
        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        let invoker = FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            ddl_task_executor.clone(),
            node_manager,
        )
        .await
        .context(StartFlownodeSnafu)?;
        flow_streaming_engine.set_frontend_invoker(invoker).await;

        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
            .context(error::ServersSnafu)?;

        // `opts` is consumed here; nothing below may read it.
        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance: fe_instance,
            servers,
            heartbeat_task: None,
            export_metrics_task,
        };

        #[cfg(feature = "enterprise")]
        let components = Components {
            plugins,
            kv_backend,
            frontend_client,
            catalog_manager,
        };

        Ok(Instance {
            datanode,
            frontend,
            flownode,
            procedure_manager,
            wal_options_allocator,
            #[cfg(feature = "enterprise")]
            components,
            _guard: guard,
        })
    }
715
716 pub async fn create_table_metadata_manager(
717 kv_backend: KvBackendRef,
718 ) -> Result<TableMetadataManagerRef> {
719 let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
720
721 table_metadata_manager
722 .init()
723 .await
724 .context(error::InitMetadataSnafu)?;
725
726 Ok(table_metadata_manager)
727 }
728}
729
/// [`InformationExtension`] implementation for standalone mode, answering node,
/// procedure, region and flow statistics queries from in-process components.
pub struct StandaloneInformationExtension {
    /// Source of the region statistics.
    region_server: RegionServer,
    /// Source of the procedure list.
    procedure_manager: ProcedureManagerRef,
    /// Timestamp (milliseconds) captured when this extension was constructed;
    /// reported as the node start time.
    start_time_ms: u64,
    /// Flow streaming engine used for flow statistics. It is `None` until
    /// `set_flow_streaming_engine` is called.
    flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
}
736
737impl StandaloneInformationExtension {
738 pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
739 Self {
740 region_server,
741 procedure_manager,
742 start_time_ms: common_time::util::current_time_millis() as u64,
743 flow_streaming_engine: RwLock::new(None),
744 }
745 }
746
747 pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
749 let mut guard = self.flow_streaming_engine.write().await;
750 *guard = Some(flow_streaming_engine);
751 }
752}
753
754#[async_trait::async_trait]
755impl InformationExtension for StandaloneInformationExtension {
756 type Error = catalog::error::Error;
757
758 async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
759 let build_info = common_version::build_info();
760 let node_info = NodeInfo {
761 peer: Peer {
765 id: 0,
766 addr: "".to_string(),
767 },
768 last_activity_ts: -1,
769 status: NodeStatus::Standalone,
770 version: build_info.version.to_string(),
771 git_commit: build_info.commit_short.to_string(),
772 start_time_ms: self.start_time_ms,
775 };
776 Ok(vec![node_info])
777 }
778
779 async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
780 self.procedure_manager
781 .list_procedures()
782 .await
783 .map_err(BoxedError::new)
784 .map(|procedures| {
785 procedures
786 .into_iter()
787 .map(|procedure| {
788 let status = procedure.state.as_str_name().to_string();
789 (status, procedure)
790 })
791 .collect::<Vec<_>>()
792 })
793 .context(catalog::error::ListProceduresSnafu)
794 }
795
796 async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
797 let stats = self
798 .region_server
799 .reportable_regions()
800 .into_iter()
801 .map(|stat| {
802 let region_stat = self
803 .region_server
804 .region_statistic(stat.region_id)
805 .unwrap_or_default();
806 RegionStat {
807 id: stat.region_id,
808 rcus: 0,
809 wcus: 0,
810 approximate_bytes: region_stat.estimated_disk_size(),
811 engine: stat.engine,
812 role: RegionRole::from(stat.role).into(),
813 num_rows: region_stat.num_rows,
814 memtable_size: region_stat.memtable_size,
815 manifest_size: region_stat.manifest_size,
816 sst_size: region_stat.sst_size,
817 index_size: region_stat.index_size,
818 region_manifest: region_stat.manifest.into(),
819 data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
820 metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
821 }
822 })
823 .collect::<Vec<_>>();
824 Ok(stats)
825 }
826
827 async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
828 Ok(Some(
829 self.flow_streaming_engine
830 .read()
831 .await
832 .as_ref()
833 .unwrap()
834 .gen_state_report()
835 .await,
836 ))
837 }
838}
839
840#[cfg(test)]
841mod tests {
842 use std::default::Default;
843 use std::io::Write;
844 use std::time::Duration;
845
846 use auth::{Identity, Password, UserProviderRef};
847 use common_base::readable_size::ReadableSize;
848 use common_config::ENV_VAR_SEP;
849 use common_test_util::temp_dir::create_named_temp_file;
850 use common_wal::config::DatanodeWalConfig;
851 use object_store::config::{FileConfig, GcsConfig};
852
853 use super::*;
854 use crate::options::GlobalOptions;
855
856 #[tokio::test]
857 async fn test_try_from_start_command_to_anymap() {
858 let fe_opts = FrontendOptions {
859 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
860 ..Default::default()
861 };
862
863 let mut plugins = Plugins::new();
864 plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
865 .await
866 .unwrap();
867
868 let provider = plugins.get::<UserProviderRef>().unwrap();
869 let result = provider
870 .authenticate(
871 Identity::UserId("test", None),
872 Password::PlainText("test".to_string().into()),
873 )
874 .await;
875 let _ = result.unwrap();
876 }
877
878 #[test]
879 fn test_toml() {
880 let opts = StandaloneOptions::default();
881 let toml_string = toml::to_string(&opts).unwrap();
882 let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
883 }
884
    /// Loads options from a TOML config file and checks that the values end up in
    /// the derived frontend, datanode and logging options.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = true

            [wal]
            provider = "raft_engine"
            dir = "./greptimedb_data/test/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"

            [storage.compaction]
            max_inflight_tasks = 3
            max_files_in_level0 = 7
            max_purge_tasks = 32

            [storage.manifest]
            checkpoint_margin = 9
            gc_duration = '7s'

            [http]
            addr = "127.0.0.1:4000"
            timeout = "33s"
            body_limit = "128MB"

            [opentsdb]
            enable = true

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();
        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;
        let fe_opts = options.frontend_options();
        let dn_opts = options.datanode_options();
        let logging_opts = options.logging;
        // Values from the `[http]` section of the file.
        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
        // Not set in the file, so these are expected to be the defaults.
        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
        assert!(fe_opts.mysql.enable);
        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
        assert_eq!(2, fe_opts.mysql.runtime_size);
        assert_eq!(None, fe_opts.mysql.reject_no_database);
        assert!(fe_opts.influxdb.enable);
        assert!(fe_opts.opentsdb.enable);

        // The `[wal]` section selects the raft-engine provider.
        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
            unreachable!()
        };
        assert_eq!(
            "./greptimedb_data/test/wal",
            raft_engine_config.dir.unwrap()
        );

        // Main store is `File`; the two extra providers keep their file order.
        assert!(matches!(
            &dn_opts.storage.store,
            object_store::config::ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(dn_opts.storage.providers.len(), 2);
        assert!(matches!(
            dn_opts.storage.providers[0],
            object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        match &dn_opts.storage.providers[1] {
            object_store::config::ObjectStoreConfig::S3(s3_config) => {
                // The secret must be redacted in its `Debug` output.
                assert_eq!(
                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
                    format!("{:?}", s3_config.access_key_id)
                );
            }
            _ => {
                unreachable!()
            }
        }

        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
    }
992
993 #[test]
994 fn test_load_log_options_from_cli() {
995 let cmd = StartCommand {
996 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
997 ..Default::default()
998 };
999
1000 let opts = cmd
1001 .load_options(&GlobalOptions {
1002 log_dir: Some("./greptimedb_data/test/logs".to_string()),
1003 log_level: Some("debug".to_string()),
1004
1005 #[cfg(feature = "tokio-console")]
1006 tokio_console_addr: None,
1007 })
1008 .unwrap()
1009 .component;
1010
1011 assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
1012 assert_eq!("debug", opts.logging.level.unwrap());
1013 }
1014
1015 #[test]
1016 fn test_config_precedence_order() {
1017 let mut file = create_named_temp_file();
1018 let toml_str = r#"
1019 [http]
1020 addr = "127.0.0.1:4000"
1021
1022 [logging]
1023 level = "debug"
1024 "#;
1025 write!(file, "{}", toml_str).unwrap();
1026
1027 let env_prefix = "STANDALONE_UT";
1028 temp_env::with_vars(
1029 [
1030 (
1031 [
1033 env_prefix.to_string(),
1034 "logging".to_uppercase(),
1035 "dir".to_uppercase(),
1036 ]
1037 .join(ENV_VAR_SEP),
1038 Some("/other/log/dir"),
1039 ),
1040 (
1041 [
1043 env_prefix.to_string(),
1044 "logging".to_uppercase(),
1045 "level".to_uppercase(),
1046 ]
1047 .join(ENV_VAR_SEP),
1048 Some("info"),
1049 ),
1050 (
1051 [
1053 env_prefix.to_string(),
1054 "http".to_uppercase(),
1055 "addr".to_uppercase(),
1056 ]
1057 .join(ENV_VAR_SEP),
1058 Some("127.0.0.1:24000"),
1059 ),
1060 ],
1061 || {
1062 let command = StartCommand {
1063 config_file: Some(file.path().to_str().unwrap().to_string()),
1064 http_addr: Some("127.0.0.1:14000".to_string()),
1065 env_prefix: env_prefix.to_string(),
1066 ..Default::default()
1067 };
1068
1069 let opts = command.load_options(&Default::default()).unwrap().component;
1070
1071 assert_eq!(opts.logging.dir, "/other/log/dir");
1073
1074 assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
1076
1077 let fe_opts = opts.frontend_options();
1079 assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
1080 assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);
1081
1082 assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
1084 },
1085 );
1086 }
1087
1088 #[test]
1089 fn test_load_default_standalone_options() {
1090 let options =
1091 StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
1092 let default_options = StandaloneOptions::default();
1093 assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
1094 assert_eq!(options.http, default_options.http);
1095 assert_eq!(options.grpc, default_options.grpc);
1096 assert_eq!(options.mysql, default_options.mysql);
1097 assert_eq!(options.postgres, default_options.postgres);
1098 assert_eq!(options.opentsdb, default_options.opentsdb);
1099 assert_eq!(options.influxdb, default_options.influxdb);
1100 assert_eq!(options.prom_store, default_options.prom_store);
1101 assert_eq!(options.wal, default_options.wal);
1102 assert_eq!(options.metadata_store, default_options.metadata_store);
1103 assert_eq!(options.procedure, default_options.procedure);
1104 assert_eq!(options.logging, default_options.logging);
1105 assert_eq!(options.region_engine, default_options.region_engine);
1106 }
1107}