Skip to main content

cmd/
standalone.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::net::SocketAddr;
17use std::path::Path;
18use std::sync::Arc;
19use std::{fs, path};
20
21use async_trait::async_trait;
22use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
23use catalog::information_schema::InformationExtensionRef;
24use catalog::kvbackend::{CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder};
25use catalog::process_manager::ProcessManager;
26use clap::Parser;
27use common_base::Plugins;
28use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
29use common_config::{Configurable, metadata_store_dir};
30use common_error::ext::BoxedError;
31use common_meta::cache::LayeredCacheRegistryBuilder;
32use common_meta::ddl::flow_meta::FlowMetadataAllocator;
33use common_meta::ddl::table_meta::TableMetadataAllocator;
34use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
35use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerRef};
36use common_meta::key::flow::FlowMetadataManager;
37use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
38use common_meta::kv_backend::KvBackendRef;
39use common_meta::node_manager::{FlownodeRef, NodeManagerRef};
40use common_meta::procedure_executor::{LocalProcedureExecutor, ProcedureExecutorRef};
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::{Sequence, SequenceBuilder};
44use common_meta::wal_provider::{WalProviderRef, build_wal_provider};
45use common_procedure::ProcedureManagerRef;
46use common_query::prelude::set_default_prefix;
47use common_telemetry::info;
48use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
49use common_time::timezone::set_default_timezone;
50use common_version::{short_version, verbose_version};
51use datanode::config::DatanodeOptions;
52use datanode::datanode::{Datanode, DatanodeBuilder};
53use datanode::region_server::RegionServer;
54use flow::{
55    FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
56    GrpcQueryHandlerWithBoxedError,
57};
58use frontend::frontend::Frontend;
59use frontend::instance::StandaloneDatanodeManager;
60use frontend::instance::builder::FrontendBuilder;
61use frontend::server::Services;
62use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
63use plugins::PluginOptions;
64use plugins::frontend::context::{
65    CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext,
66};
67use plugins::standalone::context::DdlManagerConfigureContext;
68use servers::tls::{TlsMode, TlsOption, merge_tls_option};
69use snafu::ResultExt;
70use standalone::options::StandaloneOptions;
71use standalone::{StandaloneInformationExtension, StandaloneRepartitionProcedureFactory};
72use tracing_appender::non_blocking::WorkerGuard;
73
74use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};
75use crate::options::{GlobalOptions, GreptimeOptions};
76use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
77
/// Application name of the standalone binary. It is returned by `App::name` for
/// [`Instance`] and passed to the logging, version and resource-metrics setup in
/// `StartCommand::build`.
pub const APP_NAME: &str = "greptime-standalone";
79
// Top-level CLI entry of the standalone mode; it only wraps the `SubCommand` selector.
// NOTE: plain `//` comments are used on purpose, clap derives help text from `///` docs.
#[derive(Parser)]
pub struct Command {
    // The selected subcommand that `build` and `load_options` delegate to.
    #[clap(subcommand)]
    subcmd: SubCommand,
}
85
86impl Command {
87    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
88        self.subcmd.build(opts).await
89    }
90
91    pub fn load_options(
92        &self,
93        global_options: &GlobalOptions,
94    ) -> Result<GreptimeOptions<StandaloneOptions>> {
95        self.subcmd.load_options(global_options)
96    }
97}
98
// Subcommands of the standalone command; `Start` is currently the only one.
// NOTE: plain `//` comments are used on purpose, clap derives help text from `///` docs.
#[derive(Parser)]
enum SubCommand {
    Start(StartCommand),
}
103
104impl SubCommand {
105    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
106        match self {
107            SubCommand::Start(cmd) => cmd.build(opts).await,
108        }
109    }
110
111    fn load_options(
112        &self,
113        global_options: &GlobalOptions,
114    ) -> Result<GreptimeOptions<StandaloneOptions>> {
115        match self {
116            SubCommand::Start(cmd) => cmd.load_options(global_options),
117        }
118    }
119}
120
/// A standalone GreptimeDB instance: the datanode, frontend and flownode components
/// bundled together with the procedure manager and WAL provider they are built on.
pub struct Instance {
    /// The datanode component; exposed read-only through [`Instance::datanode`].
    datanode: Datanode,
    /// The frontend component; its server handlers back [`Instance::server_addr`].
    frontend: Frontend,
    /// The flownode component.
    flownode: FlownodeInstance,
    /// Procedure manager, started in `App::start` and stopped in `App::stop`.
    procedure_manager: ProcedureManagerRef,
    /// WAL provider, started in `App::start`.
    wal_provider: WalProviderRef,
    // Keep the logging guard to prevent the worker from being dropped.
    _guard: Vec<WorkerGuard>,
}
130
131impl Instance {
132    /// Find the socket addr of a server by its `name`.
133    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
134        self.frontend.server_handlers().addr(name)
135    }
136
137    /// Get the mutable Frontend component of this Standalone instance for externally modification
138    /// by others (might not be in this code base, so don't delete this function).
139    pub fn mut_frontend(&mut self) -> &mut Frontend {
140        &mut self.frontend
141    }
142
143    /// Get the Datanode component of this Standalone instance for externally usage
144    /// by others (might not be in this code base, so don't delete this function).
145    pub fn datanode(&self) -> &Datanode {
146        &self.datanode
147    }
148}
149
150#[async_trait]
151impl App for Instance {
152    fn name(&self) -> &str {
153        APP_NAME
154    }
155
156    async fn start(&mut self) -> Result<()> {
157        self.datanode.start_telemetry();
158
159        self.procedure_manager
160            .start()
161            .await
162            .context(error::StartProcedureManagerSnafu)?;
163
164        self.wal_provider
165            .start()
166            .await
167            .context(error::StartWalProviderSnafu)?;
168
169        plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
170            .await
171            .context(error::StartFrontendSnafu)?;
172
173        self.frontend
174            .start()
175            .await
176            .context(error::StartFrontendSnafu)?;
177
178        self.flownode.start().await.context(StartFlownodeSnafu)?;
179
180        Ok(())
181    }
182
183    async fn stop(&mut self) -> Result<()> {
184        self.frontend
185            .shutdown()
186            .await
187            .context(error::ShutdownFrontendSnafu)?;
188
189        self.procedure_manager
190            .stop()
191            .await
192            .context(error::StopProcedureManagerSnafu)?;
193
194        self.datanode
195            .shutdown()
196            .await
197            .context(error::ShutdownDatanodeSnafu)?;
198
199        self.flownode
200            .shutdown()
201            .await
202            .context(error::ShutdownFlownodeSnafu)?;
203
204        info!("Datanode instance stopped.");
205
206        Ok(())
207    }
208}
209
// Command line flags of `standalone start`. They are merged over the layered options
// in `StartCommand::merge_with_cli_options`.
// NOTE: plain `//` comments are used on purpose, clap derives help text from `///` docs.
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    // Overrides `http.addr` when set.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `grpc.bind_addr` when set; rejected when it equals the default datanode
    // gRPC bind address.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    // Enables the MySQL server and overrides `mysql.addr` when set.
    #[clap(long)]
    mysql_addr: Option<String>,
    // Enables the PostgreSQL server and overrides `postgres.addr` when set.
    #[clap(long)]
    postgres_addr: Option<String>,
    // Enables InfluxDB when present; absence leaves the loaded setting untouched.
    #[clap(short, long)]
    influxdb_enable: bool,
    // Path of the config file handed to the layered options loader.
    #[clap(short, long)]
    pub config_file: Option<String>,
    // The four `tls_*` flags form one `TlsOption`, which is merged into the gRPC, MySQL
    // and PostgreSQL TLS settings only when the matching address flag is also given.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    #[clap(long)]
    tls_watch: bool,
    // Overrides `user_provider` when set.
    #[clap(long)]
    user_provider: Option<String>,
    // Prefix handed to the layered options loader for environment variable lookup.
    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
    pub env_prefix: String,
    /// The working home directory of this standalone instance.
    #[clap(long)]
    data_home: Option<String>,
}
240
241impl StartCommand {
242    /// Load the GreptimeDB options from various sources (command line, config file or env).
243    pub fn load_options(
244        &self,
245        global_options: &GlobalOptions,
246    ) -> Result<GreptimeOptions<StandaloneOptions>> {
247        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
248            self.config_file.as_deref(),
249            self.env_prefix.as_ref(),
250        )
251        .context(error::LoadLayeredConfigSnafu)?;
252
253        self.merge_with_cli_options(global_options, &mut opts.component)?;
254        opts.component.sanitize();
255
256        Ok(opts)
257    }
258
259    // The precedence order is: cli > config file > environment variables > default values.
260    pub fn merge_with_cli_options(
261        &self,
262        global_options: &GlobalOptions,
263        opts: &mut StandaloneOptions,
264    ) -> Result<()> {
265        if let Some(dir) = &global_options.log_dir {
266            opts.logging.dir.clone_from(dir);
267        }
268
269        if global_options.log_level.is_some() {
270            opts.logging.level.clone_from(&global_options.log_level);
271        }
272
273        opts.tracing = TracingOptions {
274            #[cfg(feature = "tokio-console")]
275            tokio_console_addr: global_options.tokio_console_addr.clone(),
276        };
277
278        let tls_opts = TlsOption::new(
279            self.tls_mode,
280            self.tls_cert_path.clone(),
281            self.tls_key_path.clone(),
282            self.tls_watch,
283        );
284
285        if let Some(addr) = &self.http_addr {
286            opts.http.addr.clone_from(addr);
287        }
288
289        if let Some(data_home) = &self.data_home {
290            opts.storage.data_home.clone_from(data_home);
291        }
292
293        // If the logging dir is not set, use the default logs dir in the data home.
294        if opts.logging.dir.is_empty() {
295            opts.logging.dir = Path::new(&opts.storage.data_home)
296                .join(DEFAULT_LOGGING_DIR)
297                .to_string_lossy()
298                .to_string();
299        }
300
301        if let Some(addr) = &self.rpc_bind_addr {
302            // frontend grpc addr conflict with datanode default grpc addr
303            let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
304            if addr.eq(&datanode_grpc_addr) {
305                return error::IllegalConfigSnafu {
306                    msg: format!(
307                        "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
308                    ),
309                }.fail();
310            }
311            opts.grpc.bind_addr.clone_from(addr);
312            opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
313        }
314
315        if let Some(addr) = &self.mysql_addr {
316            opts.mysql.enable = true;
317            opts.mysql.addr.clone_from(addr);
318            opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
319        }
320
321        if let Some(addr) = &self.postgres_addr {
322            opts.postgres.enable = true;
323            opts.postgres.addr.clone_from(addr);
324            opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
325        }
326
327        if self.influxdb_enable {
328            opts.influxdb.enable = self.influxdb_enable;
329        }
330
331        if let Some(user_provider) = &self.user_provider {
332            opts.user_provider = Some(user_provider.clone());
333        }
334
335        Ok(())
336    }
337
338    #[allow(unreachable_code)]
339    #[allow(unused_variables)]
340    #[allow(clippy::diverging_sub_expression)]
341    /// Build GreptimeDB instance with the loaded options.
342    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
343        common_runtime::init_global_runtimes(&opts.runtime);
344
345        let guard = common_telemetry::init_global_logging(
346            APP_NAME,
347            &opts.component.logging,
348            &opts.component.tracing,
349            None,
350            Some(&opts.component.slow_query),
351        );
352
353        log_versions(verbose_version(), short_version(), APP_NAME);
354        maybe_activate_heap_profile(&opts.component.memory);
355        create_resource_limit_metrics(APP_NAME);
356
357        info!("Standalone start command: {:#?}", self);
358        info!("Standalone options: {opts:#?}");
359
360        let (mut instance, _) =
361            Self::build_with(opts.component, opts.plugins, InstanceCreator::default()).await?;
362        instance._guard.extend(guard);
363        Ok(instance)
364    }
365
    /// Builds a standalone [`Instance`] from the given options, using `creator` to
    /// construct the node manager, the table id allocator and the procedure executor.
    ///
    /// The datanode, flownode and frontend are all built around one shared metadata
    /// kv backend, cache registry and plugin set.
    ///
    /// Returns the instance (with an empty `_guard`) together with an
    /// [`InstanceCreatorResult`] holding the kv backend, node manager and table id
    /// allocator that were created along the way.
    ///
    /// # Errors
    ///
    /// Returns an error as soon as any of the setup steps below fails.
    pub async fn build_with(
        mut opts: StandaloneOptions,
        plugin_opts: Vec<PluginOptions>,
        creator: InstanceCreator,
    ) -> Result<(Instance, InstanceCreatorResult)> {
        let mut plugins = Plugins::new();
        set_default_prefix(opts.default_column_prefix.as_deref())
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;

        opts.grpc.detect_server_addr();
        // Derive the per-component option views from the standalone options.
        let fe_opts = opts.frontend_options();
        let dn_opts = opts.datanode_options();

        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
            .await
            .context(error::StartFrontendSnafu)?;

        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
            .await
            .context(error::StartDatanodeSnafu)?;

        set_default_timezone(fe_opts.default_timezone.as_deref())
            .context(error::InitTimezoneSnafu)?;

        let data_home = &dn_opts.storage.data_home;
        // Ensure the data_home directory exists.
        fs::create_dir_all(path::Path::new(data_home))
            .context(error::CreateDirSnafu { dir: data_home })?;

        // The metadata kv backend lives under the data home and is shared by every
        // component built below.
        let metadata_dir = metadata_store_dir(data_home);
        let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store)
            .context(error::BuildMetadataKvbackendSnafu)?;
        let procedure_manager =
            standalone::build_procedure_manager(kv_backend.clone(), opts.procedure);

        plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone())
            .await
            .context(error::SetupStandalonePluginsSnafu)?;

        // Builds cache registry
        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry.clone());
        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

        let information_extension = Arc::new(StandaloneInformationExtension::new(
            datanode.region_server(),
            procedure_manager.clone(),
        ));

        // Registered as a plugin so it can be looked up through `plugins` later on.
        plugins.insert::<InformationExtensionRef>(information_extension.clone());

        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));

        // for standalone not use grpc, but get a handler to frontend grpc client without
        // actually make a connection
        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler(opts.query.clone());
        let frontend_client = Arc::new(frontend_client);

        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_procedure_manager(procedure_manager.clone())
        .with_process_manager(process_manager.clone());
        // Let an optional plugin-provided configurator adjust the catalog manager builder.
        let builder = if let Some(configurator) =
            plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
        {
            let ctx = StandaloneCatalogManagerConfigureContext {
                fe_client: frontend_client.clone(),
            };
            let ctx = CatalogManagerConfigureContext::Standalone(ctx);
            configurator
                .configure(builder, ctx)
                .await
                .context(OtherSnafu)?
        } else {
            builder
        };
        let catalog_manager = builder.build();

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flownode_options = FlownodeOptions {
            flow: opts.flow.clone(),
            ..Default::default()
        };

        let flow_builder = FlownodeBuilder::new(
            flownode_options,
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            frontend_client.clone(),
        );
        let flownode = flow_builder
            .build()
            .await
            .map_err(BoxedError::new)
            .context(error::OtherSnafu)?;

        // set the ref to query for the local flow state
        {
            information_extension
                .set_flow_engine(flownode.flow_engine())
                .await;
        }

        // The three `creator` hooks below are the customization points of this build.
        let node_manager = creator
            .node_manager_creator
            .create(
                &kv_backend,
                datanode.region_server(),
                flownode.flow_engine(),
            )
            .await?;

        let table_id_allocator = creator.table_id_allocator_creator.create(&kv_backend);
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        let kafka_options = opts
            .wal
            .clone()
            .try_into()
            .context(error::InvalidWalProviderSnafu)?;
        let wal_provider = build_wal_provider(&kafka_options, kv_backend.clone())
            .await
            .context(error::BuildWalProviderSnafu)?;
        let wal_provider = Arc::new(wal_provider);
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_allocator.clone(),
            wal_provider.clone(),
        ));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

        let ddl_context = DdlContext {
            node_manager: node_manager.clone(),
            cache_invalidator: layered_cache_registry.clone(),
            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            table_metadata_manager: table_metadata_manager.clone(),
            table_metadata_allocator: table_metadata_allocator.clone(),
            flow_metadata_manager: flow_metadata_manager.clone(),
            flow_metadata_allocator: flow_metadata_allocator.clone(),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        };

        let ddl_manager = DdlManager::try_new(
            ddl_context,
            procedure_manager.clone(),
            Arc::new(StandaloneRepartitionProcedureFactory),
            true,
        )
        .context(error::InitDdlManagerSnafu)?;

        // Let an optional plugin-provided configurator adjust the DDL manager.
        let ddl_manager = if let Some(configurator) =
            plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()
        {
            let ctx = DdlManagerConfigureContext {
                kv_backend: kv_backend.clone(),
                fe_client: frontend_client.clone(),
                catalog_manager: catalog_manager.clone(),
            };
            configurator
                .configure(ddl_manager, ctx)
                .await
                .context(OtherSnafu)?
        } else {
            ddl_manager
        };

        let procedure_executor = creator
            .procedure_executor_creator
            .create(Arc::new(ddl_manager), procedure_manager.clone())
            .await?;

        let fe_instance = FrontendBuilder::new(
            fe_opts.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            procedure_executor.clone(),
            process_manager,
        )
        .with_plugin(plugins.clone())
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let fe_instance = Arc::new(fe_instance);

        // set the frontend client for flownode
        // Only a weak reference to the frontend instance is handed to the handler.
        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .set_handler(weak_grpc_handler)
            .await;

        // set the frontend invoker for flownode
        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        // flow server need to be able to use frontend to write insert requests back
        let invoker = FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            procedure_executor,
            node_manager.clone(),
        )
        .await
        .context(StartFlownodeSnafu)?;
        flow_streaming_engine.set_frontend_invoker(invoker).await;

        // `opts` is consumed here, so nothing below may read it.
        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance: fe_instance,
            servers,
            heartbeat_task: None,
        };

        let instance = Instance {
            datanode,
            frontend,
            flownode,
            procedure_manager,
            wal_provider,
            // Logging guards are attached by the caller (see `build`).
            _guard: vec![],
        };
        let result = InstanceCreatorResult {
            kv_backend,
            node_manager,
            table_id_allocator,
        };
        Ok((instance, result))
    }
625
626    pub async fn create_table_metadata_manager(
627        kv_backend: KvBackendRef,
628    ) -> Result<TableMetadataManagerRef> {
629        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
630
631        table_metadata_manager
632            .init()
633            .await
634            .context(error::InitMetadataSnafu)?;
635
636        Ok(table_metadata_manager)
637    }
638}
639
/// Factory for the node manager used by the standalone instance; one of the
/// customization points grouped in [`InstanceCreator`].
#[async_trait]
pub trait NodeManagerCreator {
    /// Creates the node manager from the metadata kv backend, the datanode's region
    /// server and the flownode's flow engine.
    async fn create(
        &self,
        kv_backend: &KvBackendRef,
        region_server: RegionServer,
        flow_server: FlownodeRef,
    ) -> Result<NodeManagerRef>;
}
649
650pub struct DefaultNodeManagerCreator;
651
652#[async_trait]
653impl NodeManagerCreator for DefaultNodeManagerCreator {
654    async fn create(
655        &self,
656        _: &KvBackendRef,
657        region_server: RegionServer,
658        flow_server: FlownodeRef,
659    ) -> Result<NodeManagerRef> {
660        Ok(Arc::new(StandaloneDatanodeManager {
661            region_server,
662            flow_server,
663        }))
664    }
665}
666
/// Factory for the table id allocator used by the standalone instance; one of the
/// customization points grouped in [`InstanceCreator`].
pub trait TableIdAllocatorCreator {
    /// Creates the table id sequence on top of the metadata kv backend.
    fn create(&self, kv_backend: &KvBackendRef) -> Arc<Sequence>;
}
670
671struct DefaultTableIdAllocatorCreator;
672
673impl TableIdAllocatorCreator for DefaultTableIdAllocatorCreator {
674    fn create(&self, kv_backend: &KvBackendRef) -> Arc<Sequence> {
675        Arc::new(
676            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
677                .initial(MIN_USER_TABLE_ID as u64)
678                .step(10)
679                .build(),
680        )
681    }
682}
683
/// Factory for the procedure executor used by the standalone instance; one of the
/// customization points grouped in [`InstanceCreator`].
#[async_trait]
pub trait ProcedureExecutorCreator {
    /// Creates the procedure executor from the DDL manager and the procedure manager.
    async fn create(
        &self,
        ddl_manager: DdlManagerRef,
        procedure_manager: ProcedureManagerRef,
    ) -> Result<ProcedureExecutorRef>;
}
692
693pub struct DefaultProcedureExecutorCreator;
694
695#[async_trait]
696impl ProcedureExecutorCreator for DefaultProcedureExecutorCreator {
697    async fn create(
698        &self,
699        ddl_manager: DdlManagerRef,
700        procedure_manager: ProcedureManagerRef,
701    ) -> Result<ProcedureExecutorRef> {
702        Ok(Arc::new(LocalProcedureExecutor::new(
703            ddl_manager,
704            procedure_manager,
705        )))
706    }
707}
708
/// `InstanceCreator` is used for grouping various component creators for building the
/// Standalone instance, suitable for customizing how the instance can be built.
pub struct InstanceCreator {
    /// Creates the node manager from the kv backend, region server and flow engine.
    node_manager_creator: Box<dyn NodeManagerCreator>,
    /// Creates the table id sequence from the kv backend.
    table_id_allocator_creator: Box<dyn TableIdAllocatorCreator>,
    /// Creates the procedure executor from the DDL manager and procedure manager.
    procedure_executor_creator: Box<dyn ProcedureExecutorCreator>,
}
716
717impl InstanceCreator {
718    pub fn new(
719        node_manager_creator: Box<dyn NodeManagerCreator>,
720        table_id_allocator_creator: Box<dyn TableIdAllocatorCreator>,
721        procedure_executor_creator: Box<dyn ProcedureExecutorCreator>,
722    ) -> Self {
723        Self {
724            node_manager_creator,
725            table_id_allocator_creator,
726            procedure_executor_creator,
727        }
728    }
729}
730
731impl Default for InstanceCreator {
732    fn default() -> Self {
733        Self {
734            node_manager_creator: Box::new(DefaultNodeManagerCreator),
735            table_id_allocator_creator: Box::new(DefaultTableIdAllocatorCreator),
736            procedure_executor_creator: Box::new(DefaultProcedureExecutorCreator),
737        }
738    }
739}
740
/// `InstanceCreatorResult` is expected to be used paired with [InstanceCreator].
/// It stores the created and other important components for further reusing.
pub struct InstanceCreatorResult {
    /// The metadata kv backend the instance was built on.
    pub kv_backend: KvBackendRef,
    /// The node manager produced by the `NodeManagerCreator`.
    pub node_manager: NodeManagerRef,
    /// The table id sequence produced by the `TableIdAllocatorCreator`.
    pub table_id_allocator: Arc<Sequence>,
}
748
749#[cfg(test)]
750mod tests {
751    use std::default::Default;
752    use std::io::Write;
753    use std::time::Duration;
754
755    use auth::{Identity, Password, UserProviderRef};
756    use common_base::readable_size::ReadableSize;
757    use common_config::ENV_VAR_SEP;
758    use common_test_util::temp_dir::create_named_temp_file;
759    use common_wal::config::DatanodeWalConfig;
760    use frontend::frontend::FrontendOptions;
761    use object_store::config::{FileConfig, GcsConfig};
762    use servers::grpc::GrpcOptions;
763
764    use super::*;
765    use crate::options::GlobalOptions;
766
767    #[tokio::test]
768    async fn test_try_from_start_command_to_anymap() {
769        let fe_opts = FrontendOptions {
770            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
771            ..Default::default()
772        };
773
774        let mut plugins = Plugins::new();
775        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
776            .await
777            .unwrap();
778
779        let provider = plugins.get::<UserProviderRef>().unwrap();
780        let result = provider
781            .authenticate(
782                Identity::UserId("test", None),
783                Password::PlainText("test".to_string().into()),
784            )
785            .await;
786        let _ = result.unwrap();
787    }
788
789    #[test]
790    fn test_toml() {
791        let opts = StandaloneOptions::default();
792        let toml_string = toml::to_string(&opts).unwrap();
793        let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
794    }
795
796    #[test]
797    fn test_read_from_config_file() {
798        let mut file = create_named_temp_file();
799        let toml_str = r#"
800            enable_memory_catalog = true
801
802            [wal]
803            provider = "raft_engine"
804            dir = "./greptimedb_data/test/wal"
805            file_size = "1GB"
806            purge_threshold = "50GB"
807            purge_interval = "10m"
808            read_batch_size = 128
809            sync_write = false
810
811            [storage]
812            data_home = "./greptimedb_data/"
813            type = "File"
814
815            [[storage.providers]]
816            type = "Gcs"
817            bucket = "foo"
818            endpoint = "bar"
819
820            [[storage.providers]]
821            type = "S3"
822            access_key_id = "access_key_id"
823            secret_access_key = "secret_access_key"
824
825            [storage.compaction]
826            max_inflight_tasks = 3
827            max_files_in_level0 = 7
828            max_purge_tasks = 32
829
830            [storage.manifest]
831            checkpoint_margin = 9
832            gc_duration = '7s'
833
834            [http]
835            addr = "127.0.0.1:4000"
836            timeout = "33s"
837            body_limit = "128MB"
838
839            [opentsdb]
840            enable = true
841
842            [logging]
843            level = "debug"
844            dir = "./greptimedb_data/test/logs"
845        "#;
846        write!(file, "{}", toml_str).unwrap();
847        let cmd = StartCommand {
848            config_file: Some(file.path().to_str().unwrap().to_string()),
849            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
850            ..Default::default()
851        };
852
853        let options = cmd
854            .load_options(&GlobalOptions::default())
855            .unwrap()
856            .component;
857        let fe_opts = options.frontend_options();
858        let dn_opts = options.datanode_options();
859        let logging_opts = options.logging;
860        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
861        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
862        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
863        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
864        assert!(fe_opts.mysql.enable);
865        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
866        assert_eq!(2, fe_opts.mysql.runtime_size);
867        assert_eq!(None, fe_opts.mysql.reject_no_database);
868        assert!(fe_opts.influxdb.enable);
869        assert!(fe_opts.opentsdb.enable);
870
871        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
872            unreachable!()
873        };
874        assert_eq!(
875            "./greptimedb_data/test/wal",
876            raft_engine_config.dir.unwrap()
877        );
878
879        assert!(matches!(
880            &dn_opts.storage.store,
881            object_store::config::ObjectStoreConfig::File(FileConfig { .. })
882        ));
883        assert_eq!(dn_opts.storage.providers.len(), 2);
884        assert!(matches!(
885            dn_opts.storage.providers[0],
886            object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
887        ));
888        match &dn_opts.storage.providers[1] {
889            object_store::config::ObjectStoreConfig::S3(s3_config) => {
890                assert_eq!(
891                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
892                    format!("{:?}", s3_config.connection.access_key_id)
893                );
894            }
895            _ => {
896                unreachable!()
897            }
898        }
899
900        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
901        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
902    }
903
904    #[test]
905    fn test_load_log_options_from_cli() {
906        let cmd = StartCommand {
907            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
908            mysql_addr: Some("127.0.0.1:4002".to_string()),
909            postgres_addr: Some("127.0.0.1:4003".to_string()),
910            ..Default::default()
911        };
912
913        let opts = cmd
914            .load_options(&GlobalOptions {
915                log_dir: Some("./greptimedb_data/test/logs".to_string()),
916                log_level: Some("debug".to_string()),
917
918                #[cfg(feature = "tokio-console")]
919                tokio_console_addr: None,
920            })
921            .unwrap()
922            .component;
923
924        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
925        assert_eq!("debug", opts.logging.level.unwrap());
926    }
927
928    #[test]
929    fn test_config_precedence_order() {
930        let mut file = create_named_temp_file();
931        let toml_str = r#"
932            [http]
933            addr = "127.0.0.1:4000"
934
935            [logging]
936            level = "debug"
937        "#;
938        write!(file, "{}", toml_str).unwrap();
939
940        let env_prefix = "STANDALONE_UT";
941        temp_env::with_vars(
942            [
943                (
944                    // logging.dir = /other/log/dir
945                    [
946                        env_prefix.to_string(),
947                        "logging".to_uppercase(),
948                        "dir".to_uppercase(),
949                    ]
950                    .join(ENV_VAR_SEP),
951                    Some("/other/log/dir"),
952                ),
953                (
954                    // logging.level = info
955                    [
956                        env_prefix.to_string(),
957                        "logging".to_uppercase(),
958                        "level".to_uppercase(),
959                    ]
960                    .join(ENV_VAR_SEP),
961                    Some("info"),
962                ),
963                (
964                    // http.addr = 127.0.0.1:24000
965                    [
966                        env_prefix.to_string(),
967                        "http".to_uppercase(),
968                        "addr".to_uppercase(),
969                    ]
970                    .join(ENV_VAR_SEP),
971                    Some("127.0.0.1:24000"),
972                ),
973            ],
974            || {
975                let command = StartCommand {
976                    config_file: Some(file.path().to_str().unwrap().to_string()),
977                    http_addr: Some("127.0.0.1:14000".to_string()),
978                    env_prefix: env_prefix.to_string(),
979                    ..Default::default()
980                };
981
982                let opts = command.load_options(&Default::default()).unwrap().component;
983
984                // Should be read from env, env > default values.
985                assert_eq!(opts.logging.dir, "/other/log/dir");
986
987                // Should be read from config file, config file > env > default values.
988                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
989
990                // Should be read from cli, cli > config file > env > default values.
991                let fe_opts = opts.frontend_options();
992                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
993                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);
994
995                // Should be default value.
996                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
997            },
998        );
999    }
1000
1001    #[test]
1002    fn test_load_default_standalone_options() {
1003        let options =
1004            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
1005        let default_options = StandaloneOptions::default();
1006        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
1007        assert_eq!(options.http, default_options.http);
1008        assert_eq!(options.grpc, default_options.grpc);
1009        assert_eq!(options.mysql, default_options.mysql);
1010        assert_eq!(options.postgres, default_options.postgres);
1011        assert_eq!(options.opentsdb, default_options.opentsdb);
1012        assert_eq!(options.influxdb, default_options.influxdb);
1013        assert_eq!(options.prom_store, default_options.prom_store);
1014        assert_eq!(options.wal, default_options.wal);
1015        assert_eq!(options.metadata_store, default_options.metadata_store);
1016        assert_eq!(options.procedure, default_options.procedure);
1017        assert_eq!(options.logging, default_options.logging);
1018        assert_eq!(options.region_engine, default_options.region_engine);
1019    }
1020
1021    #[test]
1022    fn test_cache_config() {
1023        let toml_str = r#"
1024            [storage]
1025            data_home = "test_data_home"
1026            type = "S3"
1027            [storage.cache_config]
1028            enable_read_cache = true
1029        "#;
1030        let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
1031        opts.sanitize();
1032        assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
1033        assert_eq!(
1034            opts.storage.store.cache_config().unwrap().cache_path,
1035            "test_data_home"
1036        );
1037    }
1038}