cmd/
standalone.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::path::Path;
17use std::sync::Arc;
18use std::{fs, path};
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::information_schema::InformationExtension;
23use catalog::kvbackend::KvBackendCatalogManager;
24use catalog::process_manager::ProcessManager;
25use clap::Parser;
26use client::api::v1::meta::RegionRole;
27use common_base::readable_size::ReadableSize;
28use common_base::Plugins;
29use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
30use common_config::{metadata_store_dir, Configurable, KvBackendConfig};
31use common_error::ext::BoxedError;
32use common_meta::cache::LayeredCacheRegistryBuilder;
33use common_meta::cluster::{NodeInfo, NodeStatus};
34use common_meta::datanode::RegionStat;
35use common_meta::ddl::flow_meta::FlowMetadataAllocator;
36use common_meta::ddl::table_meta::TableMetadataAllocator;
37use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl, ProcedureExecutorRef};
38use common_meta::ddl_manager::DdlManager;
39use common_meta::key::flow::flow_state::FlowStat;
40use common_meta::key::flow::FlowMetadataManager;
41use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
42use common_meta::kv_backend::KvBackendRef;
43use common_meta::peer::Peer;
44use common_meta::region_keeper::MemoryRegionKeeper;
45use common_meta::region_registry::LeaderRegionRegistry;
46use common_meta::sequence::SequenceBuilder;
47use common_meta::wal_options_allocator::{build_wal_options_allocator, WalOptionsAllocatorRef};
48use common_procedure::{ProcedureInfo, ProcedureManagerRef};
49use common_telemetry::info;
50use common_telemetry::logging::{
51    LoggingOptions, SlowQueryOptions, TracingOptions, DEFAULT_LOGGING_DIR,
52};
53use common_time::timezone::set_default_timezone;
54use common_version::{short_version, version};
55use common_wal::config::DatanodeWalConfig;
56use datanode::config::{DatanodeOptions, ProcedureConfig, RegionEngineConfig, StorageConfig};
57use datanode::datanode::{Datanode, DatanodeBuilder};
58use datanode::region_server::RegionServer;
59use file_engine::config::EngineConfig as FileEngineConfig;
60use flow::{
61    FlowConfig, FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient,
62    FrontendInvoker, GrpcQueryHandlerWithBoxedError, StreamingEngine,
63};
64use frontend::frontend::{Frontend, FrontendOptions};
65use frontend::instance::builder::FrontendBuilder;
66use frontend::instance::{Instance as FeInstance, StandaloneDatanodeManager};
67use frontend::server::Services;
68use frontend::service_config::{
69    InfluxdbOptions, JaegerOptions, MysqlOptions, OpentsdbOptions, PostgresOptions,
70    PromStoreOptions,
71};
72use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
73use mito2::config::MitoConfig;
74use query::options::QueryOptions;
75use serde::{Deserialize, Serialize};
76use servers::export_metrics::{ExportMetricsOption, ExportMetricsTask};
77use servers::grpc::GrpcOptions;
78use servers::http::HttpOptions;
79use servers::tls::{TlsMode, TlsOption};
80use snafu::ResultExt;
81use tokio::sync::RwLock;
82use tracing_appender::non_blocking::WorkerGuard;
83
84use crate::error::{Result, StartFlownodeSnafu};
85use crate::options::{GlobalOptions, GreptimeOptions};
86use crate::{create_resource_limit_metrics, error, log_versions, App};
87
/// Application name of standalone mode; used when initializing logging and
/// metrics in [`StartCommand::build`] and returned by [`App::name`] for [`Instance`].
pub const APP_NAME: &str = "greptime-standalone";
89
// Command-line interface of standalone mode; dispatches to a `SubCommand`.
// NOTE: plain `//` comments on purpose — clap turns `///` doc comments on
// derived items into `--help` text.
#[derive(Parser)]
pub struct Command {
    #[clap(subcommand)]
    subcmd: SubCommand,
}
95
96impl Command {
97    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
98        self.subcmd.build(opts).await
99    }
100
101    pub fn load_options(
102        &self,
103        global_options: &GlobalOptions,
104    ) -> Result<GreptimeOptions<StandaloneOptions>> {
105        self.subcmd.load_options(global_options)
106    }
107}
108
// Subcommands accepted by the standalone CLI.
// NOTE: plain `//` comments on purpose — `///` on clap-derived items becomes help text.
#[derive(Parser)]
enum SubCommand {
    // Builds (and, through `App`, runs) a standalone `Instance`.
    Start(StartCommand),
}
113
114impl SubCommand {
115    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
116        match self {
117            SubCommand::Start(cmd) => cmd.build(opts).await,
118        }
119    }
120
121    fn load_options(
122        &self,
123        global_options: &GlobalOptions,
124    ) -> Result<GreptimeOptions<StandaloneOptions>> {
125        match self {
126            SubCommand::Start(cmd) => cmd.load_options(global_options),
127        }
128    }
129}
130
131#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
132#[serde(default)]
133pub struct StandaloneOptions {
134    pub enable_telemetry: bool,
135    pub default_timezone: Option<String>,
136    pub http: HttpOptions,
137    pub grpc: GrpcOptions,
138    pub mysql: MysqlOptions,
139    pub postgres: PostgresOptions,
140    pub opentsdb: OpentsdbOptions,
141    pub influxdb: InfluxdbOptions,
142    pub jaeger: JaegerOptions,
143    pub prom_store: PromStoreOptions,
144    pub wal: DatanodeWalConfig,
145    pub storage: StorageConfig,
146    pub metadata_store: KvBackendConfig,
147    pub procedure: ProcedureConfig,
148    pub flow: FlowConfig,
149    pub logging: LoggingOptions,
150    pub user_provider: Option<String>,
151    /// Options for different store engines.
152    pub region_engine: Vec<RegionEngineConfig>,
153    pub export_metrics: ExportMetricsOption,
154    pub tracing: TracingOptions,
155    pub init_regions_in_background: bool,
156    pub init_regions_parallelism: usize,
157    pub max_in_flight_write_bytes: Option<ReadableSize>,
158    pub slow_query: Option<SlowQueryOptions>,
159    pub query: QueryOptions,
160}
161
162impl Default for StandaloneOptions {
163    fn default() -> Self {
164        Self {
165            enable_telemetry: true,
166            default_timezone: None,
167            http: HttpOptions::default(),
168            grpc: GrpcOptions::default(),
169            mysql: MysqlOptions::default(),
170            postgres: PostgresOptions::default(),
171            opentsdb: OpentsdbOptions::default(),
172            influxdb: InfluxdbOptions::default(),
173            jaeger: JaegerOptions::default(),
174            prom_store: PromStoreOptions::default(),
175            wal: DatanodeWalConfig::default(),
176            storage: StorageConfig::default(),
177            metadata_store: KvBackendConfig::default(),
178            procedure: ProcedureConfig::default(),
179            flow: FlowConfig::default(),
180            logging: LoggingOptions::default(),
181            export_metrics: ExportMetricsOption::default(),
182            user_provider: None,
183            region_engine: vec![
184                RegionEngineConfig::Mito(MitoConfig::default()),
185                RegionEngineConfig::File(FileEngineConfig::default()),
186            ],
187            tracing: TracingOptions::default(),
188            init_regions_in_background: false,
189            init_regions_parallelism: 16,
190            max_in_flight_write_bytes: None,
191            slow_query: Some(SlowQueryOptions::default()),
192            query: QueryOptions::default(),
193        }
194    }
195}
196
197impl Configurable for StandaloneOptions {
198    fn env_list_keys() -> Option<&'static [&'static str]> {
199        Some(&["wal.broker_endpoints"])
200    }
201}
202
203/// The [`StandaloneOptions`] is only defined in cmd crate,
204/// we don't want to make `frontend` depends on it, so impl [`Into`]
205/// rather than [`From`].
206#[allow(clippy::from_over_into)]
207impl Into<FrontendOptions> for StandaloneOptions {
208    fn into(self) -> FrontendOptions {
209        self.frontend_options()
210    }
211}
212
213impl StandaloneOptions {
214    pub fn frontend_options(&self) -> FrontendOptions {
215        let cloned_opts = self.clone();
216        FrontendOptions {
217            default_timezone: cloned_opts.default_timezone,
218            http: cloned_opts.http,
219            grpc: cloned_opts.grpc,
220            mysql: cloned_opts.mysql,
221            postgres: cloned_opts.postgres,
222            opentsdb: cloned_opts.opentsdb,
223            influxdb: cloned_opts.influxdb,
224            jaeger: cloned_opts.jaeger,
225            prom_store: cloned_opts.prom_store,
226            meta_client: None,
227            logging: cloned_opts.logging,
228            user_provider: cloned_opts.user_provider,
229            // Handle the export metrics task run by standalone to frontend for execution
230            export_metrics: cloned_opts.export_metrics,
231            max_in_flight_write_bytes: cloned_opts.max_in_flight_write_bytes,
232            slow_query: cloned_opts.slow_query,
233            ..Default::default()
234        }
235    }
236
237    pub fn datanode_options(&self) -> DatanodeOptions {
238        let cloned_opts = self.clone();
239        DatanodeOptions {
240            node_id: Some(0),
241            enable_telemetry: cloned_opts.enable_telemetry,
242            wal: cloned_opts.wal,
243            storage: cloned_opts.storage,
244            region_engine: cloned_opts.region_engine,
245            grpc: cloned_opts.grpc,
246            init_regions_in_background: cloned_opts.init_regions_in_background,
247            init_regions_parallelism: cloned_opts.init_regions_parallelism,
248            query: cloned_opts.query,
249            ..Default::default()
250        }
251    }
252}
253
/// A standalone GreptimeDB instance: an in-process datanode, frontend and
/// flownode, plus the procedure manager and WAL options allocator that
/// [`App::start`] starts. Built by [`StartCommand::build`].
pub struct Instance {
    datanode: Datanode,
    frontend: Frontend,
    flownode: FlownodeInstance,
    procedure_manager: ProcedureManagerRef,
    wal_options_allocator: WalOptionsAllocatorRef,

    // The components of standalone, which make it easier to expand based
    // on the components.
    #[cfg(feature = "enterprise")]
    components: Components,

    // Keep the logging guard to prevent the worker from being dropped.
    _guard: Vec<WorkerGuard>,
}
269
/// Handles to shared standalone components (enterprise builds only), kept on
/// [`Instance`] so that extensions can be built on top of them.
#[cfg(feature = "enterprise")]
pub struct Components {
    pub plugins: Plugins,
    pub kv_backend: KvBackendRef,
    pub frontend_client: Arc<FrontendClient>,
    pub catalog_manager: catalog::CatalogManagerRef,
}
277
278impl Instance {
279    /// Find the socket addr of a server by its `name`.
280    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
281        self.frontend.server_handlers().addr(name)
282    }
283
284    #[cfg(feature = "enterprise")]
285    pub fn components(&self) -> &Components {
286        &self.components
287    }
288}
289
290#[async_trait]
291impl App for Instance {
292    fn name(&self) -> &str {
293        APP_NAME
294    }
295
296    async fn start(&mut self) -> Result<()> {
297        self.datanode.start_telemetry();
298
299        self.procedure_manager
300            .start()
301            .await
302            .context(error::StartProcedureManagerSnafu)?;
303
304        self.wal_options_allocator
305            .start()
306            .await
307            .context(error::StartWalOptionsAllocatorSnafu)?;
308
309        plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
310            .await
311            .context(error::StartFrontendSnafu)?;
312
313        self.frontend
314            .start()
315            .await
316            .context(error::StartFrontendSnafu)?;
317
318        self.flownode.start().await.context(StartFlownodeSnafu)?;
319
320        Ok(())
321    }
322
323    async fn stop(&mut self) -> Result<()> {
324        self.frontend
325            .shutdown()
326            .await
327            .context(error::ShutdownFrontendSnafu)?;
328
329        self.procedure_manager
330            .stop()
331            .await
332            .context(error::StopProcedureManagerSnafu)?;
333
334        self.datanode
335            .shutdown()
336            .await
337            .context(error::ShutdownDatanodeSnafu)?;
338
339        self.flownode
340            .shutdown()
341            .await
342            .context(error::ShutdownFlownodeSnafu)?;
343
344        info!("Datanode instance stopped.");
345
346        Ok(())
347    }
348}
349
350#[derive(Debug, Default, Parser)]
351pub struct StartCommand {
352    #[clap(long)]
353    http_addr: Option<String>,
354    #[clap(long, alias = "rpc-addr")]
355    rpc_bind_addr: Option<String>,
356    #[clap(long)]
357    mysql_addr: Option<String>,
358    #[clap(long)]
359    postgres_addr: Option<String>,
360    #[clap(short, long)]
361    influxdb_enable: bool,
362    #[clap(short, long)]
363    pub config_file: Option<String>,
364    #[clap(long)]
365    tls_mode: Option<TlsMode>,
366    #[clap(long)]
367    tls_cert_path: Option<String>,
368    #[clap(long)]
369    tls_key_path: Option<String>,
370    #[clap(long)]
371    user_provider: Option<String>,
372    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
373    pub env_prefix: String,
374    /// The working home directory of this standalone instance.
375    #[clap(long)]
376    data_home: Option<String>,
377}
378
379impl StartCommand {
380    /// Load the GreptimeDB options from various sources (command line, config file or env).
381    pub fn load_options(
382        &self,
383        global_options: &GlobalOptions,
384    ) -> Result<GreptimeOptions<StandaloneOptions>> {
385        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
386            self.config_file.as_deref(),
387            self.env_prefix.as_ref(),
388        )
389        .context(error::LoadLayeredConfigSnafu)?;
390
391        self.merge_with_cli_options(global_options, &mut opts.component)?;
392
393        Ok(opts)
394    }
395
396    // The precedence order is: cli > config file > environment variables > default values.
397    pub fn merge_with_cli_options(
398        &self,
399        global_options: &GlobalOptions,
400        opts: &mut StandaloneOptions,
401    ) -> Result<()> {
402        if let Some(dir) = &global_options.log_dir {
403            opts.logging.dir.clone_from(dir);
404        }
405
406        if global_options.log_level.is_some() {
407            opts.logging.level.clone_from(&global_options.log_level);
408        }
409
410        opts.tracing = TracingOptions {
411            #[cfg(feature = "tokio-console")]
412            tokio_console_addr: global_options.tokio_console_addr.clone(),
413        };
414
415        let tls_opts = TlsOption::new(
416            self.tls_mode.clone(),
417            self.tls_cert_path.clone(),
418            self.tls_key_path.clone(),
419        );
420
421        if let Some(addr) = &self.http_addr {
422            opts.http.addr.clone_from(addr);
423        }
424
425        if let Some(data_home) = &self.data_home {
426            opts.storage.data_home.clone_from(data_home);
427        }
428
429        // If the logging dir is not set, use the default logs dir in the data home.
430        if opts.logging.dir.is_empty() {
431            opts.logging.dir = Path::new(&opts.storage.data_home)
432                .join(DEFAULT_LOGGING_DIR)
433                .to_string_lossy()
434                .to_string();
435        }
436
437        if let Some(addr) = &self.rpc_bind_addr {
438            // frontend grpc addr conflict with datanode default grpc addr
439            let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
440            if addr.eq(&datanode_grpc_addr) {
441                return error::IllegalConfigSnafu {
442                    msg: format!(
443                        "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
444                    ),
445                }.fail();
446            }
447            opts.grpc.bind_addr.clone_from(addr)
448        }
449
450        if let Some(addr) = &self.mysql_addr {
451            opts.mysql.enable = true;
452            opts.mysql.addr.clone_from(addr);
453            opts.mysql.tls = tls_opts.clone();
454        }
455
456        if let Some(addr) = &self.postgres_addr {
457            opts.postgres.enable = true;
458            opts.postgres.addr.clone_from(addr);
459            opts.postgres.tls = tls_opts;
460        }
461
462        if self.influxdb_enable {
463            opts.influxdb.enable = self.influxdb_enable;
464        }
465
466        if let Some(user_provider) = &self.user_provider {
467            opts.user_provider = Some(user_provider.clone());
468        }
469
470        Ok(())
471    }
472
473    #[allow(unreachable_code)]
474    #[allow(unused_variables)]
475    #[allow(clippy::diverging_sub_expression)]
476    /// Build GreptimeDB instance with the loaded options.
477    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
478        common_runtime::init_global_runtimes(&opts.runtime);
479
480        let guard = common_telemetry::init_global_logging(
481            APP_NAME,
482            &opts.component.logging,
483            &opts.component.tracing,
484            None,
485            opts.component.slow_query.as_ref(),
486        );
487
488        log_versions(version(), short_version(), APP_NAME);
489        create_resource_limit_metrics(APP_NAME);
490
491        info!("Standalone start command: {:#?}", self);
492        info!("Standalone options: {opts:#?}");
493
494        let mut plugins = Plugins::new();
495        let plugin_opts = opts.plugins;
496        let mut opts = opts.component;
497        opts.grpc.detect_server_addr();
498        let fe_opts = opts.frontend_options();
499        let dn_opts = opts.datanode_options();
500
501        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
502            .await
503            .context(error::StartFrontendSnafu)?;
504
505        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
506            .await
507            .context(error::StartDatanodeSnafu)?;
508
509        set_default_timezone(fe_opts.default_timezone.as_deref())
510            .context(error::InitTimezoneSnafu)?;
511
512        let data_home = &dn_opts.storage.data_home;
513        // Ensure the data_home directory exists.
514        fs::create_dir_all(path::Path::new(data_home))
515            .context(error::CreateDirSnafu { dir: data_home })?;
516
517        let metadata_dir = metadata_store_dir(data_home);
518        let (kv_backend, procedure_manager) = FeInstance::try_build_standalone_components(
519            metadata_dir,
520            opts.metadata_store,
521            opts.procedure,
522        )
523        .await
524        .context(error::StartFrontendSnafu)?;
525
526        // Builds cache registry
527        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
528        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
529        let layered_cache_registry = Arc::new(
530            with_default_composite_cache_registry(
531                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
532            )
533            .context(error::BuildCacheRegistrySnafu)?
534            .build(),
535        );
536
537        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
538        builder.with_cache_registry(layered_cache_registry.clone());
539        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;
540
541        let information_extension = Arc::new(StandaloneInformationExtension::new(
542            datanode.region_server(),
543            procedure_manager.clone(),
544        ));
545
546        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
547        let catalog_manager = KvBackendCatalogManager::new(
548            information_extension.clone(),
549            kv_backend.clone(),
550            layered_cache_registry.clone(),
551            Some(procedure_manager.clone()),
552            Some(process_manager.clone()),
553        );
554
555        let table_metadata_manager =
556            Self::create_table_metadata_manager(kv_backend.clone()).await?;
557
558        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
559        let flownode_options = FlownodeOptions {
560            flow: opts.flow.clone(),
561            ..Default::default()
562        };
563
564        // for standalone not use grpc, but get a handler to frontend grpc client without
565        // actually make a connection
566        let (frontend_client, frontend_instance_handler) =
567            FrontendClient::from_empty_grpc_handler(opts.query.clone());
568        let frontend_client = Arc::new(frontend_client);
569        let flow_builder = FlownodeBuilder::new(
570            flownode_options,
571            plugins.clone(),
572            table_metadata_manager.clone(),
573            catalog_manager.clone(),
574            flow_metadata_manager.clone(),
575            frontend_client.clone(),
576        );
577        let flownode = flow_builder
578            .build()
579            .await
580            .map_err(BoxedError::new)
581            .context(error::OtherSnafu)?;
582
583        // set the ref to query for the local flow state
584        {
585            let flow_streaming_engine = flownode.flow_engine().streaming_engine();
586            information_extension
587                .set_flow_streaming_engine(flow_streaming_engine)
588                .await;
589        }
590
591        let node_manager = Arc::new(StandaloneDatanodeManager {
592            region_server: datanode.region_server(),
593            flow_server: flownode.flow_engine(),
594        });
595
596        let table_id_sequence = Arc::new(
597            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
598                .initial(MIN_USER_TABLE_ID as u64)
599                .step(10)
600                .build(),
601        );
602        let flow_id_sequence = Arc::new(
603            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
604                .initial(MIN_USER_FLOW_ID as u64)
605                .step(10)
606                .build(),
607        );
608        let kafka_options = opts.wal.clone().into();
609        let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
610            .await
611            .context(error::BuildWalOptionsAllocatorSnafu)?;
612        let wal_options_allocator = Arc::new(wal_options_allocator);
613        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
614            table_id_sequence,
615            wal_options_allocator.clone(),
616        ));
617        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
618            flow_id_sequence,
619        ));
620
621        let ddl_context = DdlContext {
622            node_manager: node_manager.clone(),
623            cache_invalidator: layered_cache_registry.clone(),
624            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
625            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
626            table_metadata_manager: table_metadata_manager.clone(),
627            table_metadata_allocator: table_metadata_allocator.clone(),
628            flow_metadata_manager: flow_metadata_manager.clone(),
629            flow_metadata_allocator: flow_metadata_allocator.clone(),
630            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
631        };
632        let procedure_manager_c = procedure_manager.clone();
633
634        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
635            .context(error::InitDdlManagerSnafu)?;
636        #[cfg(feature = "enterprise")]
637        let ddl_manager = {
638            let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
639                plugins.get();
640            ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
641        };
642        let ddl_task_executor: ProcedureExecutorRef = Arc::new(ddl_manager);
643
644        let fe_instance = FrontendBuilder::new(
645            fe_opts.clone(),
646            kv_backend.clone(),
647            layered_cache_registry.clone(),
648            catalog_manager.clone(),
649            node_manager.clone(),
650            ddl_task_executor.clone(),
651            process_manager,
652        )
653        .with_plugin(plugins.clone())
654        .try_build()
655        .await
656        .context(error::StartFrontendSnafu)?;
657        let fe_instance = Arc::new(fe_instance);
658
659        // set the frontend client for flownode
660        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
661        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
662        frontend_instance_handler
663            .lock()
664            .unwrap()
665            .replace(weak_grpc_handler);
666
667        // set the frontend invoker for flownode
668        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
669        // flow server need to be able to use frontend to write insert requests back
670        let invoker = FrontendInvoker::build_from(
671            flow_streaming_engine.clone(),
672            catalog_manager.clone(),
673            kv_backend.clone(),
674            layered_cache_registry.clone(),
675            ddl_task_executor.clone(),
676            node_manager,
677        )
678        .await
679        .context(StartFlownodeSnafu)?;
680        flow_streaming_engine.set_frontend_invoker(invoker).await;
681
682        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
683            .context(error::ServersSnafu)?;
684
685        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
686            .build()
687            .context(error::StartFrontendSnafu)?;
688
689        let frontend = Frontend {
690            instance: fe_instance,
691            servers,
692            heartbeat_task: None,
693            export_metrics_task,
694        };
695
696        #[cfg(feature = "enterprise")]
697        let components = Components {
698            plugins,
699            kv_backend,
700            frontend_client,
701            catalog_manager,
702        };
703
704        Ok(Instance {
705            datanode,
706            frontend,
707            flownode,
708            procedure_manager,
709            wal_options_allocator,
710            #[cfg(feature = "enterprise")]
711            components,
712            _guard: guard,
713        })
714    }
715
716    pub async fn create_table_metadata_manager(
717        kv_backend: KvBackendRef,
718    ) -> Result<TableMetadataManagerRef> {
719        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
720
721        table_metadata_manager
722            .init()
723            .await
724            .context(error::InitMetadataSnafu)?;
725
726        Ok(table_metadata_manager)
727    }
728}
729
/// [`InformationExtension`] for standalone mode, answering from the in-process
/// region server, procedure manager and (once set) flow streaming engine.
pub struct StandaloneInformationExtension {
    region_server: RegionServer,
    procedure_manager: ProcedureManagerRef,
    // Wall-clock milliseconds captured in `new`; reported as the node's start time.
    start_time_ms: u64,
    // `None` until `set_flow_streaming_engine` is called after the flownode is built.
    flow_streaming_engine: RwLock<Option<Arc<StreamingEngine>>>,
}
736
737impl StandaloneInformationExtension {
738    pub fn new(region_server: RegionServer, procedure_manager: ProcedureManagerRef) -> Self {
739        Self {
740            region_server,
741            procedure_manager,
742            start_time_ms: common_time::util::current_time_millis() as u64,
743            flow_streaming_engine: RwLock::new(None),
744        }
745    }
746
747    /// Set the flow streaming engine for the standalone instance.
748    pub async fn set_flow_streaming_engine(&self, flow_streaming_engine: Arc<StreamingEngine>) {
749        let mut guard = self.flow_streaming_engine.write().await;
750        *guard = Some(flow_streaming_engine);
751    }
752}
753
754#[async_trait::async_trait]
755impl InformationExtension for StandaloneInformationExtension {
756    type Error = catalog::error::Error;
757
758    async fn nodes(&self) -> std::result::Result<Vec<NodeInfo>, Self::Error> {
759        let build_info = common_version::build_info();
760        let node_info = NodeInfo {
761            // For the standalone:
762            // - id always 0
763            // - empty string for peer_addr
764            peer: Peer {
765                id: 0,
766                addr: "".to_string(),
767            },
768            last_activity_ts: -1,
769            status: NodeStatus::Standalone,
770            version: build_info.version.to_string(),
771            git_commit: build_info.commit_short.to_string(),
772            // Use `self.start_time_ms` instead.
773            // It's not precise but enough.
774            start_time_ms: self.start_time_ms,
775        };
776        Ok(vec![node_info])
777    }
778
779    async fn procedures(&self) -> std::result::Result<Vec<(String, ProcedureInfo)>, Self::Error> {
780        self.procedure_manager
781            .list_procedures()
782            .await
783            .map_err(BoxedError::new)
784            .map(|procedures| {
785                procedures
786                    .into_iter()
787                    .map(|procedure| {
788                        let status = procedure.state.as_str_name().to_string();
789                        (status, procedure)
790                    })
791                    .collect::<Vec<_>>()
792            })
793            .context(catalog::error::ListProceduresSnafu)
794    }
795
796    async fn region_stats(&self) -> std::result::Result<Vec<RegionStat>, Self::Error> {
797        let stats = self
798            .region_server
799            .reportable_regions()
800            .into_iter()
801            .map(|stat| {
802                let region_stat = self
803                    .region_server
804                    .region_statistic(stat.region_id)
805                    .unwrap_or_default();
806                RegionStat {
807                    id: stat.region_id,
808                    rcus: 0,
809                    wcus: 0,
810                    approximate_bytes: region_stat.estimated_disk_size(),
811                    engine: stat.engine,
812                    role: RegionRole::from(stat.role).into(),
813                    num_rows: region_stat.num_rows,
814                    memtable_size: region_stat.memtable_size,
815                    manifest_size: region_stat.manifest_size,
816                    sst_size: region_stat.sst_size,
817                    index_size: region_stat.index_size,
818                    region_manifest: region_stat.manifest.into(),
819                    data_topic_latest_entry_id: region_stat.data_topic_latest_entry_id,
820                    metadata_topic_latest_entry_id: region_stat.metadata_topic_latest_entry_id,
821                }
822            })
823            .collect::<Vec<_>>();
824        Ok(stats)
825    }
826
827    async fn flow_stats(&self) -> std::result::Result<Option<FlowStat>, Self::Error> {
828        Ok(Some(
829            self.flow_streaming_engine
830                .read()
831                .await
832                .as_ref()
833                .unwrap()
834                .gen_state_report()
835                .await,
836        ))
837    }
838}
839
#[cfg(test)]
mod tests {
    // Tests for loading `StandaloneOptions` (defaults, TOML config file,
    // environment variables, CLI flags) and for frontend plugin setup.
    use std::default::Default;
    use std::io::Write;
    use std::time::Duration;

    use auth::{Identity, Password, UserProviderRef};
    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use common_wal::config::DatanodeWalConfig;
    use object_store::config::{FileConfig, GcsConfig};

    use super::*;
    use crate::options::GlobalOptions;

    /// Checks the `static_user_provider` set in `FrontendOptions` against `Plugins`.
    ///
    /// After `setup_frontend_plugins`, the provider must be retrievable from
    /// `Plugins` as a `UserProviderRef`, and it must accept the `test`/`test`
    /// credentials used below.
    #[tokio::test]
    async fn test_try_from_start_command_to_anymap() {
        let fe_opts = FrontendOptions {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
            .await
            .unwrap();

        let provider = plugins.get::<UserProviderRef>().unwrap();
        let result = provider
            .authenticate(
                Identity::UserId("test", None),
                Password::PlainText("test".to_string().into()),
            )
            .await;
        // Authentication must succeed; a failure panics here.
        let _ = result.unwrap();
    }

    /// Default options must serialize to TOML and parse back without error.
    #[test]
    fn test_toml() {
        let opts = StandaloneOptions::default();
        let toml_string = toml::to_string(&opts).unwrap();
        let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
    }

    /// Loads options from a TOML config file (plus a CLI `user_provider`).
    ///
    /// Checks the resolved frontend, datanode and logging options.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        // Config covering WAL, storage (primary store plus extra providers),
        // HTTP, OpenTSDB and logging sections.
        let toml_str = r#"
            enable_memory_catalog = true

            [wal]
            provider = "raft_engine"
            dir = "./greptimedb_data/test/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"

            [storage.compaction]
            max_inflight_tasks = 3
            max_files_in_level0 = 7
            max_purge_tasks = 32

            [storage.manifest]
            checkpoint_margin = 9
            gc_duration = '7s'

            [http]
            addr = "127.0.0.1:4000"
            timeout = "33s"
            body_limit = "128MB"

            [opentsdb]
            enable = true

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();
        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;
        let fe_opts = options.frontend_options();
        let dn_opts = options.datanode_options();
        let logging_opts = options.logging;
        // `[http]` values come straight from the file.
        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
        // `grpc`, `mysql` and `influxdb` are not set in the TOML above; these
        // assertions pin the values expected when they are absent from the file.
        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
        assert!(fe_opts.mysql.enable);
        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
        assert_eq!(2, fe_opts.mysql.runtime_size);
        assert_eq!(None, fe_opts.mysql.reject_no_database);
        assert!(fe_opts.influxdb.enable);
        assert!(fe_opts.opentsdb.enable);

        // `provider = "raft_engine"` must select the raft-engine WAL variant;
        // any other variant fails the test.
        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
            unreachable!()
        };
        assert_eq!(
            "./greptimedb_data/test/wal",
            raft_engine_config.dir.unwrap()
        );

        // `[storage] type = "File"` is the primary store; the two
        // `[[storage.providers]]` entries are kept in file order (Gcs, then S3).
        assert!(matches!(
            &dn_opts.storage.store,
            object_store::config::ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(dn_opts.storage.providers.len(), 2);
        assert!(matches!(
            dn_opts.storage.providers[0],
            object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        match &dn_opts.storage.providers[1] {
            object_store::config::ObjectStoreConfig::S3(s3_config) => {
                // The S3 access key must be redacted in its `Debug` output.
                assert_eq!(
                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
                    format!("{:?}", s3_config.access_key_id)
                );
            }
            _ => {
                unreachable!()
            }
        }

        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
    }

    /// `log_dir` and `log_level` passed as global CLI options must end up in
    /// `logging`. No config file is used here.
    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let opts = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
        assert_eq!("debug", opts.logging.level.unwrap());
    }

    /// Checks which source wins when a key is set by several of: CLI flag,
    /// config file, environment variable, built-in default.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"

            [logging]
            level = "debug"
        "#;
        write!(file, "{}", toml_str).unwrap();

        // Each env var name is `<prefix>`, section and key joined with
        // `ENV_VAR_SEP` (section/key upper-cased).
        let env_prefix = "STANDALONE_UT";
        temp_env::with_vars(
            [
                (
                    // logging.dir = /other/log/dir
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "dir".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("/other/log/dir"),
                ),
                (
                    // logging.level = info
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "level".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("info"),
                ),
                (
                    // http.addr = 127.0.0.1:24000
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Should be read from env, env > default values.
                assert_eq!(opts.logging.dir, "/other/log/dir");

                // Should be read from config file, config file > env > default values.
                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");

                // Should be read from cli, cli > config file > env > default values.
                let fe_opts = opts.frontend_options();
                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
                // `http.body_limit` is not set by the file, env vars or CLI above.
                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);

                // Should be default value.
                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
            },
        );
    }

    /// With no config file, the layered loader must yield the same values as
    /// `StandaloneOptions::default()` for every section compared below.
    ///
    /// NOTE(review): the second argument looks like an env-var prefix, so
    /// presumably any `GREPTIMEDB_STANDALONE`-prefixed variables in the test
    /// environment could make this test fail. Confirm.
    #[test]
    fn test_load_default_standalone_options() {
        let options =
            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
        let default_options = StandaloneOptions::default();
        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
        assert_eq!(options.http, default_options.http);
        assert_eq!(options.grpc, default_options.grpc);
        assert_eq!(options.mysql, default_options.mysql);
        assert_eq!(options.postgres, default_options.postgres);
        assert_eq!(options.opentsdb, default_options.opentsdb);
        assert_eq!(options.influxdb, default_options.influxdb);
        assert_eq!(options.prom_store, default_options.prom_store);
        assert_eq!(options.wal, default_options.wal);
        assert_eq!(options.metadata_store, default_options.metadata_store);
        assert_eq!(options.procedure, default_options.procedure);
        assert_eq!(options.logging, default_options.logging);
        assert_eq!(options.region_engine, default_options.region_engine);
    }
}