Skip to main content

cmd/
standalone.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::net::SocketAddr;
17use std::path::Path;
18use std::sync::Arc;
19use std::{fs, path};
20
21use async_trait::async_trait;
22use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
23use catalog::information_schema::InformationExtensionRef;
24use catalog::kvbackend::{CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder};
25use catalog::process_manager::ProcessManager;
26use clap::Parser;
27use common_base::Plugins;
28use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
29use common_config::{Configurable, metadata_store_dir};
30use common_error::ext::BoxedError;
31use common_meta::cache::LayeredCacheRegistryBuilder;
32use common_meta::ddl::flow_meta::FlowMetadataAllocator;
33use common_meta::ddl::table_meta::TableMetadataAllocator;
34use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
35use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef, DdlManagerRef};
36use common_meta::key::flow::FlowMetadataManager;
37use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
38use common_meta::kv_backend::KvBackendRef;
39use common_meta::node_manager::{FlownodeRef, NodeManagerRef};
40use common_meta::procedure_executor::{LocalProcedureExecutor, ProcedureExecutorRef};
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::{Sequence, SequenceBuilder};
44use common_meta::wal_provider::{WalProviderRef, build_wal_provider};
45use common_options::plugin_options::StandaloneFlag;
46use common_procedure::ProcedureManagerRef;
47use common_query::prelude::set_default_prefix;
48use common_telemetry::info;
49use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
50use common_time::timezone::set_default_timezone;
51use common_version::{short_version, verbose_version};
52use datanode::config::DatanodeOptions;
53use datanode::datanode::{Datanode, DatanodeBuilder};
54use datanode::region_server::RegionServer;
55use flow::{
56    FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
57    GrpcQueryHandlerWithBoxedError,
58};
59use frontend::frontend::Frontend;
60use frontend::instance::StandaloneDatanodeManager;
61use frontend::instance::builder::FrontendBuilder;
62use frontend::server::Services;
63use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
64use plugins::PluginOptions;
65use plugins::frontend::context::{
66    CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext,
67};
68use plugins::standalone::context::DdlManagerConfigureContext;
69use servers::tls::{TlsMode, TlsOption, merge_tls_option};
70use snafu::ResultExt;
71use standalone::options::StandaloneOptions;
72use standalone::{StandaloneInformationExtension, StandaloneRepartitionProcedureFactory};
73use tracing_appender::non_blocking::WorkerGuard;
74
75use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};
76use crate::options::{GlobalOptions, GreptimeOptions};
77use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
78
79pub const APP_NAME: &str = "greptime-standalone";
80
// Command-line entry point of the standalone mode. It only wraps the selected
// subcommand; `build` and `load_options` forward to it.
//
// NOTE: plain `//` comments are used here on purpose. clap turns `///` doc
// comments on `Parser` items into help text, so adding them would change the
// CLI output.
#[derive(Parser)]
pub struct Command {
    // The subcommand chosen on the command line.
    #[clap(subcommand)]
    subcmd: SubCommand,
}
86
87impl Command {
88    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
89        self.subcmd.build(opts).await
90    }
91
92    pub fn load_options(
93        &self,
94        global_options: &GlobalOptions,
95    ) -> Result<GreptimeOptions<StandaloneOptions>> {
96        self.subcmd.load_options(global_options)
97    }
98}
99
// Subcommands available under the standalone command.
//
// NOTE: plain `//` comments are used on purpose; clap would turn `///` doc
// comments on the variants into subcommand help text.
#[derive(Parser)]
enum SubCommand {
    // Builds and starts a standalone instance (see `StartCommand`).
    Start(StartCommand),
}
104
105impl SubCommand {
106    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
107        match self {
108            SubCommand::Start(cmd) => cmd.build(opts).await,
109        }
110    }
111
112    fn load_options(
113        &self,
114        global_options: &GlobalOptions,
115    ) -> Result<GreptimeOptions<StandaloneOptions>> {
116        match self {
117            SubCommand::Start(cmd) => cmd.load_options(global_options),
118        }
119    }
120}
121
/// A standalone GreptimeDB instance: every component that `App::start` and
/// `App::stop` drive, bundled in one process.
pub struct Instance {
    /// The embedded datanode component.
    datanode: Datanode,
    /// The embedded frontend component (owns the protocol servers).
    frontend: Frontend,
    /// The embedded flownode component.
    flownode: FlownodeInstance,
    /// Procedure manager started in `start` and stopped in `stop`.
    procedure_manager: ProcedureManagerRef,
    /// WAL provider started in `start`.
    wal_provider: WalProviderRef,
    // Keep the logging guard to prevent the worker from being dropped.
    _guard: Vec<WorkerGuard>,
}
131
132impl Instance {
133    /// Find the socket addr of a server by its `name`.
134    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
135        self.frontend.server_handlers().addr(name)
136    }
137
138    /// Get the mutable Frontend component of this Standalone instance for externally modification
139    /// by others (might not be in this code base, so don't delete this function).
140    pub fn mut_frontend(&mut self) -> &mut Frontend {
141        &mut self.frontend
142    }
143
144    /// Get the Datanode component of this Standalone instance for externally usage
145    /// by others (might not be in this code base, so don't delete this function).
146    pub fn datanode(&self) -> &Datanode {
147        &self.datanode
148    }
149}
150
151#[async_trait]
152impl App for Instance {
153    fn name(&self) -> &str {
154        APP_NAME
155    }
156
157    async fn start(&mut self) -> Result<()> {
158        self.datanode.start_telemetry();
159
160        self.procedure_manager
161            .start()
162            .await
163            .context(error::StartProcedureManagerSnafu)?;
164
165        self.wal_provider
166            .start()
167            .await
168            .context(error::StartWalProviderSnafu)?;
169
170        plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
171            .await
172            .context(error::StartFrontendSnafu)?;
173
174        self.frontend
175            .start()
176            .await
177            .context(error::StartFrontendSnafu)?;
178
179        self.flownode.start().await.context(StartFlownodeSnafu)?;
180
181        Ok(())
182    }
183
184    async fn stop(&mut self) -> Result<()> {
185        self.frontend
186            .shutdown()
187            .await
188            .context(error::ShutdownFrontendSnafu)?;
189
190        self.procedure_manager
191            .stop()
192            .await
193            .context(error::StopProcedureManagerSnafu)?;
194
195        self.datanode
196            .shutdown()
197            .await
198            .context(error::ShutdownDatanodeSnafu)?;
199
200        self.flownode
201            .shutdown()
202            .await
203            .context(error::ShutdownFlownodeSnafu)?;
204
205        info!("Datanode instance stopped.");
206
207        Ok(())
208    }
209}
210
// Command-line flags of the `Start` subcommand. They are applied on top of the
// layered options in `merge_with_cli_options`.
//
// NOTE: plain `//` comments are used for the new notes below because clap
// turns `///` doc comments on `Parser` fields into `--help` text.
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `grpc.bind_addr`; also accepted as `--rpc-bind-addr` and `--rpc-addr`.
    #[clap(long = "grpc-bind-addr", alias = "rpc-bind-addr", alias = "rpc-addr")]
    grpc_bind_addr: Option<String>,
    // Overrides `mysql.addr` and sets `mysql.enable` to true.
    #[clap(long)]
    mysql_addr: Option<String>,
    // Overrides `postgres.addr` and sets `postgres.enable` to true.
    #[clap(long)]
    postgres_addr: Option<String>,
    // When set, forces `influxdb.enable` to true; when unset, the loaded value is kept.
    #[clap(short, long)]
    influxdb_enable: bool,
    // Path of the config file handed to `load_layered_options`.
    #[clap(short, long)]
    pub config_file: Option<String>,
    // The four TLS flags below are combined into one `TlsOption`. It is merged
    // into the gRPC / MySQL / PostgreSQL TLS settings only when the matching
    // address flag is also given on the command line.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    #[clap(long)]
    tls_watch: bool,
    // Overrides `user_provider`.
    #[clap(long)]
    user_provider: Option<String>,
    // Environment-variable prefix handed to `load_layered_options`.
    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
    pub env_prefix: String,
    /// The working home directory of this standalone instance.
    // Overrides `storage.data_home`.
    #[clap(long)]
    data_home: Option<String>,
}
241
242impl StartCommand {
243    /// Load the GreptimeDB options from various sources (command line, config file or env).
244    pub fn load_options(
245        &self,
246        global_options: &GlobalOptions,
247    ) -> Result<GreptimeOptions<StandaloneOptions>> {
248        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
249            self.config_file.as_deref(),
250            self.env_prefix.as_ref(),
251        )
252        .context(error::LoadLayeredConfigSnafu)?;
253
254        self.merge_with_cli_options(global_options, &mut opts.component)?;
255        opts.component.sanitize();
256
257        Ok(opts)
258    }
259
260    // The precedence order is: cli > config file > environment variables > default values.
261    pub fn merge_with_cli_options(
262        &self,
263        global_options: &GlobalOptions,
264        opts: &mut StandaloneOptions,
265    ) -> Result<()> {
266        if let Some(dir) = &global_options.log_dir {
267            opts.logging.dir.clone_from(dir);
268        }
269
270        if global_options.log_level.is_some() {
271            opts.logging.level.clone_from(&global_options.log_level);
272        }
273
274        opts.tracing = TracingOptions {
275            #[cfg(feature = "tokio-console")]
276            tokio_console_addr: global_options.tokio_console_addr.clone(),
277        };
278
279        let tls_opts = TlsOption::new(
280            self.tls_mode,
281            self.tls_cert_path.clone(),
282            self.tls_key_path.clone(),
283            self.tls_watch,
284        );
285
286        if let Some(addr) = &self.http_addr {
287            opts.http.addr.clone_from(addr);
288        }
289
290        if let Some(data_home) = &self.data_home {
291            opts.storage.data_home.clone_from(data_home);
292        }
293
294        // If the logging dir is not set, use the default logs dir in the data home.
295        if opts.logging.dir.is_empty() {
296            opts.logging.dir = Path::new(&opts.storage.data_home)
297                .join(DEFAULT_LOGGING_DIR)
298                .to_string_lossy()
299                .to_string();
300        }
301
302        if let Some(addr) = &self.grpc_bind_addr {
303            // frontend grpc addr conflict with datanode default grpc addr
304            let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
305            if addr.eq(&datanode_grpc_addr) {
306                return error::IllegalConfigSnafu {
307                    msg: format!(
308                        "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
309                    ),
310                }.fail();
311            }
312            opts.grpc.bind_addr.clone_from(addr);
313            opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
314        }
315
316        if let Some(addr) = &self.mysql_addr {
317            opts.mysql.enable = true;
318            opts.mysql.addr.clone_from(addr);
319            opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
320        }
321
322        if let Some(addr) = &self.postgres_addr {
323            opts.postgres.enable = true;
324            opts.postgres.addr.clone_from(addr);
325            opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
326        }
327
328        if self.influxdb_enable {
329            opts.influxdb.enable = self.influxdb_enable;
330        }
331
332        if let Some(user_provider) = &self.user_provider {
333            opts.user_provider = Some(user_provider.clone());
334        }
335
336        Ok(())
337    }
338
339    #[allow(unreachable_code)]
340    #[allow(unused_variables)]
341    #[allow(clippy::diverging_sub_expression)]
342    /// Build GreptimeDB instance with the loaded options.
343    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
344        common_runtime::init_global_runtimes(&opts.runtime);
345
346        let guard = common_telemetry::init_global_logging(
347            APP_NAME,
348            &opts.component.logging,
349            &opts.component.tracing,
350            None,
351            Some(&opts.component.slow_query),
352        );
353
354        log_versions(verbose_version(), short_version(), APP_NAME);
355        maybe_activate_heap_profile(&opts.component.memory);
356        create_resource_limit_metrics(APP_NAME);
357
358        info!("Standalone start command: {:#?}", self);
359        info!("Standalone options: {opts:#?}");
360
361        let (mut instance, _) =
362            Self::build_with(opts.component, opts.plugins, InstanceCreator::default()).await?;
363        instance._guard.extend(guard);
364        Ok(instance)
365    }
366
    /// Builds a standalone [`Instance`] from `opts`, using `creator` to construct the
    /// customizable components (node manager, table id allocator, procedure executor).
    ///
    /// Besides the instance, an [`InstanceCreatorResult`] is returned. It exposes the
    /// metadata kv backend, the node manager and the table id allocator that were created
    /// along the way.
    ///
    /// Unlike [`Self::build`], this function does not initialize the global runtimes or
    /// logging, and the returned instance holds no logging guards.
    ///
    /// # Errors
    ///
    /// Returns the first failure met while wiring the components together (plugin setup,
    /// directory creation, kv backend, datanode, flownode, WAL provider, DDL manager,
    /// frontend, servers, ...).
    pub async fn build_with(
        mut opts: StandaloneOptions,
        plugin_opts: Vec<PluginOptions>,
        creator: InstanceCreator,
    ) -> Result<(Instance, InstanceCreatorResult)> {
        // Mark the plugin set as belonging to a standalone deployment.
        let mut plugins = Plugins::new();
        plugins.insert(StandaloneFlag);
        set_default_prefix(opts.default_column_prefix.as_deref())
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;

        // The frontend and datanode option views are derived after the gRPC server address
        // has been detected.
        opts.grpc.detect_server_addr();
        let fe_opts = opts.frontend_options();
        let dn_opts = opts.datanode_options();

        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
            .await
            .context(error::StartFrontendSnafu)?;

        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
            .await
            .context(error::StartDatanodeSnafu)?;

        set_default_timezone(fe_opts.default_timezone.as_deref())
            .context(error::InitTimezoneSnafu)?;

        let data_home = &dn_opts.storage.data_home;
        // Ensure the data_home directory exists.
        fs::create_dir_all(path::Path::new(data_home))
            .context(error::CreateDirSnafu { dir: data_home })?;

        // The metadata kv backend lives in a directory derived from `data_home`; the
        // procedure manager is built on top of that backend.
        let metadata_dir = metadata_store_dir(data_home);
        let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store)
            .context(error::BuildMetadataKvbackendSnafu)?;
        let procedure_manager =
            standalone::build_procedure_manager(kv_backend.clone(), opts.procedure);

        plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone())
            .await
            .context(error::SetupStandalonePluginsSnafu)?;

        // Builds cache registry
        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        // Build the embedded datanode; it shares the kv backend and the cache registry.
        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry.clone());
        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

        // The information extension is built from the local region server and the
        // procedure manager, and is also published through the plugins.
        let information_extension = Arc::new(StandaloneInformationExtension::new(
            datanode.region_server(),
            procedure_manager.clone(),
        ));

        plugins.insert::<InformationExtensionRef>(information_extension.clone());

        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));

        // Standalone mode does not use gRPC between components: obtain a frontend client
        // backed by an empty handler, without actually making a connection. The real
        // handler is installed further below, once the frontend instance exists.
        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler(opts.query.clone());
        let frontend_client = Arc::new(frontend_client);

        // Build the catalog manager; a configurator registered in the plugins may
        // customize the builder first.
        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_procedure_manager(procedure_manager.clone())
        .with_process_manager(process_manager.clone());
        let builder = if let Some(configurator) =
            plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
        {
            let ctx = StandaloneCatalogManagerConfigureContext {
                fe_client: frontend_client.clone(),
            };
            let ctx = CatalogManagerConfigureContext::Standalone(ctx);
            configurator
                .configure(builder, ctx)
                .await
                .context(OtherSnafu)?
        } else {
            builder
        };
        let catalog_manager = builder.build();

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

        // Build the embedded flownode; only the `flow` section is taken from `opts`, the
        // remaining flownode options keep their defaults.
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flownode_options = FlownodeOptions {
            flow: opts.flow.clone(),
            ..Default::default()
        };

        let flow_builder = FlownodeBuilder::new(
            flownode_options,
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            frontend_client.clone(),
        );
        let flownode = flow_builder
            .build()
            .await
            .map_err(BoxedError::new)
            .context(error::OtherSnafu)?;

        // set the ref to query for the local flow state
        {
            information_extension
                .set_flow_engine(flownode.flow_engine())
                .await;
        }

        // Customizable: the node manager is produced by the caller-supplied creator.
        let node_manager = creator
            .node_manager_creator
            .create(
                &kv_backend,
                datanode.region_server(),
                flownode.flow_engine(),
            )
            .await?;

        // Customizable: the table id allocator. The flow id sequence is always built here,
        // starting at `MIN_USER_FLOW_ID` with a step of 10.
        let table_id_allocator = creator.table_id_allocator_creator.create(&kv_backend);
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        // Convert the standalone WAL options into the form `build_wal_provider` expects.
        // NOTE(review): despite the name `kafka_options`, the converted value is built from
        // the whole `opts.wal` section — confirm it is not Kafka-specific.
        let kafka_options = opts
            .wal
            .clone()
            .try_into()
            .context(error::InvalidWalProviderSnafu)?;
        let wal_provider = build_wal_provider(&kafka_options, kv_backend.clone())
            .await
            .context(error::BuildWalProviderSnafu)?;
        let wal_provider = Arc::new(wal_provider);
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_allocator.clone(),
            wal_provider.clone(),
        ));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

        // Assemble the DDL context. The region keeper and the leader region registry start
        // from their defaults, and region failure detection is a no-op here.
        let ddl_context = DdlContext {
            node_manager: node_manager.clone(),
            cache_invalidator: layered_cache_registry.clone(),
            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            table_metadata_manager: table_metadata_manager.clone(),
            table_metadata_allocator: table_metadata_allocator.clone(),
            flow_metadata_manager: flow_metadata_manager.clone(),
            flow_metadata_allocator: flow_metadata_allocator.clone(),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        };

        // NOTE(review): the meaning of the trailing `true` argument is not visible from
        // this file — check `DdlManager::try_new`.
        let ddl_manager = DdlManager::try_new(
            ddl_context,
            procedure_manager.clone(),
            Arc::new(StandaloneRepartitionProcedureFactory),
            true,
        )
        .context(error::InitDdlManagerSnafu)?;

        // A configurator registered in the plugins may customize the DDL manager.
        let ddl_manager = if let Some(configurator) =
            plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()
        {
            let ctx = DdlManagerConfigureContext {
                kv_backend: kv_backend.clone(),
                fe_client: frontend_client.clone(),
                catalog_manager: catalog_manager.clone(),
            };
            configurator
                .configure(ddl_manager, ctx)
                .await
                .context(OtherSnafu)?
        } else {
            ddl_manager
        };

        // Customizable: the procedure executor wrapping the DDL manager.
        let procedure_executor = creator
            .procedure_executor_creator
            .create(Arc::new(ddl_manager), procedure_manager.clone())
            .await?;

        let fe_instance = FrontendBuilder::new(
            fe_opts.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            procedure_executor.clone(),
            process_manager,
        )
        .with_plugin(plugins.clone())
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let fe_instance = Arc::new(fe_instance);

        // set the frontend client for flownode
        // Only a weak reference to the frontend instance is handed to the handler
        // (presumably to avoid a reference cycle — confirm).
        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .set_handler(weak_grpc_handler)
            .await;

        // set the frontend invoker for flownode
        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        // flow server need to be able to use frontend to write insert requests back
        let invoker = FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            procedure_executor,
            node_manager.clone(),
        )
        .await
        .context(StartFlownodeSnafu)?;
        flow_streaming_engine.set_frontend_invoker(invoker).await;

        // `opts` is moved into the services builder here; nothing below reads it.
        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
            .build()
            .context(error::StartFrontendSnafu)?;

        // No heartbeat task is attached to the frontend in standalone mode.
        let frontend = Frontend {
            instance: fe_instance,
            servers,
            heartbeat_task: None,
        };

        // `_guard` starts empty; `build` fills it with the logging guards afterwards.
        let instance = Instance {
            datanode,
            frontend,
            flownode,
            procedure_manager,
            wal_provider,
            _guard: vec![],
        };
        let result = InstanceCreatorResult {
            kv_backend,
            node_manager,
            table_id_allocator,
        };
        Ok((instance, result))
    }
627
628    pub async fn create_table_metadata_manager(
629        kv_backend: KvBackendRef,
630    ) -> Result<TableMetadataManagerRef> {
631        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
632
633        table_metadata_manager
634            .init()
635            .await
636            .context(error::InitMetadataSnafu)?;
637
638        Ok(table_metadata_manager)
639    }
640}
641
/// Factory for the [`NodeManagerRef`] used while building a standalone instance.
///
/// `StartCommand::build_with` calls [`NodeManagerCreator::create`] once, passing the
/// metadata kv backend, the datanode's region server and the flownode's flow engine.
#[async_trait]
pub trait NodeManagerCreator {
    /// Creates the node manager from the given components.
    async fn create(
        &self,
        kv_backend: &KvBackendRef,
        region_server: RegionServer,
        flow_server: FlownodeRef,
    ) -> Result<NodeManagerRef>;
}
651
652pub struct DefaultNodeManagerCreator;
653
654#[async_trait]
655impl NodeManagerCreator for DefaultNodeManagerCreator {
656    async fn create(
657        &self,
658        _: &KvBackendRef,
659        region_server: RegionServer,
660        flow_server: FlownodeRef,
661    ) -> Result<NodeManagerRef> {
662        Ok(Arc::new(StandaloneDatanodeManager {
663            region_server,
664            flow_server,
665        }))
666    }
667}
668
/// Factory for the table id allocator (a shared [`Sequence`]) used while building a
/// standalone instance.
pub trait TableIdAllocatorCreator {
    /// Creates the table id sequence, given the metadata kv backend.
    fn create(&self, kv_backend: &KvBackendRef) -> Arc<Sequence>;
}
672
673struct DefaultTableIdAllocatorCreator;
674
675impl TableIdAllocatorCreator for DefaultTableIdAllocatorCreator {
676    fn create(&self, kv_backend: &KvBackendRef) -> Arc<Sequence> {
677        Arc::new(
678            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
679                .initial(MIN_USER_TABLE_ID as u64)
680                .step(10)
681                .build(),
682        )
683    }
684}
685
/// Factory for the [`ProcedureExecutorRef`] used while building a standalone instance.
///
/// `StartCommand::build_with` calls [`ProcedureExecutorCreator::create`] once, after the
/// DDL manager has been built (and possibly customized by a plugin configurator).
#[async_trait]
pub trait ProcedureExecutorCreator {
    /// Creates the procedure executor from the DDL manager and the procedure manager.
    async fn create(
        &self,
        ddl_manager: DdlManagerRef,
        procedure_manager: ProcedureManagerRef,
    ) -> Result<ProcedureExecutorRef>;
}
694
695pub struct DefaultProcedureExecutorCreator;
696
697#[async_trait]
698impl ProcedureExecutorCreator for DefaultProcedureExecutorCreator {
699    async fn create(
700        &self,
701        ddl_manager: DdlManagerRef,
702        procedure_manager: ProcedureManagerRef,
703    ) -> Result<ProcedureExecutorRef> {
704        Ok(Arc::new(LocalProcedureExecutor::new(
705            ddl_manager,
706            procedure_manager,
707        )))
708    }
709}
710
711/// `InstanceCreator` is used for grouping various component creators for building the
712/// Standalone instance, suitable for customizing how the instance can be built.
713pub struct InstanceCreator {
714    node_manager_creator: Box<dyn NodeManagerCreator>,
715    table_id_allocator_creator: Box<dyn TableIdAllocatorCreator>,
716    procedure_executor_creator: Box<dyn ProcedureExecutorCreator>,
717}
718
719impl InstanceCreator {
720    pub fn new(
721        node_manager_creator: Box<dyn NodeManagerCreator>,
722        table_id_allocator_creator: Box<dyn TableIdAllocatorCreator>,
723        procedure_executor_creator: Box<dyn ProcedureExecutorCreator>,
724    ) -> Self {
725        Self {
726            node_manager_creator,
727            table_id_allocator_creator,
728            procedure_executor_creator,
729        }
730    }
731}
732
733impl Default for InstanceCreator {
734    fn default() -> Self {
735        Self {
736            node_manager_creator: Box::new(DefaultNodeManagerCreator),
737            table_id_allocator_creator: Box::new(DefaultTableIdAllocatorCreator),
738            procedure_executor_creator: Box::new(DefaultProcedureExecutorCreator),
739        }
740    }
741}
742
/// `InstanceCreatorResult` is expected to be used paired with [InstanceCreator].
/// It stores the created and other important components for further reusing.
pub struct InstanceCreatorResult {
    /// The metadata kv backend built for the instance.
    pub kv_backend: KvBackendRef,
    /// The node manager produced by the `NodeManagerCreator`.
    pub node_manager: NodeManagerRef,
    /// The table id allocator produced by the `TableIdAllocatorCreator`.
    pub table_id_allocator: Arc<Sequence>,
}
750
751#[cfg(test)]
752mod tests {
753    use std::default::Default;
754    use std::io::Write;
755    use std::time::Duration;
756
757    use auth::{Identity, Password, UserProviderRef};
758    use clap::{CommandFactory, Parser};
759    use common_base::readable_size::ReadableSize;
760    use common_config::ENV_VAR_SEP;
761    use common_test_util::temp_dir::create_named_temp_file;
762    use common_wal::config::DatanodeWalConfig;
763    use frontend::frontend::FrontendOptions;
764    use object_store::config::{FileConfig, GcsConfig};
765    use servers::grpc::GrpcOptions;
766
767    use super::*;
768    use crate::options::GlobalOptions;
769
770    #[tokio::test]
771    async fn test_try_from_start_command_to_anymap() {
772        let fe_opts = FrontendOptions {
773            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
774            ..Default::default()
775        };
776
777        let mut plugins = Plugins::new();
778        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
779            .await
780            .unwrap();
781
782        let provider = plugins.get::<UserProviderRef>().unwrap();
783        let result = provider
784            .authenticate(
785                Identity::UserId("test", None),
786                Password::PlainText("test".to_string().into()),
787            )
788            .await;
789        let _ = result.unwrap();
790    }
791
792    #[test]
793    fn test_toml() {
794        let opts = StandaloneOptions::default();
795        let toml_string = toml::to_string(&opts).unwrap();
796        let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
797    }
798
    /// Loads options from a temporary TOML config file and checks that the values from
    /// the file show up in the frontend, datanode and logging options.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = true

            [wal]
            provider = "raft_engine"
            dir = "./greptimedb_data/test/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"

            [storage.compaction]
            max_inflight_tasks = 3
            max_files_in_level0 = 7
            max_purge_tasks = 32

            [storage.manifest]
            checkpoint_margin = 9
            gc_duration = '7s'

            [http]
            addr = "127.0.0.1:4000"
            timeout = "33s"
            body_limit = "128MB"

            [opentsdb]
            enable = true

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();
        // Only the config file and the user provider are set on the command line.
        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;
        let fe_opts = options.frontend_options();
        let dn_opts = options.datanode_options();
        let logging_opts = options.logging;
        // The `[http]` values come from the file above.
        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
        // gRPC, MySQL and InfluxDB are set neither in the file nor on the command line, so
        // the values asserted here presumably are the defaults.
        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
        assert!(fe_opts.mysql.enable);
        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
        assert_eq!(2, fe_opts.mysql.runtime_size);
        assert_eq!(None, fe_opts.mysql.reject_no_database);
        assert!(fe_opts.influxdb.enable);
        assert!(fe_opts.opentsdb.enable);

        // The `[wal]` section selects the raft-engine provider.
        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
            unreachable!()
        };
        assert_eq!(
            "./greptimedb_data/test/wal",
            raft_engine_config.dir.unwrap()
        );

        // The main store is `File`; the two extra providers keep their order (Gcs, then S3).
        assert!(matches!(
            &dn_opts.storage.store,
            object_store::config::ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(dn_opts.storage.providers.len(), 2);
        assert!(matches!(
            dn_opts.storage.providers[0],
            object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        match &dn_opts.storage.providers[1] {
            object_store::config::ObjectStoreConfig::S3(s3_config) => {
                // The access key must not leak through its `Debug` output.
                assert_eq!(
                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
                    format!("{:?}", s3_config.connection.access_key_id)
                );
            }
            _ => {
                unreachable!()
            }
        }

        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
    }
906
907    #[test]
908    fn test_load_log_options_from_cli() {
909        let cmd = StartCommand {
910            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
911            mysql_addr: Some("127.0.0.1:4002".to_string()),
912            postgres_addr: Some("127.0.0.1:4003".to_string()),
913            ..Default::default()
914        };
915
916        let opts = cmd
917            .load_options(&GlobalOptions {
918                log_dir: Some("./greptimedb_data/test/logs".to_string()),
919                log_level: Some("debug".to_string()),
920
921                #[cfg(feature = "tokio-console")]
922                tokio_console_addr: None,
923            })
924            .unwrap()
925            .component;
926
927        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
928        assert_eq!("debug", opts.logging.level.unwrap());
929    }
930
931    #[test]
932    fn test_config_precedence_order() {
933        let mut file = create_named_temp_file();
934        let toml_str = r#"
935            [http]
936            addr = "127.0.0.1:4000"
937
938            [logging]
939            level = "debug"
940        "#;
941        write!(file, "{}", toml_str).unwrap();
942
943        let env_prefix = "STANDALONE_UT";
944        temp_env::with_vars(
945            [
946                (
947                    // logging.dir = /other/log/dir
948                    [
949                        env_prefix.to_string(),
950                        "logging".to_uppercase(),
951                        "dir".to_uppercase(),
952                    ]
953                    .join(ENV_VAR_SEP),
954                    Some("/other/log/dir"),
955                ),
956                (
957                    // logging.level = info
958                    [
959                        env_prefix.to_string(),
960                        "logging".to_uppercase(),
961                        "level".to_uppercase(),
962                    ]
963                    .join(ENV_VAR_SEP),
964                    Some("info"),
965                ),
966                (
967                    // http.addr = 127.0.0.1:24000
968                    [
969                        env_prefix.to_string(),
970                        "http".to_uppercase(),
971                        "addr".to_uppercase(),
972                    ]
973                    .join(ENV_VAR_SEP),
974                    Some("127.0.0.1:24000"),
975                ),
976            ],
977            || {
978                let command = StartCommand {
979                    config_file: Some(file.path().to_str().unwrap().to_string()),
980                    http_addr: Some("127.0.0.1:14000".to_string()),
981                    env_prefix: env_prefix.to_string(),
982                    ..Default::default()
983                };
984
985                let opts = command.load_options(&Default::default()).unwrap().component;
986
987                // Should be read from env, env > default values.
988                assert_eq!(opts.logging.dir, "/other/log/dir");
989
990                // Should be read from config file, config file > env > default values.
991                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
992
993                // Should be read from cli, cli > config file > env > default values.
994                let fe_opts = opts.frontend_options();
995                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
996                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);
997
998                // Should be default value.
999                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
1000            },
1001        );
1002    }
1003
1004    #[test]
1005    fn test_parse_grpc_bind_addr_aliases() {
1006        let command =
1007            StartCommand::try_parse_from(["standalone", "--grpc-bind-addr", "127.0.0.1:14001"])
1008                .unwrap();
1009        assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:14001"));
1010
1011        let command =
1012            StartCommand::try_parse_from(["standalone", "--rpc-bind-addr", "127.0.0.1:24001"])
1013                .unwrap();
1014        assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:24001"));
1015
1016        let command =
1017            StartCommand::try_parse_from(["standalone", "--rpc-addr", "127.0.0.1:34001"]).unwrap();
1018        assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:34001"));
1019    }
1020
1021    #[test]
1022    fn test_help_uses_grpc_option_names() {
1023        let mut cmd = StartCommand::command();
1024        let mut help = Vec::new();
1025        cmd.write_long_help(&mut help).unwrap();
1026        let help = String::from_utf8(help).unwrap();
1027
1028        assert!(help.contains("--grpc-bind-addr"));
1029        assert!(!help.contains("--rpc-bind-addr"));
1030        assert!(!help.contains("--rpc-addr"));
1031    }
1032
1033    #[test]
1034    fn test_load_default_standalone_options() {
1035        let options =
1036            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
1037        let default_options = StandaloneOptions::default();
1038        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
1039        assert_eq!(options.http, default_options.http);
1040        assert_eq!(options.grpc, default_options.grpc);
1041        assert_eq!(options.mysql, default_options.mysql);
1042        assert_eq!(options.postgres, default_options.postgres);
1043        assert_eq!(options.opentsdb, default_options.opentsdb);
1044        assert_eq!(options.influxdb, default_options.influxdb);
1045        assert_eq!(options.prom_store, default_options.prom_store);
1046        assert_eq!(options.wal, default_options.wal);
1047        assert_eq!(options.metadata_store, default_options.metadata_store);
1048        assert_eq!(options.procedure, default_options.procedure);
1049        assert_eq!(options.logging, default_options.logging);
1050        assert_eq!(options.region_engine, default_options.region_engine);
1051    }
1052
1053    #[test]
1054    fn test_cache_config() {
1055        let toml_str = r#"
1056            [storage]
1057            data_home = "test_data_home"
1058            type = "S3"
1059            [storage.cache_config]
1060            enable_read_cache = true
1061        "#;
1062        let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
1063        opts.sanitize();
1064        assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
1065        assert_eq!(
1066            opts.storage.store.cache_config().unwrap().cache_path,
1067            "test_data_home"
1068        );
1069    }
1070}