cmd/standalone.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::{fs, path};

use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_schema::InformationExtension;
use catalog::kvbackend::KvBackendCatalogManagerBuilder;
use catalog::process_manager::ProcessManager;
use clap::Parser;
use client::api::v1::meta::RegionRole;
use common_base::readable_size::ReadableSize;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::cluster::{NodeInfo, NodeStatus};
use common_meta::datanode::RegionStat;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::flow_state::FlowStat;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::peer::Peer;
use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
use common_options::memory::MemoryOptions;
use common_procedure::{ProcedureInfo, ProcedureManagerRef};
use common_telemetry::info;
use common_telemetry::logging::{
    LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, verbose_version};
use common_wal::config::DatanodeWalConfig;
use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
use datanode::datanode::{Datanode, DatanodeBuilder};
use datanode::region_server::RegionServer;
use file_engine::config::EngineConfig as FileEngineConfig;
use flow::{
    FlowConfig, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient,
    FrontendInvoker, GrpcQueryHandlerWithBoxedError, StreamingEngine,
};
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::instance::builder::FrontendBuilder;
use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager};
use frontend::server::Services;
use frontend::service_config::{
    InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions,
    PromStoreOptions,
};
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use mito2::config::MitoConfig;
use query::options::QueryOptions;
use serde::{Deserialize, Serialize};
use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
use tokio::sync::RwLock;
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{Result, StartFlownodeSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile, App};

pub const APP_NAME: &str = "greptime-standalone";

#[derive(Parser)]
pub struct Command {
    #[clap(subcommand)]
    subcmd: SubCommand,
}

impl Command {
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        self.subcmd.build(opts).await
    }

    pub fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        self.subcmd.load_options(global_options)
    }
}

#[derive(Parser)]
enum SubCommand {
    Start(StartCommand),
}

impl SubCommand {
    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        match self {
            SubCommand::Start(cmd) => cmd.build(opts).await,
        }
    }

    fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        match self {
            SubCommand::Start(cmd) => cmd.load_options(global_options),
        }
    }
}

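/// All configuration for a standalone GreptimeDB process. It combines the server,
/// storage, WAL, logging and query options that are otherwise split across the
/// frontend, datanode and flownode configurations.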
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(default)]
pub struct StandaloneOptions {
    pub enable_telemetry: bool,
    pub default_timezone: Option<String>,
    pub http: HttpOptions,
    pub grpc: GrpcOptions,
    pub mysql: MysqlOptions,
    pub postgres: PostgresOptions,
    pub opentsdb: OpentsdbOptions,
    pub influxdb: InfluxdbOptions,
    pub jaeger: JaegerOptions,
    pub prom_store: PromStoreOptions,
    pub wal: DatanodeWalConfig,
    pub storage: StorageConfig,
    pub metadata_store: KvBackendConfig,
    pub procedure: ProcedureConfig,
    pub flow: FlowConfig,
    pub logging: LoggingOptions,
    pub user_provider: Option<String>,
    /// Options for different store engines.
    pub region_engine: Vec<RegionEngineConfig>,
    pub export_metrics: ExportMetricsOption,
    pub tracing: TracingOptions,
    pub init_regions_in_background: bool,
    pub init_regions_parallelism: usize,
    pub max_in_flight_write_bytes: Option<ReadableSize>,
    pub slow_query: SlowQueryOptions,
    pub query: QueryOptions,
    pub memory: MemoryOptions,
}

impl Default for StandaloneOptions {
    fn default() -> Self {
        Self {
            enable_telemetry: true,
            default_timezone: None,
            http: HttpOptions::default(),
            grpc: GrpcOptions::default(),
            mysql: MysqlOptions::default(),
            postgres: PostgresOptions::default(),
            opentsdb: OpentsdbOptions::default(),
            influxdb: InfluxdbOptions::default(),
            jaeger: JaegerOptions::default(),
            prom_store: PromStoreOptions::default(),
            wal: DatanodeWalConfig::default(),
            storage: StorageConfig::default(),
            metadata_store: KvBackendConfig::default(),
            procedure: ProcedureConfig::default(),
            flow: FlowConfig::default(),
            logging: LoggingOptions::default(),
            export_metrics: ExportMetricsOption::default(),
            user_provider: None,
            region_engine: vec![
                RegionEngineConfig::Mito(MitoConfig::default()),
                RegionEngineConfig::File(FileEngineConfig::default()),
            ],
            tracing: TracingOptions::default(),
            init_regions_in_background: false,
            init_regions_parallelism: 16,
            max_in_flight_write_bytes: None,
            slow_query: SlowQueryOptions::default(),
            query: QueryOptions::default(),
            memory: MemoryOptions::default(),
        }
    }
}

impl Configurable for StandaloneOptions {
    fn env_list_keys() -> Option<&'static [&'static str]> {
        Some(&["wal.broker_endpoints"])
    }
}

/// [`StandaloneOptions`] is only defined in the `cmd` crate; we don't want to make
/// `frontend` depend on it, so we impl [`Into`] rather than [`From`].
#[allow(clippy::from_over_into)]
impl Into<FrontendOptions> for StandaloneOptions {
    fn into(self) -> FrontendOptions {
        self.frontend_options()
    }
}

impl StandaloneOptions {
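    /// Projects the standalone options onto [`FrontendOptions`]. The standalone
    /// frontend never uses a meta client, so `meta_client` is left as `None`.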
    pub fn frontend_options(&self) -> FrontendOptions {
        let cloned_opts = self.clone();
        FrontendOptions {
            default_timezone: cloned_opts.default_timezone,
            http: cloned_opts.http,
            grpc: cloned_opts.grpc,
            mysql: cloned_opts.mysql,
            postgres: cloned_opts.postgres,
            opentsdb: cloned_opts.opentsdb,
            influxdb: cloned_opts.influxdb,
            jaeger: cloned_opts.jaeger,
            prom_store: cloned_opts.prom_store,
            meta_client: None,
            logging: cloned_opts.logging,
            user_provider: cloned_opts.user_provider,
            // The export metrics task configured for standalone is handed to the frontend for execution.
            export_metrics: cloned_opts.export_metrics,
            max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
            slow_query: cloned_opts.slow_query,
            ..Default::default()
        }
    }

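    /// Projects the standalone options onto [`DatanodeOptions`]; the embedded
    /// datanode always runs as node 0.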
    pub fn datanode_options(&self) -> DatanodeOptions {
        let cloned_opts = self.clone();
        DatanodeOptions {
            node_id: Some(0),
            enable_telemetry: cloned_opts.enable_telemetry,
            wal: cloned_opts.wal,
            storage: cloned_opts.storage,
            region_engine: cloned_opts.region_engine,
            grpc: cloned_opts.grpc,
            init_regions_in_background: cloned_opts.init_regions_in_background,
            init_regions_parallelism: cloned_opts.init_regions_parallelism,
            query: cloned_opts.query,
            ..Default::default()
        }
    }
}

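/// A running standalone instance: an embedded datanode, frontend and flownode that
/// share a single process and a single metadata store.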
pub struct Instance {
    datanode: Datanode,
    frontend: Frontend,
    flownode: FlownodeInstance,
    procedure_manager: ProcedureManagerRef,
    wal_options_allocator: WalOptionsAllocatorRef,

    // The building blocks of the standalone instance, exposed so that it is easier
    // to extend the instance from them.
    #[cfg(feature = "enterprise")]
    components: Components,

    // Keep the logging guard to prevent the worker from being dropped.
    _guard: Vec<WorkerGuard>,
}

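/// Shared handles to the standalone building blocks (only available with the
/// `enterprise` feature).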
#[cfg(feature = "enterprise")]
pub struct Components {
    pub plugins: Plugins,
    pub kv_backend: KvBackendRef,
    pub frontend_client: Arc<FrontendClient>,
    pub catalog_manager: catalog::CatalogManagerRef,
}

impl Instance {
    /// Find the socket addr of a server by its `name`.
    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
        self.frontend.server_handlers().addr(name)
    }

    #[cfg(feature = "enterprise")]
    pub fn components(&self) -> &Components {
        &self.components
    }
}

#[async_trait]
impl App for Instance {
    fn name(&self) -> &str {
        APP_NAME
    }

    async fn start(&mut self) -> Result<()> {
        self.datanode.start_telemetry();

        self.procedure_manager
            .start()
            .await
            .context(error::StartProcedureManagerSnafu)?;

        self.wal_options_allocator
            .start()
            .await
            .context(error::StartWalOptionsAllocatorSnafu)?;

        plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
            .await
            .context(error::StartFrontendSnafu)?;

        self.frontend
            .start()
            .await
            .context(error::StartFrontendSnafu)?;

        self.flownode.start().await.context(StartFlownodeSnafu)?;

        Ok(())
    }

    async fn stop(&mut self) -> Result<()> {
        self.frontend
            .shutdown()
            .await
            .context(error::ShutdownFrontendSnafu)?;

        self.procedure_manager
            .stop()
            .await
            .context(error::StopProcedureManagerSnafu)?;

        self.datanode
            .shutdown()
            .await
            .context(error::ShutdownDatanodeSnafu)?;

        self.flownode
            .shutdown()
            .await
            .context(error::ShutdownFlownodeSnafu)?;

        info!("Standalone instance stopped.");

        Ok(())
    }
}

#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    #[clap(long)]
    http_addr: Option<String>,
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    #[clap(long)]
    mysql_addr: Option<String>,
    #[clap(long)]
    postgres_addr: Option<String>,
    #[clap(short, long)]
    influxdb_enable: bool,
    #[clap(short, long)]
    pub config_file: Option<String>,
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    #[clap(long)]
    user_provider: Option<String>,
    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
    pub env_prefix: String,
    /// The working home directory of this standalone instance.
    #[clap(long)]
    data_home: Option<String>,
}

impl StartCommand {
    /// Load the GreptimeDB options from various sources (command line, config file or env).
    pub fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
            self.config_file.as_deref(),
            self.env_prefix.as_ref(),
        )
        .context(error::LoadLayeredConfigSnafu)?;

        self.merge_with_cli_options(global_options, &mut opts.component)?;

        Ok(opts)
    }

    // The precedence order is: cli > config file > environment variables > default values.
    pub fn merge_with_cli_options(
        &self,
        global_options: &GlobalOptions,
        opts: &mut StandaloneOptions,
    ) -> Result<()> {
        if let Some(dir) = &global_options.log_dir {
            opts.logging.dir.clone_from(dir);
        }

        if global_options.log_level.is_some() {
            opts.logging.level.clone_from(&global_options.log_level);
        }

        opts.tracing = TracingOptions {
            #[cfg(feature = "tokio-console")]
            tokio_console_addr: global_options.tokio_console_addr.clone(),
        };

        let tls_opts = TlsOption::new(
            self.tls_mode.clone(),
            self.tls_cert_path.clone(),
            self.tls_key_path.clone(),
        );

        if let Some(addr) = &self.http_addr {
            opts.http.addr.clone_from(addr);
        }

        if let Some(data_home) = &self.data_home {
            opts.storage.data_home.clone_from(data_home);
        }

        // If the logging dir is not set, use the default logs dir in the data home.
        if opts.logging.dir.is_empty() {
            opts.logging.dir = Path::new(&opts.storage.data_home)
                .join(DEFAULT_LOGGING_DIR)
                .to_string_lossy()
                .to_string();
        }

        if let Some(addr) = &self.rpc_bind_addr {
            // The frontend gRPC addr must not conflict with the datanode's default gRPC addr.
            let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
            if addr.eq(&datanode_grpc_addr) {
                return error::IllegalConfigSnafu {
                    msg: format!(
                        "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
                    ),
                }.fail();
            }
            opts.grpc.bind_addr.clone_from(addr)
        }

        if let Some(addr) = &self.mysql_addr {
            opts.mysql.enable = true;
            opts.mysql.addr.clone_from(addr);
            opts.mysql.tls = tls_opts.clone();
        }

        if let Some(addr) = &self.postgres_addr {
            opts.postgres.enable = true;
            opts.postgres.addr.clone_from(addr);
            opts.postgres.tls = tls_opts;
        }

        if self.influxdb_enable {
            opts.influxdb.enable = self.influxdb_enable;
        }

        if let Some(user_provider) = &self.user_provider {
            opts.user_provider = Some(user_provider.clone());
        }

        Ok(())
    }

    #[allow(unreachable_code)]
    #[allow(unused_variables)]
    #[allow(clippy::diverging_sub_expression)]
    /// Build the GreptimeDB standalone instance with the loaded options.
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            None,
            Some(&opts.component.slow_query),
        );

        log_versions(verbose_version(), short_version(), APP_NAME);
        maybe_activate_heap_profile(&opts.component.memory);
        create_resource_limit_metrics(APP_NAME);

        info!("Standalone start command: {:#?}", self);
        info!("Standalone options: {opts:#?}");

        let mut plugins = Plugins::new();
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        opts.grpc.detect_server_addr();
        let fe_opts = opts.frontend_options();
        let dn_opts = opts.datanode_options();

        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
            .await
            .context(error::StartFrontendSnafu)?;

        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
            .await
            .context(error::StartDatanodeSnafu)?;

        set_default_timezone(fe_opts.default_timezone.as_deref())
            .context(error::InitTimezoneSnafu)?;

        let data_home = &dn_opts.storage.data_home;
        // Ensure the data_home directory exists.
        fs::create_dir_all(path::Path::new(data_home))
            .context(error::CreateDirSnafu { dir: data_home })?;

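        // Build the embedded metadata store (kv backend) and the procedure manager
        // shared by the components built below.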
        let metadata_dir = metadata_store_dir(data_home);
        let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
            metadata_dir,
            opts.metadata_store,
            opts.procedure,
        )
        .await
        .context(error::StartFrontendSnafu)?;

        // Builds cache registry
        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry.clone());
        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

        let information_extension = Arc::new(StandaloneInformationExtension::new(
            datanode.region_server(),
            procedure_manager.clone(),
        ));

        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_procedure_manager(procedure_manager.clone())
        .with_process_manager(process_manager.clone());
        #[cfg(feature = "enterprise")]
        let builder = if let Some(factories) = plugins.get() {
            builder.with_extra_information_table_factories(factories)
        } else {
            builder
        };
        let catalog_manager = builder.build();

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flownode_options = FlownodeOptions {
            flow: opts.flow.clone(),
            ..Default::default()
        };

        // Standalone doesn't go through gRPC: get a handle to the frontend gRPC client
        // without actually making a connection.
        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler(opts.query.clone());
        let frontend_client = Arc::new(frontend_client);
        let flow_builder = FlownodeBuilder::new(
            flownode_options,
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            frontend_client.clone(),
        );
        let flownode = flow_builder
            .build()
            .await
            .map_err(BoxedError::new)
            .context(error::OtherSnafu)?;

        // Give the information extension a reference to the streaming engine so it can query local flow state.
        {
            let flow_streaming_engine = flownode.flow_engine().streaming_engine();
            information_extension
                .set_flow_streaming_engine(flow_streaming_engine)
                .await;
        }

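        // In standalone mode the frontend talks to the local region server and flow engine
        // directly instead of going through remote datanodes.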
        let node_manager = Arc::new(StandaloneDatanodeManager {
            region_server: datanode.region_server(),
            flow_server: flownode.flow_engine(),
        });

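        // Allocate table and flow IDs from sequences stored in the kv backend, starting
        // above the reserved system ranges.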
        let table_id_sequence = Arc::new(
            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_TABLE_ID as u64)
                .step(10)
                .build(),
        );
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        let kafka_options = opts.wal.clone().into();
        let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
            .await
            .context(error::BuildWalOptionsAllocatorSnafu)?;
        let wal_options_allocator = Arc::new(wal_options_allocator);
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_sequence,
            wal_options_allocator.clone(),
        ));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

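        // Assemble the DDL context. Standalone has no region failover, so a no-op
        // failure detector controller is used.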
        let ddl_context = DdlContext {
            node_manager: node_manager.clone(),
            cache_invalidator: layered_cache_registry.clone(),
            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            table_metadata_manager: table_metadata_manager.clone(),
            table_metadata_allocator: table_metadata_allocator.clone(),
            flow_metadata_manager: flow_metadata_manager.clone(),
            flow_metadata_allocator: flow_metadata_allocator.clone(),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        };

        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
            .context(error::InitDdlManagerSnafu)?;
        #[cfg(feature = "enterprise")]
        let ddl_manager = {
            let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
                plugins.get();
            ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
        };

        let procedure_executor = Arc::new(LocalProcedureExecutor::new(
            Arc::new(ddl_manager),
            procedure_manager.clone(),
        ));

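        // Build the frontend instance on top of the same kv backend, cache registry,
        // catalog manager and node manager.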
        let fe_instance = FrontendBuilder::new(
            fe_opts.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            procedure_executor.clone(),
            process_manager,
        )
        .with_plugin(plugins.clone())
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let fe_instance = Arc::new(fe_instance);

        // Set the frontend client for the flownode.
        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .lock()
            .unwrap()
            .replace(weak_grpc_handler);

        // Set the frontend invoker for the flownode.
        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        // The flow server needs to be able to use the frontend to write insert requests back.
        let invoker = FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            procedure_executor,
            node_manager,
        )
        .await
        .context(StartFlownodeSnafu)?;
        flow_streaming_engine.set_frontend_invoker(invoker).await;

        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
            .context(error::ServersSnafu)?;

        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance: fe_instance,
            servers,
            heartbeat_task: None,
            export_metrics_task,
        };

        #[cfg(feature = "enterprise")]
        let components = Components {
            plugins,
            kv_backend,
            frontend_client,
            catalog_manager,
        };

        Ok(Instance {
            datanode,
            frontend,
            flownode,
            procedure_manager,
            wal_options_allocator,
            #[cfg(feature = "enterprise")]
            components,
            _guard: guard,
        })
    }

    pub async fn create_table_metadata_manager(
        kv_backend: KvBackendRef,
    ) -> Result<TableMetadataManagerRef> {
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));

        table_metadata_manager
            .init()
            .await
            .context(error::InitMetadataSnafu)?;

        Ok(table_metadata_manager)
    }
}

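/// Backs the `information_schema` tables in standalone mode by reporting the single
/// local node, its procedures, region statistics and flow state.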
pub struct StandaloneInformationExtension {
    region_server: RegionServer,
    procedure_manager: ProcedureManagerRef,
    start_time_ms: u64,
    flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
}

impl StandaloneInformationExtension {
    pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
        Self {
            region_server,
            procedure_manager,
            start_time_ms: common_time::util::current_time_millis() as u64,
            flow_streaming_engine: RwLock::new(None),
        }
    }

    /// Set the flow streaming engine for the standalone instance.
    pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
        let mut guard = self.flow_streaming_engine.write().await;
        *guard = Some(flow_streaming_engine);
    }
}

#[async_trait::async_trait]
impl InformationExtension for StandaloneInformationExtension {
    type Error = catalog::error::Error;

    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
        let build_info = common_version::build_info();
        let node_info = NodeInfo {
            // For the standalone instance:
            // - the node id is always 0
            // - the peer address is left empty
            peer: Peer {
                id: 0,
                addr: "".to_string(),
            },
            last_activity_ts: -1,
            status: NodeStatus::Standalone,
            version: build_info.version.to_string(),
            git_commit: build_info.commit_short.to_string(),
            // Use `self.start_time_ms` instead.
            // It's not precise but enough.
            start_time_ms: self.start_time_ms,
        };
        Ok(vec![node_info])
    }

    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
        self.procedure_manager
            .list_procedures()
            .await
            .map_err(BoxedError::new)
            .map(|procedures| {
                procedures
                    .into_iter()
                    .map(|procedure| {
                        let status = procedure.state.as_str_name().to_string();
                        (status, procedure)
                    })
                    .collect::<Vec<_>>()
            })
            .context(catalog::error::ListProceduresSnafu)
    }

    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
        let stats = self
            .region_server
            .reportable_regions()
            .into_iter()
            .map(|stat| {
                let region_stat = self
                    .region_server
                    .region_statistic(stat.region_id)
                    .unwrap_or_default();
                RegionStat {
                    id: stat.region_id,
                    rcus: 0,
                    wcus: 0,
                    approximate_bytes: region_stat.estimated_disk_size(),
                    engine: stat.engine,
                    role: RegionRole::from(stat.role).into(),
                    num_rows: region_stat.num_rows,
                    memtable_size: region_stat.memtable_size,
                    manifest_size: region_stat.manifest_size,
                    sst_size: region_stat.sst_size,
                    sst_num: region_stat.sst_num,
                    index_size: region_stat.index_size,
                    region_manifest: region_stat.manifest.into(),
                    data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
                    metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
                    written_bytes: region_stat.written_bytes,
                }
            })
            .collect::<Vec<_>>();
        Ok(stats)
    }

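    // `flow_stats` assumes `set_flow_streaming_engine` was called during startup,
    // so the engine can be unwrapped here.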
    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
        Ok(Some(
            self.flow_streaming_engine
                .read()
                .await
                .as_ref()
                .unwrap()
                .gen_state_report()
                .await,
        ))
    }
}

#[cfg(test)]
mod tests {
    use std::default::Default;
    use std::io::Write;
    use std::time::Duration;

    use auth::{Identity, Password, UserProviderRef};
    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use common_wal::config::DatanodeWalConfig;
    use object_store::config::{FileConfig, GcsConfig};

    use super::*;
    use crate::options::GlobalOptions;

    #[tokio::test]
    async fn test_try_from_start_command_to_anymap() {
        let fe_opts = FrontendOptions {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
            .await
            .unwrap();

        let provider = plugins.get::<UserProviderRef>().unwrap();
        let result = provider
            .authenticate(
                Identity::UserId("test", None),
                Password::PlainText("test".to_string().into()),
            )
            .await;
        let _ = result.unwrap();
    }

    #[test]
    fn test_toml() {
        let opts = StandaloneOptions::default();
        let toml_string = toml::to_string(&opts).unwrap();
        let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
    }

    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = true

            [wal]
            provider = "raft_engine"
            dir = "./greptimedb_data/test/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"

            [storage.compaction]
            max_inflight_tasks = 3
            max_files_in_level0 = 7
            max_purge_tasks = 32

            [storage.manifest]
            checkpoint_margin = 9
            gc_duration = '7s'

            [http]
            addr = "127.0.0.1:4000"
            timeout = "33s"
            body_limit = "128MB"

            [opentsdb]
            enable = true

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();
        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;
        let fe_opts = options.frontend_options();
        let dn_opts = options.datanode_options();
        let logging_opts = options.logging;
        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
        assert!(fe_opts.mysql.enable);
        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
        assert_eq!(2, fe_opts.mysql.runtime_size);
        assert_eq!(None, fe_opts.mysql.reject_no_database);
        assert!(fe_opts.influxdb.enable);
        assert!(fe_opts.opentsdb.enable);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
            unreachable!()
        };
        assert_eq!(
            "./greptimedb_data/test/wal",
            raft_engine_config.dir.unwrap()
        );

        assert!(matches!(
            &dn_opts.storage.store,
            object_store::config::ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(dn_opts.storage.providers.len(), 2);
        assert!(matches!(
            dn_opts.storage.providers[0],
            object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        match &dn_opts.storage.providers[1] {
            object_store::config::ObjectStoreConfig::S3(s3_config) => {
                assert_eq!(
                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
                    format!("{:?}", s3_config.access_key_id)
                );
            }
            _ => {
                unreachable!()
            }
        }

        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
    }

    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let opts = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
        assert_eq!("debug", opts.logging.level.unwrap());
    }

    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"

            [logging]
            level = "debug"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "STANDALONE_UT";
        temp_env::with_vars(
            [
                (
                    // logging.dir = /other/log/dir
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "dir".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("/other/log/dir"),
                ),
                (
                    // logging.level = info
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "level".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("info"),
                ),
                (
                    // http.addr = 127.0.0.1:24000
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Should be read from env, env > default values.
                assert_eq!(opts.logging.dir, "/other/log/dir");

                // Should be read from config file, config file > env > default values.
                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");

                // Should be read from cli, cli > config file > env > default values.
                let fe_opts = opts.frontend_options();
                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);

                // Should be default value.
                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
            },
        );
    }

    #[test]
    fn test_load_default_standalone_options() {
        let options =
            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
        let default_options = StandaloneOptions::default();
        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
        assert_eq!(options.http, default_options.http);
        assert_eq!(options.grpc, default_options.grpc);
        assert_eq!(options.mysql, default_options.mysql);
        assert_eq!(options.postgres, default_options.postgres);
        assert_eq!(options.opentsdb, default_options.opentsdb);
        assert_eq!(options.influxdb, default_options.influxdb);
        assert_eq!(options.prom_store, default_options.prom_store);
        assert_eq!(options.wal, default_options.wal);
        assert_eq!(options.metadata_store, default_options.metadata_store);
        assert_eq!(options.procedure, default_options.procedure);
        assert_eq!(options.logging, default_options.logging);
        assert_eq!(options.region_engine, default_options.region_engine);
    }
}