// cmd/standalone.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15use std::net::SocketAddr;
16use std::sync::Arc;
17use std::{fs, path};
18
19use async_trait::async_trait;
20use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21use catalog::information_schema::InformationExtension;
22use catalog::kvbackend::KvBackendCatalogManager;
23use clap::Parser;
24use client::api::v1::meta::RegionRole;
25use common_base::readable_size::ReadableSize;
26use common_base::Plugins;
27use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
28use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
29use common_error::ext::BoxedError;
30use common_meta::cache::LayeredCacheRegistryBuilder;
31use common_meta::cache_invalidator::CacheInvalidatorRef;
32use common_meta::cluster::{NodeInfo, NodeStatus};
33use common_meta::datanode::RegionStat;
34use common_meta::ddl::flow_meta::{FlowMetadataAllocator, FlowMetadataAllocatorRef};
35use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
36use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
37use common_meta::ddl_manager::DdlManager;
38use common_meta::key::flow::flow_state::FlowStat;
39use common_meta::key::flow::{FlowMetadataManager, FlowMetadataManagerRef};
40use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
41use common_meta::kv_backend::KvBackendRef;
42use common_meta::node_manager::NodeManagerRef;
43use common_meta::peer::Peer;
44use common_meta::region_keeper::MemoryRegionKeeper;
45use common_meta::region_registry::LeaderRegionRegistry;
46use common_meta::sequence::SequenceBuilder;
47use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
48use common_procedure::{ProcedureInfo, ProcedureManagerRef};
49use common_telemetry::info;
50use common_telemetry::logging::{LoggingOptions, TracingOptions};
51use common_time::timezone::set_default_timezone;
52use common_version::{short_version, version};
53use common_wal::config::DatanodeWalConfig;
54use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
55use datanode::datanode::{Datanode, DatanodeBuilder};
56use datanode::region_server::RegionServer;
57use file_engine::config::EngineConfig as FileEngineConfig;
58use flow::{
59    FlowConfig, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient,
60    FrontendInvoker, GrpcQueryHandlerWithBoxedError, StreamingEngine,
61};
62use frontend::frontend::{Frontend, FrontendOptions};
63use frontend::instance::builder::FrontendBuilder;
64use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager};
65use frontend::server::Services;
66use frontend::service_config::{
67    InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions,
68    PromStoreOptions,
69};
70use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
71use mito2::config::MitoConfig;
72use query::stats::StatementStatistics;
73use serde::{Deserialize, Serialize};
74use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
75use servers::grpc::GrpcOptions;
76use servers::http::HttpOptions;
77use servers::tls::{TlsMode, TlsOption};
78use snafu::ResultExt;
79use tokio::sync::RwLock;
80use tracing_appender::non_blocking::WorkerGuard;
81
82use crate::error::{Result, StartFlownodeSnafu};
83use crate::options::{GlobalOptions, GreptimeOptions};
84use crate::{error, log_versions, App};
85
86pub const APP_NAME: &str = "greptime-standalone";
87
// CLI entry point for the standalone deployment mode; currently a thin
// wrapper over the `start` subcommand.
// (Plain `//` comments on purpose: a `///` doc comment here would become the
// clap-generated `about` text and change `--help` output.)
#[derive(Parser)]
pub struct Command {
    // The selected subcommand.
    #[clap(subcommand)]
    subcmd: SubCommand,
}
93
94impl Command {
95    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
96        self.subcmd.build(opts).await
97    }
98
99    pub fn load_options(
100        &self,
101        global_options: &GlobalOptions,
102    ) -> Result<GreptimeOptions<StandaloneOptions>> {
103        self.subcmd.load_options(global_options)
104    }
105}
106
// Subcommands of the standalone CLI; only `start` exists today.
// (`//` comments used deliberately — `///` would alter clap help output.)
#[derive(Parser)]
enum SubCommand {
    Start(StartCommand),
}
111
112impl SubCommand {
113    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
114        match self {
115            SubCommand::Start(cmd) => cmd.build(opts).await,
116        }
117    }
118
119    fn load_options(
120        &self,
121        global_options: &GlobalOptions,
122    ) -> Result<GreptimeOptions<StandaloneOptions>> {
123        match self {
124            SubCommand::Start(cmd) => cmd.load_options(global_options),
125        }
126    }
127}
128
/// Top-level configuration for a standalone GreptimeDB process.
///
/// Standalone embeds frontend, datanode and flownode in one process, so this
/// struct carries options for all of them; [`StandaloneOptions::frontend_options`]
/// and [`StandaloneOptions::datanode_options`] project the relevant subsets.
/// Every field falls back to its default thanks to `#[serde(default)]`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StandaloneOptions {
    pub enable_telemetry: bool,
    /// Default timezone name; applied process-wide at startup via
    /// `set_default_timezone`.
    pub default_timezone: Option<String>,
    pub http: HttpOptions,
    pub grpc: GrpcOptions,
    pub mysql: MysqlOptions,
    pub postgres: PostgresOptions,
    pub opentsdb: OpentsdbOptions,
    pub influxdb: InfluxdbOptions,
    pub jaeger: JaegerOptions,
    pub prom_store: PromStoreOptions,
    pub wal: DatanodeWalConfig,
    pub storage: StorageConfig,
    /// Options of the local metadata store (kv backend under `data_home`).
    pub metadata_store: KvBackendConfig,
    pub procedure: ProcedureConfig,
    pub flow: FlowConfig,
    pub logging: LoggingOptions,
    pub user_provider: Option<String>,
    /// Options for different store engines.
    pub region_engine: Vec<RegionEngineConfig>,
    pub export_metrics: ExportMetricsOption,
    pub tracing: TracingOptions,
    pub init_regions_in_background: bool,
    pub init_regions_parallelism: usize,
    // NOTE(review): presumably an upper bound on concurrent write payload
    // bytes — confirm against the frontend's use of this value.
    pub max_in_flight_write_bytes: Option<ReadableSize>,
}
157
158impl Default for StandaloneOptions {
159    fn default() -> Self {
160        Self {
161            enable_telemetry: true,
162            default_timezone: None,
163            http: HttpOptions::default(),
164            grpc: GrpcOptions::default(),
165            mysql: MysqlOptions::default(),
166            postgres: PostgresOptions::default(),
167            opentsdb: OpentsdbOptions::default(),
168            influxdb: InfluxdbOptions::default(),
169            jaeger: JaegerOptions::default(),
170            prom_store: PromStoreOptions::default(),
171            wal: DatanodeWalConfig::default(),
172            storage: StorageConfig::default(),
173            metadata_store: KvBackendConfig::default(),
174            procedure: ProcedureConfig::default(),
175            flow: FlowConfig::default(),
176            logging: LoggingOptions::default(),
177            export_metrics: ExportMetricsOption::default(),
178            user_provider: None,
179            region_engine: vec![
180                RegionEngineConfig::Mito(MitoConfig::default()),
181                RegionEngineConfig::File(FileEngineConfig::default()),
182            ],
183            tracing: TracingOptions::default(),
184            init_regions_in_background: false,
185            init_regions_parallelism: 16,
186            max_in_flight_write_bytes: None,
187        }
188    }
189}
190
impl Configurable for StandaloneOptions {
    /// Configuration keys whose values are parsed as lists when supplied
    /// through environment variables.
    fn env_list_keys() -> Option<&'static [&'static str]> {
        Some(&["wal.broker_endpoints"])
    }
}
196
/// The [`StandaloneOptions`] is only defined in cmd crate,
/// we don't want to make `frontend` depends on it, so impl [`Into`]
/// rather than [`From`].
#[allow(clippy::from_over_into)]
impl Into<FrontendOptions> for StandaloneOptions {
    // Delegates to `StandaloneOptions::frontend_options`.
    fn into(self) -> FrontendOptions {
        self.frontend_options()
    }
}
206
207impl StandaloneOptions {
208    pub fn frontend_options(&self) -> FrontendOptions {
209        let cloned_opts = self.clone();
210        FrontendOptions {
211            default_timezone: cloned_opts.default_timezone,
212            http: cloned_opts.http,
213            grpc: cloned_opts.grpc,
214            mysql: cloned_opts.mysql,
215            postgres: cloned_opts.postgres,
216            opentsdb: cloned_opts.opentsdb,
217            influxdb: cloned_opts.influxdb,
218            jaeger: cloned_opts.jaeger,
219            prom_store: cloned_opts.prom_store,
220            meta_client: None,
221            logging: cloned_opts.logging,
222            user_provider: cloned_opts.user_provider,
223            // Handle the export metrics task run by standalone to frontend for execution
224            export_metrics: cloned_opts.export_metrics,
225            max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
226            ..Default::default()
227        }
228    }
229
230    pub fn datanode_options(&self) -> DatanodeOptions {
231        let cloned_opts = self.clone();
232        DatanodeOptions {
233            node_id: Some(0),
234            enable_telemetry: cloned_opts.enable_telemetry,
235            wal: cloned_opts.wal,
236            storage: cloned_opts.storage,
237            region_engine: cloned_opts.region_engine,
238            grpc: cloned_opts.grpc,
239            init_regions_in_background: cloned_opts.init_regions_in_background,
240            init_regions_parallelism: cloned_opts.init_regions_parallelism,
241            ..Default::default()
242        }
243    }
244}
245
/// A fully wired standalone GreptimeDB: embedded datanode, frontend and
/// flownode plus the background services they share. Built by
/// `StartCommand::build`, then driven through the `App` trait.
pub struct Instance {
    datanode: Datanode,
    frontend: Frontend,
    flownode: FlownodeInstance,
    // Started in `App::start`, stopped in `App::stop`.
    procedure_manager: ProcedureManagerRef,
    // Started in `App::start`.
    wal_options_allocator: WalOptionsAllocatorRef,
    // Keep the logging guard to prevent the worker from being dropped.
    _guard: Vec<WorkerGuard>,
}
255
impl Instance {
    /// Find the socket addr of a server by its `name`.
    ///
    /// Looks the name up in the frontend's server handler registry.
    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
        self.frontend.server_handlers().addr(name)
    }
}
262
#[async_trait]
impl App for Instance {
    fn name(&self) -> &str {
        APP_NAME
    }

    /// Bring every embedded component online, in order: datanode telemetry,
    /// procedure manager, WAL options allocator, frontend plugins, frontend
    /// servers, and finally the flownode.
    async fn start(&mut self) -> Result<()> {
        self.datanode.start_telemetry();

        self.procedure_manager
            .start()
            .await
            .context(error::StartProcedureManagerSnafu)?;

        self.wal_options_allocator
            .start()
            .await
            .context(error::StartWalOptionsAllocatorSnafu)?;

        plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
            .await
            .context(error::StartFrontendSnafu)?;

        self.frontend
            .start()
            .await
            .context(error::StartFrontendSnafu)?;

        self.flownode.start().await.context(StartFlownodeSnafu)?;

        Ok(())
    }

    /// Shut components down: frontend first (stop accepting traffic), then the
    /// procedure manager, datanode and flownode.
    // NOTE(review): the WAL options allocator started in `start` is not
    // explicitly stopped here — confirm it needs no shutdown call.
    async fn stop(&mut self) -> Result<()> {
        self.frontend
            .shutdown()
            .await
            .context(error::ShutdownFrontendSnafu)?;

        self.procedure_manager
            .stop()
            .await
            .context(error::StopProcedureManagerSnafu)?;

        self.datanode
            .shutdown()
            .await
            .context(error::ShutdownDatanodeSnafu)?;

        self.flownode
            .shutdown()
            .await
            .context(error::ShutdownFlownodeSnafu)?;

        // NOTE(review): message says "Datanode" but this logs after the whole
        // standalone instance stopped — confirm the wording is intentional.
        info!("Datanode instance stopped.");

        Ok(())
    }
}
322
// CLI flags for `greptime standalone start`.
// (Field comments below use `//` on purpose — `///` doc comments on clap
// fields become `--help` text and would change the CLI's observable output.)
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    // HTTP server listen address override.
    #[clap(long)]
    http_addr: Option<String>,
    // gRPC bind address; `--rpc-addr` kept as a backward-compatible alias.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    // Enabling MySQL/Postgres addresses also turns those protocols on.
    #[clap(long)]
    mysql_addr: Option<String>,
    #[clap(long)]
    postgres_addr: Option<String>,
    #[clap(short, long)]
    influxdb_enable: bool,
    // Path to a TOML configuration file; merged below CLI flags.
    #[clap(short, long)]
    pub config_file: Option<String>,
    // TLS settings shared by the MySQL and Postgres servers.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    #[clap(long)]
    user_provider: Option<String>,
    // Prefix for environment-variable configuration lookup.
    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
    pub env_prefix: String,
    /// The working home directory of this standalone instance.
    #[clap(long)]
    data_home: Option<String>,
}
351
352impl StartCommand {
353    /// Load the GreptimeDB options from various sources (command line, config file or env).
354    pub fn load_options(
355        &self,
356        global_options: &GlobalOptions,
357    ) -> Result<GreptimeOptions<StandaloneOptions>> {
358        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
359            self.config_file.as_deref(),
360            self.env_prefix.as_ref(),
361        )
362        .context(error::LoadLayeredConfigSnafu)?;
363
364        self.merge_with_cli_options(global_options, &mut opts.component)?;
365
366        Ok(opts)
367    }
368
369    // The precedence order is: cli > config file > environment variables > default values.
370    pub fn merge_with_cli_options(
371        &self,
372        global_options: &GlobalOptions,
373        opts: &mut StandaloneOptions,
374    ) -> Result<()> {
375        if let Some(dir) = &global_options.log_dir {
376            opts.logging.dir.clone_from(dir);
377        }
378
379        if global_options.log_level.is_some() {
380            opts.logging.level.clone_from(&global_options.log_level);
381        }
382
383        opts.tracing = TracingOptions {
384            #[cfg(feature = "tokio-console")]
385            tokio_console_addr: global_options.tokio_console_addr.clone(),
386        };
387
388        let tls_opts = TlsOption::new(
389            self.tls_mode.clone(),
390            self.tls_cert_path.clone(),
391            self.tls_key_path.clone(),
392        );
393
394        if let Some(addr) = &self.http_addr {
395            opts.http.addr.clone_from(addr);
396        }
397
398        if let Some(data_home) = &self.data_home {
399            opts.storage.data_home.clone_from(data_home);
400        }
401
402        if let Some(addr) = &self.rpc_bind_addr {
403            // frontend grpc addr conflict with datanode default grpc addr
404            let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
405            if addr.eq(&datanode_grpc_addr) {
406                return error::IllegalConfigSnafu {
407                    msg: format!(
408                        "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
409                    ),
410                }.fail();
411            }
412            opts.grpc.bind_addr.clone_from(addr)
413        }
414
415        if let Some(addr) = &self.mysql_addr {
416            opts.mysql.enable = true;
417            opts.mysql.addr.clone_from(addr);
418            opts.mysql.tls = tls_opts.clone();
419        }
420
421        if let Some(addr) = &self.postgres_addr {
422            opts.postgres.enable = true;
423            opts.postgres.addr.clone_from(addr);
424            opts.postgres.tls = tls_opts;
425        }
426
427        if self.influxdb_enable {
428            opts.influxdb.enable = self.influxdb_enable;
429        }
430
431        if let Some(user_provider) = &self.user_provider {
432            opts.user_provider = Some(user_provider.clone());
433        }
434
435        Ok(())
436    }
437
    #[allow(unreachable_code)]
    #[allow(unused_variables)]
    #[allow(clippy::diverging_sub_expression)]
    /// Build GreptimeDB instance with the loaded options.
    ///
    /// Wires together every embedded component of standalone mode — logging,
    /// the local metadata store, datanode, flownode and frontend — and returns
    /// them bundled in an [`Instance`]. Components are only constructed here;
    /// [`Instance`]'s `App::start` brings them online.
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        // Logging comes up first; the returned guard must outlive the process
        // (it is stored in the `Instance` below).
        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            None,
        );
        log_versions(version(), short_version(), APP_NAME);

        info!("Standalone start command: {:#?}", self);
        info!("Standalone options: {opts:#?}");

        let mut plugins = Plugins::new();
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        opts.grpc.detect_server_addr();
        // Project the standalone options onto per-component option views.
        let fe_opts = opts.frontend_options();
        let dn_opts = opts.datanode_options();

        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
            .await
            .context(error::StartFrontendSnafu)?;

        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
            .await
            .context(error::StartDatanodeSnafu)?;

        set_default_timezone(fe_opts.default_timezone.as_deref())
            .context(error::InitTimezoneSnafu)?;

        let data_home = &dn_opts.storage.data_home;
        // Ensure the data_home directory exists.
        fs::create_dir_all(path::Path::new(data_home))
            .context(error::CreateDirSnafu { dir: data_home })?;

        // Local metadata kv-backend plus the procedure manager on top of it.
        let metadata_dir = metadata_store_dir(data_home);
        let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
            metadata_dir,
            opts.metadata_store,
            opts.procedure,
        )
        .await
        .context(error::StartFrontendSnafu)?;

        // Builds cache registry
        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        // Embedded datanode sharing the kv backend and cache registry.
        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry.clone());
        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

        // information_schema provider backed by the in-process components.
        let information_extension = Arc::new(StandaloneInformationExtension::new(
            datanode.region_server(),
            procedure_manager.clone(),
        ));
        let catalog_manager = KvBackendCatalogManager::new(
            information_extension.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            Some(procedure_manager.clone()),
        );

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flownode_options = FlownodeOptions {
            flow: opts.flow.clone(),
            ..Default::default()
        };

        // for standalone not use grpc, but get a handler to frontend grpc client without
        // actually make a connection
        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler();
        let flow_builder = FlownodeBuilder::new(
            flownode_options,
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            Arc::new(frontend_client.clone()),
        );
        let flownode = flow_builder
            .build()
            .await
            .map_err(BoxedError::new)
            .context(error::OtherSnafu)?;

        // set the ref to query for the local flow state
        {
            let flow_streaming_engine = flownode.flow_engine().streaming_engine();
            information_extension
                .set_flow_streaming_engine(flow_streaming_engine)
                .await;
        }

        // In standalone the "cluster" is this single process: region and flow
        // requests are routed to the in-process servers.
        let node_manager = Arc::new(StandaloneDatanodeManager {
            region_server: datanode.region_server(),
            flow_server: flownode.flow_engine(),
        });

        // Id sequences for user tables/flows, persisted in the kv backend.
        let table_id_sequence = Arc::new(
            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_TABLE_ID as u64)
                .step(10)
                .build(),
        );
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        let kafka_options = opts.wal.clone().into();
        let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
            .await
            .context(error::BuildWalOptionsAllocatorSnafu)?;
        let wal_options_allocator = Arc::new(wal_options_allocator);
        let table_meta_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_sequence,
            wal_options_allocator.clone(),
        ));
        let flow_meta_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

        // DDL executor shared by the frontend and the flow invoker below.
        let ddl_task_executor = Self::create_ddl_task_executor(
            procedure_manager.clone(),
            node_manager.clone(),
            layered_cache_registry.clone(),
            table_metadata_manager,
            table_meta_allocator,
            flow_metadata_manager,
            flow_meta_allocator,
        )
        .await?;

        let fe_instance = FrontendBuilder::new(
            fe_opts.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            ddl_task_executor.clone(),
            StatementStatistics::new(opts.logging.slow_query.clone()),
        )
        .with_plugin(plugins.clone())
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let fe_instance = Arc::new(fe_instance);

        // set the frontend client for flownode
        // (a weak ref avoids a frontend <-> flownode reference cycle)
        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .lock()
            .unwrap()
            .replace(weak_grpc_handler);

        // set the frontend invoker for flownode
        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        // flow server need to be able to use frontend to write insert requests back
        let invoker = FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            ddl_task_executor.clone(),
            node_manager,
        )
        .await
        .context(error::StartFlownodeSnafu)?;
        flow_streaming_engine.set_frontend_invoker(invoker).await;

        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
            .context(error::ServersSnafu)?;

        let servers = Services::new(opts, fe_instance.clone(), plugins)
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance: fe_instance,
            servers,
            heartbeat_task: None,
            export_metrics_task,
        };

        Ok(Instance {
            datanode,
            frontend,
            flownode,
            procedure_manager,
            wal_options_allocator,
            _guard: guard,
        })
    }
651
652    pub async fn create_ddl_task_executor(
653        procedure_manager: ProcedureManagerRef,
654        node_manager: NodeManagerRef,
655        cache_invalidator: CacheInvalidatorRef,
656        table_metadata_manager: TableMetadataManagerRef,
657        table_metadata_allocator: TableMetadataAllocatorRef,
658        flow_metadata_manager: FlowMetadataManagerRef,
659        flow_metadata_allocator: FlowMetadataAllocatorRef,
660    ) -> Result<ProcedureExecutorRef> {
661        let procedure_executor: ProcedureExecutorRef = Arc::new(
662            DdlManager::try_new(
663                DdlContext {
664                    node_manager,
665                    cache_invalidator,
666                    memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
667                    leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
668                    table_metadata_manager,
669                    table_metadata_allocator,
670                    flow_metadata_manager,
671                    flow_metadata_allocator,
672                    region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
673                },
674                procedure_manager,
675                true,
676            )
677            .context(error::InitDdlManagerSnafu)?,
678        );
679
680        Ok(procedure_executor)
681    }
682
683    pub async fn create_table_metadata_manager(
684        kv_backend: KvBackendRef,
685    ) -> Result<TableMetadataManagerRef> {
686        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
687
688        table_metadata_manager
689            .init()
690            .await
691            .context(error::InitMetadataSnafu)?;
692
693        Ok(table_metadata_manager)
694    }
695}
696
/// Implements [`InformationExtension`] for standalone mode, answering node,
/// procedure, region and flow status queries from the in-process region
/// server, procedure manager and flow engine.
pub struct StandaloneInformationExtension {
    region_server: RegionServer,
    procedure_manager: ProcedureManagerRef,
    // Construction timestamp (ms since epoch); reported as the node start time.
    start_time_ms: u64,
    // Injected after the flownode is built via `set_flow_streaming_engine`;
    // `None` until then.
    flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
}
703
704impl StandaloneInformationExtension {
705    pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
706        Self {
707            region_server,
708            procedure_manager,
709            start_time_ms: common_time::util::current_time_millis() as u64,
710            flow_streaming_engine: RwLock::new(None),
711        }
712    }
713
714    /// Set the flow streaming engine for the standalone instance.
715    pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
716        let mut guard = self.flow_streaming_engine.write().await;
717        *guard = Some(flow_streaming_engine);
718    }
719}
720
721#[async_trait::async_trait]
722impl InformationExtension for StandaloneInformationExtension {
723    type Error = catalog::error::Error;
724
725    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
726        let build_info = common_version::build_info();
727        let node_info = NodeInfo {
728            // For the standalone:
729            // - id always 0
730            // - empty string for peer_addr
731            peer: Peer {
732                id: 0,
733                addr: "".to_string(),
734            },
735            last_activity_ts: -1,
736            status: NodeStatus::Standalone,
737            version: build_info.version.to_string(),
738            git_commit: build_info.commit_short.to_string(),
739            // Use `self.start_time_ms` instead.
740            // It's not precise but enough.
741            start_time_ms: self.start_time_ms,
742        };
743        Ok(vec![node_info])
744    }
745
746    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
747        self.procedure_manager
748            .list_procedures()
749            .await
750            .map_err(BoxedError::new)
751            .map(|procedures| {
752                procedures
753                    .into_iter()
754                    .map(|procedure| {
755                        let status = procedure.state.as_str_name().to_string();
756                        (status, procedure)
757                    })
758                    .collect::<Vec<_>>()
759            })
760            .context(catalog::error::ListProceduresSnafu)
761    }
762
763    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
764        let stats = self
765            .region_server
766            .reportable_regions()
767            .into_iter()
768            .map(|stat| {
769                let region_stat = self
770                    .region_server
771                    .region_statistic(stat.region_id)
772                    .unwrap_or_default();
773                RegionStat {
774                    id: stat.region_id,
775                    rcus: 0,
776                    wcus: 0,
777                    approximate_bytes: region_stat.estimated_disk_size(),
778                    engine: stat.engine,
779                    role: RegionRole::from(stat.role).into(),
780                    num_rows: region_stat.num_rows,
781                    memtable_size: region_stat.memtable_size,
782                    manifest_size: region_stat.manifest_size,
783                    sst_size: region_stat.sst_size,
784                    index_size: region_stat.index_size,
785                    region_manifest: region_stat.manifest.into(),
786                    data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
787                    metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
788                }
789            })
790            .collect::<Vec<_>>();
791        Ok(stats)
792    }
793
794    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
795        Ok(Some(
796            self.flow_streaming_engine
797                .read()
798                .await
799                .as_ref()
800                .unwrap()
801                .gen_state_report()
802                .await,
803        ))
804    }
805}
806
807#[cfg(test)]
808mod tests {
809    use std::default::Default;
810    use std::io::Write;
811    use std::time::Duration;
812
813    use auth::{Identity, Password, UserProviderRef};
814    use common_base::readable_size::ReadableSize;
815    use common_config::ENV_VAR_SEP;
816    use common_test_util::temp_dir::create_named_temp_file;
817    use common_wal::config::DatanodeWalConfig;
818    use datanode::config::{FileConfig, GcsConfig};
819
820    use super::*;
821    use crate::options::GlobalOptions;
822
823    #[tokio::test]
824    async fn test_try_from_start_command_to_anymap() {
825        let fe_opts = FrontendOptions {
826            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
827            ..Default::default()
828        };
829
830        let mut plugins = Plugins::new();
831        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
832            .await
833            .unwrap();
834
835        let provider = plugins.get::<UserProviderRef>().unwrap();
836        let result = provider
837            .authenticate(
838                Identity::UserId("test", None),
839                Password::PlainText("test".to_string().into()),
840            )
841            .await;
842        let _ = result.unwrap();
843    }
844
845    #[test]
846    fn test_toml() {
847        let opts = StandaloneOptions::default();
848        let toml_string = toml::to_string(&opts).unwrap();
849        let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
850    }
851
    #[test]
    fn test_read_from_config_file() {
        // End-to-end check: a standalone TOML config file is loaded through
        // `StartCommand::load_options` and its sections land in the frontend,
        // datanode and logging option structs.
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = true

            [wal]
            provider = "raft_engine"
            dir = "./greptimedb_data/test/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"

            [storage.compaction]
            max_inflight_tasks = 3
            max_files_in_level0 = 7
            max_purge_tasks = 32

            [storage.manifest]
            checkpoint_margin = 9
            gc_duration = '7s'

            [http]
            addr = "127.0.0.1:4000"
            timeout = "33s"
            body_limit = "128MB"

            [opentsdb]
            enable = true

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();
        // Point the start command at the temp file; `user_provider` exercises a
        // CLI-provided field alongside the file-provided ones.
        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;
        let fe_opts = options.frontend_options();
        let dn_opts = options.datanode_options();
        let logging_opts = options.logging;
        // [http] section values; grpc/mysql/influxdb are absent from the file,
        // so the asserted values there are presumably defaults.
        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
        assert!(fe_opts.mysql.enable);
        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
        assert_eq!(2, fe_opts.mysql.runtime_size);
        assert_eq!(None, fe_opts.mysql.reject_no_database);
        assert!(fe_opts.influxdb.enable);
        assert!(fe_opts.opentsdb.enable);

        // [wal] section: provider "raft_engine" must deserialize into the
        // RaftEngine variant carrying the configured directory.
        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
            unreachable!()
        };
        assert_eq!(
            "./greptimedb_data/test/wal",
            raft_engine_config.dir.unwrap()
        );

        // [storage] section: default store is File, with two additional
        // providers (Gcs then S3) kept in declaration order.
        assert!(matches!(
            &dn_opts.storage.store,
            datanode::config::ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(dn_opts.storage.providers.len(), 2);
        assert!(matches!(
            dn_opts.storage.providers[0],
            datanode::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        match &dn_opts.storage.providers[1] {
            datanode::config::ObjectStoreConfig::S3(s3_config) => {
                // Credentials are wrapped in `SecretBox`, so their Debug output
                // must be redacted rather than leaking the configured key.
                assert_eq!(
                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
                    format!("{:?}", s3_config.access_key_id)
                );
            }
            _ => {
                unreachable!()
            }
        }

        // [logging] section.
        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
    }
959
960    #[test]
961    fn test_load_log_options_from_cli() {
962        let cmd = StartCommand {
963            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
964            ..Default::default()
965        };
966
967        let opts = cmd
968            .load_options(&GlobalOptions {
969                log_dir: Some("./greptimedb_data/test/logs".to_string()),
970                log_level: Some("debug".to_string()),
971
972                #[cfg(feature = "tokio-console")]
973                tokio_console_addr: None,
974            })
975            .unwrap()
976            .component;
977
978        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
979        assert_eq!("debug", opts.logging.level.unwrap());
980    }
981
982    #[test]
983    fn test_config_precedence_order() {
984        let mut file = create_named_temp_file();
985        let toml_str = r#"
986            [http]
987            addr = "127.0.0.1:4000"
988
989            [logging]
990            level = "debug"
991        "#;
992        write!(file, "{}", toml_str).unwrap();
993
994        let env_prefix = "STANDALONE_UT";
995        temp_env::with_vars(
996            [
997                (
998                    // logging.dir = /other/log/dir
999                    [
1000                        env_prefix.to_string(),
1001                        "logging".to_uppercase(),
1002                        "dir".to_uppercase(),
1003                    ]
1004                    .join(ENV_VAR_SEP),
1005                    Some("/other/log/dir"),
1006                ),
1007                (
1008                    // logging.level = info
1009                    [
1010                        env_prefix.to_string(),
1011                        "logging".to_uppercase(),
1012                        "level".to_uppercase(),
1013                    ]
1014                    .join(ENV_VAR_SEP),
1015                    Some("info"),
1016                ),
1017                (
1018                    // http.addr = 127.0.0.1:24000
1019                    [
1020                        env_prefix.to_string(),
1021                        "http".to_uppercase(),
1022                        "addr".to_uppercase(),
1023                    ]
1024                    .join(ENV_VAR_SEP),
1025                    Some("127.0.0.1:24000"),
1026                ),
1027            ],
1028            || {
1029                let command = StartCommand {
1030                    config_file: Some(file.path().to_str().unwrap().to_string()),
1031                    http_addr: Some("127.0.0.1:14000".to_string()),
1032                    env_prefix: env_prefix.to_string(),
1033                    ..Default::default()
1034                };
1035
1036                let opts = command.load_options(&Default::default()).unwrap().component;
1037
1038                // Should be read from env, env > default values.
1039                assert_eq!(opts.logging.dir, "/other/log/dir");
1040
1041                // Should be read from config file, config file > env > default values.
1042                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
1043
1044                // Should be read from cli, cli > config file > env > default values.
1045                let fe_opts = opts.frontend_options();
1046                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
1047                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);
1048
1049                // Should be default value.
1050                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
1051            },
1052        );
1053    }
1054
1055    #[test]
1056    fn test_load_default_standalone_options() {
1057        let options =
1058            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
1059        let default_options = StandaloneOptions::default();
1060        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
1061        assert_eq!(options.http, default_options.http);
1062        assert_eq!(options.grpc, default_options.grpc);
1063        assert_eq!(options.mysql, default_options.mysql);
1064        assert_eq!(options.postgres, default_options.postgres);
1065        assert_eq!(options.opentsdb, default_options.opentsdb);
1066        assert_eq!(options.influxdb, default_options.influxdb);
1067        assert_eq!(options.prom_store, default_options.prom_store);
1068        assert_eq!(options.wal, default_options.wal);
1069        assert_eq!(options.metadata_store, default_options.metadata_store);
1070        assert_eq!(options.procedure, default_options.procedure);
1071        assert_eq!(options.logging, default_options.logging);
1072        assert_eq!(options.region_engine, default_options.region_engine);
1073    }
1074}