cmd/
standalone.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::net::SocketAddr;
17use std::path::Path;
18use std::sync::Arc;
19use std::{fs, path};
20
21use async_trait::async_trait;
22use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
23use catalog::information_schema::InformationExtensionRef;
24use catalog::kvbackend::{CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder};
25use catalog::process_manager::ProcessManager;
26use clap::Parser;
27use common_base::Plugins;
28use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
29use common_config::{Configurable, metadata_store_dir};
30use common_error::ext::BoxedError;
31use common_meta::cache::LayeredCacheRegistryBuilder;
32use common_meta::ddl::flow_meta::FlowMetadataAllocator;
33use common_meta::ddl::table_meta::TableMetadataAllocator;
34use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
35use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
36use common_meta::key::flow::FlowMetadataManager;
37use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
38use common_meta::kv_backend::KvBackendRef;
39use common_meta::procedure_executor::LocalProcedureExecutor;
40use common_meta::region_keeper::MemoryRegionKeeper;
41use common_meta::region_registry::LeaderRegionRegistry;
42use common_meta::sequence::SequenceBuilder;
43use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
44use common_procedure::ProcedureManagerRef;
45use common_query::prelude::set_default_prefix;
46use common_telemetry::info;
47use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
48use common_time::timezone::set_default_timezone;
49use common_version::{short_version, verbose_version};
50use datanode::config::DatanodeOptions;
51use datanode::datanode::{Datanode, DatanodeBuilder};
52use flow::{
53    FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
54    GrpcQueryHandlerWithBoxedError,
55};
56use frontend::frontend::Frontend;
57use frontend::instance::StandaloneDatanodeManager;
58use frontend::instance::builder::FrontendBuilder;
59use frontend::server::Services;
60use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
61use plugins::frontend::context::{
62    CatalogManagerConfigureContext, StandaloneCatalogManagerConfigureContext,
63};
64use plugins::standalone::context::DdlManagerConfigureContext;
65use servers::tls::{TlsMode, TlsOption, merge_tls_option};
66use snafu::ResultExt;
67use standalone::StandaloneInformationExtension;
68use standalone::options::StandaloneOptions;
69use tracing_appender::non_blocking::WorkerGuard;
70
71use crate::error::{OtherSnafu, Result, StartFlownodeSnafu};
72use crate::options::{GlobalOptions, GreptimeOptions};
73use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
74
75pub const APP_NAME: &str = "greptime-standalone";
76
77#[derive(Parser)]
78pub struct Command {
79    #[clap(subcommand)]
80    subcmd: SubCommand,
81}
82
83impl Command {
84    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
85        self.subcmd.build(opts).await
86    }
87
88    pub fn load_options(
89        &self,
90        global_options: &GlobalOptions,
91    ) -> Result<GreptimeOptions<StandaloneOptions>> {
92        self.subcmd.load_options(global_options)
93    }
94}
95
96#[derive(Parser)]
97enum SubCommand {
98    Start(StartCommand),
99}
100
101impl SubCommand {
102    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
103        match self {
104            SubCommand::Start(cmd) => cmd.build(opts).await,
105        }
106    }
107
108    fn load_options(
109        &self,
110        global_options: &GlobalOptions,
111    ) -> Result<GreptimeOptions<StandaloneOptions>> {
112        match self {
113            SubCommand::Start(cmd) => cmd.load_options(global_options),
114        }
115    }
116}
117
118pub struct Instance {
119    datanode: Datanode,
120    frontend: Frontend,
121    flownode: FlownodeInstance,
122    procedure_manager: ProcedureManagerRef,
123    wal_options_allocator: WalOptionsAllocatorRef,
124    // Keep the logging guard to prevent the worker from being dropped.
125    _guard: Vec<WorkerGuard>,
126}
127
128impl Instance {
129    /// Find the socket addr of a server by its `name`.
130    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
131        self.frontend.server_handlers().addr(name)
132    }
133}
134
135#[async_trait]
136impl App for Instance {
137    fn name(&self) -> &str {
138        APP_NAME
139    }
140
141    async fn start(&mut self) -> Result<()> {
142        self.datanode.start_telemetry();
143
144        self.procedure_manager
145            .start()
146            .await
147            .context(error::StartProcedureManagerSnafu)?;
148
149        self.wal_options_allocator
150            .start()
151            .await
152            .context(error::StartWalOptionsAllocatorSnafu)?;
153
154        plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
155            .await
156            .context(error::StartFrontendSnafu)?;
157
158        self.frontend
159            .start()
160            .await
161            .context(error::StartFrontendSnafu)?;
162
163        self.flownode.start().await.context(StartFlownodeSnafu)?;
164
165        Ok(())
166    }
167
168    async fn stop(&mut self) -> Result<()> {
169        self.frontend
170            .shutdown()
171            .await
172            .context(error::ShutdownFrontendSnafu)?;
173
174        self.procedure_manager
175            .stop()
176            .await
177            .context(error::StopProcedureManagerSnafu)?;
178
179        self.datanode
180            .shutdown()
181            .await
182            .context(error::ShutdownDatanodeSnafu)?;
183
184        self.flownode
185            .shutdown()
186            .await
187            .context(error::ShutdownFlownodeSnafu)?;
188
189        info!("Datanode instance stopped.");
190
191        Ok(())
192    }
193}
194
195#[derive(Debug, Default, Parser)]
196pub struct StartCommand {
197    #[clap(long)]
198    http_addr: Option<String>,
199    #[clap(long, alias = "rpc-addr")]
200    rpc_bind_addr: Option<String>,
201    #[clap(long)]
202    mysql_addr: Option<String>,
203    #[clap(long)]
204    postgres_addr: Option<String>,
205    #[clap(short, long)]
206    influxdb_enable: bool,
207    #[clap(short, long)]
208    pub config_file: Option<String>,
209    #[clap(long)]
210    tls_mode: Option<TlsMode>,
211    #[clap(long)]
212    tls_cert_path: Option<String>,
213    #[clap(long)]
214    tls_key_path: Option<String>,
215    #[clap(long)]
216    tls_watch: bool,
217    #[clap(long)]
218    user_provider: Option<String>,
219    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
220    pub env_prefix: String,
221    /// The working home directory of this standalone instance.
222    #[clap(long)]
223    data_home: Option<String>,
224}
225
226impl StartCommand {
227    /// Load the GreptimeDB options from various sources (command line, config file or env).
228    pub fn load_options(
229        &self,
230        global_options: &GlobalOptions,
231    ) -> Result<GreptimeOptions<StandaloneOptions>> {
232        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
233            self.config_file.as_deref(),
234            self.env_prefix.as_ref(),
235        )
236        .context(error::LoadLayeredConfigSnafu)?;
237
238        self.merge_with_cli_options(global_options, &mut opts.component)?;
239        opts.component.sanitize();
240
241        Ok(opts)
242    }
243
244    // The precedence order is: cli > config file > environment variables > default values.
245    pub fn merge_with_cli_options(
246        &self,
247        global_options: &GlobalOptions,
248        opts: &mut StandaloneOptions,
249    ) -> Result<()> {
250        if let Some(dir) = &global_options.log_dir {
251            opts.logging.dir.clone_from(dir);
252        }
253
254        if global_options.log_level.is_some() {
255            opts.logging.level.clone_from(&global_options.log_level);
256        }
257
258        opts.tracing = TracingOptions {
259            #[cfg(feature = "tokio-console")]
260            tokio_console_addr: global_options.tokio_console_addr.clone(),
261        };
262
263        let tls_opts = TlsOption::new(
264            self.tls_mode.clone(),
265            self.tls_cert_path.clone(),
266            self.tls_key_path.clone(),
267            self.tls_watch,
268        );
269
270        if let Some(addr) = &self.http_addr {
271            opts.http.addr.clone_from(addr);
272        }
273
274        if let Some(data_home) = &self.data_home {
275            opts.storage.data_home.clone_from(data_home);
276        }
277
278        // If the logging dir is not set, use the default logs dir in the data home.
279        if opts.logging.dir.is_empty() {
280            opts.logging.dir = Path::new(&opts.storage.data_home)
281                .join(DEFAULT_LOGGING_DIR)
282                .to_string_lossy()
283                .to_string();
284        }
285
286        if let Some(addr) = &self.rpc_bind_addr {
287            // frontend grpc addr conflict with datanode default grpc addr
288            let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
289            if addr.eq(&datanode_grpc_addr) {
290                return error::IllegalConfigSnafu {
291                    msg: format!(
292                        "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
293                    ),
294                }.fail();
295            }
296            opts.grpc.bind_addr.clone_from(addr);
297            opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
298        }
299
300        if let Some(addr) = &self.mysql_addr {
301            opts.mysql.enable = true;
302            opts.mysql.addr.clone_from(addr);
303            opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
304        }
305
306        if let Some(addr) = &self.postgres_addr {
307            opts.postgres.enable = true;
308            opts.postgres.addr.clone_from(addr);
309            opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
310        }
311
312        if self.influxdb_enable {
313            opts.influxdb.enable = self.influxdb_enable;
314        }
315
316        if let Some(user_provider) = &self.user_provider {
317            opts.user_provider = Some(user_provider.clone());
318        }
319
320        Ok(())
321    }
322
323    #[allow(unreachable_code)]
324    #[allow(unused_variables)]
325    #[allow(clippy::diverging_sub_expression)]
326    /// Build GreptimeDB instance with the loaded options.
327    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
328        common_runtime::init_global_runtimes(&opts.runtime);
329
330        let guard = common_telemetry::init_global_logging(
331            APP_NAME,
332            &opts.component.logging,
333            &opts.component.tracing,
334            None,
335            Some(&opts.component.slow_query),
336        );
337
338        log_versions(verbose_version(), short_version(), APP_NAME);
339        maybe_activate_heap_profile(&opts.component.memory);
340        create_resource_limit_metrics(APP_NAME);
341
342        info!("Standalone start command: {:#?}", self);
343        info!("Standalone options: {opts:#?}");
344
345        let mut plugins = Plugins::new();
346        let plugin_opts = opts.plugins;
347        let mut opts = opts.component;
348        set_default_prefix(opts.default_column_prefix.as_deref())
349            .map_err(BoxedError::new)
350            .context(error::BuildCliSnafu)?;
351
352        opts.grpc.detect_server_addr();
353        let fe_opts = opts.frontend_options();
354        let dn_opts = opts.datanode_options();
355
356        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
357            .await
358            .context(error::StartFrontendSnafu)?;
359
360        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
361            .await
362            .context(error::StartDatanodeSnafu)?;
363
364        set_default_timezone(fe_opts.default_timezone.as_deref())
365            .context(error::InitTimezoneSnafu)?;
366
367        let data_home = &dn_opts.storage.data_home;
368        // Ensure the data_home directory exists.
369        fs::create_dir_all(path::Path::new(data_home))
370            .context(error::CreateDirSnafu { dir: data_home })?;
371
372        let metadata_dir = metadata_store_dir(data_home);
373        let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store)
374            .context(error::BuildMetadataKvbackendSnafu)?;
375        let procedure_manager =
376            standalone::build_procedure_manager(kv_backend.clone(), opts.procedure);
377
378        plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone())
379            .await
380            .context(error::SetupStandalonePluginsSnafu)?;
381
382        // Builds cache registry
383        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
384        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
385        let layered_cache_registry = Arc::new(
386            with_default_composite_cache_registry(
387                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
388            )
389            .context(error::BuildCacheRegistrySnafu)?
390            .build(),
391        );
392
393        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
394        builder.with_cache_registry(layered_cache_registry.clone());
395        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;
396
397        let information_extension = Arc::new(StandaloneInformationExtension::new(
398            datanode.region_server(),
399            procedure_manager.clone(),
400        ));
401
402        plugins.insert::<InformationExtensionRef>(information_extension.clone());
403
404        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
405
406        // for standalone not use grpc, but get a handler to frontend grpc client without
407        // actually make a connection
408        let (frontend_client, frontend_instance_handler) =
409            FrontendClient::from_empty_grpc_handler(opts.query.clone());
410        let frontend_client = Arc::new(frontend_client);
411
412        let builder = KvBackendCatalogManagerBuilder::new(
413            information_extension.clone(),
414            kv_backend.clone(),
415            layered_cache_registry.clone(),
416        )
417        .with_procedure_manager(procedure_manager.clone())
418        .with_process_manager(process_manager.clone());
419        let builder = if let Some(configurator) =
420            plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
421        {
422            let ctx = StandaloneCatalogManagerConfigureContext {
423                fe_client: frontend_client.clone(),
424            };
425            let ctx = CatalogManagerConfigureContext::Standalone(ctx);
426            configurator
427                .configure(builder, ctx)
428                .await
429                .context(OtherSnafu)?
430        } else {
431            builder
432        };
433        let catalog_manager = builder.build();
434
435        let table_metadata_manager =
436            Self::create_table_metadata_manager(kv_backend.clone()).await?;
437
438        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
439        let flownode_options = FlownodeOptions {
440            flow: opts.flow.clone(),
441            ..Default::default()
442        };
443
444        let flow_builder = FlownodeBuilder::new(
445            flownode_options,
446            plugins.clone(),
447            table_metadata_manager.clone(),
448            catalog_manager.clone(),
449            flow_metadata_manager.clone(),
450            frontend_client.clone(),
451        );
452        let flownode = flow_builder
453            .build()
454            .await
455            .map_err(BoxedError::new)
456            .context(error::OtherSnafu)?;
457
458        // set the ref to query for the local flow state
459        {
460            let flow_streaming_engine = flownode.flow_engine().streaming_engine();
461            information_extension
462                .set_flow_streaming_engine(flow_streaming_engine)
463                .await;
464        }
465
466        let node_manager = Arc::new(StandaloneDatanodeManager {
467            region_server: datanode.region_server(),
468            flow_server: flownode.flow_engine(),
469        });
470
471        let table_id_sequence = Arc::new(
472            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
473                .initial(MIN_USER_TABLE_ID as u64)
474                .step(10)
475                .build(),
476        );
477        let flow_id_sequence = Arc::new(
478            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
479                .initial(MIN_USER_FLOW_ID as u64)
480                .step(10)
481                .build(),
482        );
483        let kafka_options = opts
484            .wal
485            .clone()
486            .try_into()
487            .context(error::InvalidWalProviderSnafu)?;
488        let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
489            .await
490            .context(error::BuildWalOptionsAllocatorSnafu)?;
491        let wal_options_allocator = Arc::new(wal_options_allocator);
492        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
493            table_id_sequence,
494            wal_options_allocator.clone(),
495        ));
496        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
497            flow_id_sequence,
498        ));
499
500        let ddl_context = DdlContext {
501            node_manager: node_manager.clone(),
502            cache_invalidator: layered_cache_registry.clone(),
503            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
504            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
505            table_metadata_manager: table_metadata_manager.clone(),
506            table_metadata_allocator: table_metadata_allocator.clone(),
507            flow_metadata_manager: flow_metadata_manager.clone(),
508            flow_metadata_allocator: flow_metadata_allocator.clone(),
509            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
510        };
511
512        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
513            .context(error::InitDdlManagerSnafu)?;
514
515        let ddl_manager = if let Some(configurator) =
516            plugins.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>()
517        {
518            let ctx = DdlManagerConfigureContext {
519                kv_backend: kv_backend.clone(),
520                fe_client: frontend_client.clone(),
521                catalog_manager: catalog_manager.clone(),
522            };
523            configurator
524                .configure(ddl_manager, ctx)
525                .await
526                .context(OtherSnafu)?
527        } else {
528            ddl_manager
529        };
530
531        let procedure_executor = Arc::new(LocalProcedureExecutor::new(
532            Arc::new(ddl_manager),
533            procedure_manager.clone(),
534        ));
535
536        let fe_instance = FrontendBuilder::new(
537            fe_opts.clone(),
538            kv_backend.clone(),
539            layered_cache_registry.clone(),
540            catalog_manager.clone(),
541            node_manager.clone(),
542            procedure_executor.clone(),
543            process_manager,
544        )
545        .with_plugin(plugins.clone())
546        .try_build()
547        .await
548        .context(error::StartFrontendSnafu)?;
549        let fe_instance = Arc::new(fe_instance);
550
551        // set the frontend client for flownode
552        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
553        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
554        frontend_instance_handler
555            .lock()
556            .unwrap()
557            .replace(weak_grpc_handler);
558
559        // set the frontend invoker for flownode
560        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
561        // flow server need to be able to use frontend to write insert requests back
562        let invoker = FrontendInvoker::build_from(
563            flow_streaming_engine.clone(),
564            catalog_manager.clone(),
565            kv_backend.clone(),
566            layered_cache_registry.clone(),
567            procedure_executor,
568            node_manager,
569        )
570        .await
571        .context(StartFlownodeSnafu)?;
572        flow_streaming_engine.set_frontend_invoker(invoker).await;
573
574        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
575            .build()
576            .context(error::StartFrontendSnafu)?;
577
578        let frontend = Frontend {
579            instance: fe_instance,
580            servers,
581            heartbeat_task: None,
582        };
583
584        Ok(Instance {
585            datanode,
586            frontend,
587            flownode,
588            procedure_manager,
589            wal_options_allocator,
590            _guard: guard,
591        })
592    }
593
594    pub async fn create_table_metadata_manager(
595        kv_backend: KvBackendRef,
596    ) -> Result<TableMetadataManagerRef> {
597        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
598
599        table_metadata_manager
600            .init()
601            .await
602            .context(error::InitMetadataSnafu)?;
603
604        Ok(table_metadata_manager)
605    }
606}
607
608#[cfg(test)]
609mod tests {
610    use std::default::Default;
611    use std::io::Write;
612    use std::time::Duration;
613
614    use auth::{Identity, Password, UserProviderRef};
615    use common_base::readable_size::ReadableSize;
616    use common_config::ENV_VAR_SEP;
617    use common_test_util::temp_dir::create_named_temp_file;
618    use common_wal::config::DatanodeWalConfig;
619    use frontend::frontend::FrontendOptions;
620    use object_store::config::{FileConfig, GcsConfig};
621    use servers::grpc::GrpcOptions;
622
623    use super::*;
624    use crate::options::GlobalOptions;
625
626    #[tokio::test]
627    async fn test_try_from_start_command_to_anymap() {
628        let fe_opts = FrontendOptions {
629            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
630            ..Default::default()
631        };
632
633        let mut plugins = Plugins::new();
634        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
635            .await
636            .unwrap();
637
638        let provider = plugins.get::<UserProviderRef>().unwrap();
639        let result = provider
640            .authenticate(
641                Identity::UserId("test", None),
642                Password::PlainText("test".to_string().into()),
643            )
644            .await;
645        let _ = result.unwrap();
646    }
647
648    #[test]
649    fn test_toml() {
650        let opts = StandaloneOptions::default();
651        let toml_string = toml::to_string(&opts).unwrap();
652        let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
653    }
654
655    #[test]
656    fn test_read_from_config_file() {
657        let mut file = create_named_temp_file();
658        let toml_str = r#"
659            enable_memory_catalog = true
660
661            [wal]
662            provider = "raft_engine"
663            dir = "./greptimedb_data/test/wal"
664            file_size = "1GB"
665            purge_threshold = "50GB"
666            purge_interval = "10m"
667            read_batch_size = 128
668            sync_write = false
669
670            [storage]
671            data_home = "./greptimedb_data/"
672            type = "File"
673
674            [[storage.providers]]
675            type = "Gcs"
676            bucket = "foo"
677            endpoint = "bar"
678
679            [[storage.providers]]
680            type = "S3"
681            access_key_id = "access_key_id"
682            secret_access_key = "secret_access_key"
683
684            [storage.compaction]
685            max_inflight_tasks = 3
686            max_files_in_level0 = 7
687            max_purge_tasks = 32
688
689            [storage.manifest]
690            checkpoint_margin = 9
691            gc_duration = '7s'
692
693            [http]
694            addr = "127.0.0.1:4000"
695            timeout = "33s"
696            body_limit = "128MB"
697
698            [opentsdb]
699            enable = true
700
701            [logging]
702            level = "debug"
703            dir = "./greptimedb_data/test/logs"
704        "#;
705        write!(file, "{}", toml_str).unwrap();
706        let cmd = StartCommand {
707            config_file: Some(file.path().to_str().unwrap().to_string()),
708            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
709            ..Default::default()
710        };
711
712        let options = cmd
713            .load_options(&GlobalOptions::default())
714            .unwrap()
715            .component;
716        let fe_opts = options.frontend_options();
717        let dn_opts = options.datanode_options();
718        let logging_opts = options.logging;
719        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
720        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
721        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
722        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
723        assert!(fe_opts.mysql.enable);
724        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
725        assert_eq!(2, fe_opts.mysql.runtime_size);
726        assert_eq!(None, fe_opts.mysql.reject_no_database);
727        assert!(fe_opts.influxdb.enable);
728        assert!(fe_opts.opentsdb.enable);
729
730        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
731            unreachable!()
732        };
733        assert_eq!(
734            "./greptimedb_data/test/wal",
735            raft_engine_config.dir.unwrap()
736        );
737
738        assert!(matches!(
739            &dn_opts.storage.store,
740            object_store::config::ObjectStoreConfig::File(FileConfig { .. })
741        ));
742        assert_eq!(dn_opts.storage.providers.len(), 2);
743        assert!(matches!(
744            dn_opts.storage.providers[0],
745            object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
746        ));
747        match &dn_opts.storage.providers[1] {
748            object_store::config::ObjectStoreConfig::S3(s3_config) => {
749                assert_eq!(
750                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
751                    format!("{:?}", s3_config.connection.access_key_id)
752                );
753            }
754            _ => {
755                unreachable!()
756            }
757        }
758
759        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
760        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
761    }
762
763    #[test]
764    fn test_load_log_options_from_cli() {
765        let cmd = StartCommand {
766            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
767            mysql_addr: Some("127.0.0.1:4002".to_string()),
768            postgres_addr: Some("127.0.0.1:4003".to_string()),
769            ..Default::default()
770        };
771
772        let opts = cmd
773            .load_options(&GlobalOptions {
774                log_dir: Some("./greptimedb_data/test/logs".to_string()),
775                log_level: Some("debug".to_string()),
776
777                #[cfg(feature = "tokio-console")]
778                tokio_console_addr: None,
779            })
780            .unwrap()
781            .component;
782
783        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
784        assert_eq!("debug", opts.logging.level.unwrap());
785    }
786
787    #[test]
788    fn test_config_precedence_order() {
789        let mut file = create_named_temp_file();
790        let toml_str = r#"
791            [http]
792            addr = "127.0.0.1:4000"
793
794            [logging]
795            level = "debug"
796        "#;
797        write!(file, "{}", toml_str).unwrap();
798
799        let env_prefix = "STANDALONE_UT";
800        temp_env::with_vars(
801            [
802                (
803                    // logging.dir = /other/log/dir
804                    [
805                        env_prefix.to_string(),
806                        "logging".to_uppercase(),
807                        "dir".to_uppercase(),
808                    ]
809                    .join(ENV_VAR_SEP),
810                    Some("/other/log/dir"),
811                ),
812                (
813                    // logging.level = info
814                    [
815                        env_prefix.to_string(),
816                        "logging".to_uppercase(),
817                        "level".to_uppercase(),
818                    ]
819                    .join(ENV_VAR_SEP),
820                    Some("info"),
821                ),
822                (
823                    // http.addr = 127.0.0.1:24000
824                    [
825                        env_prefix.to_string(),
826                        "http".to_uppercase(),
827                        "addr".to_uppercase(),
828                    ]
829                    .join(ENV_VAR_SEP),
830                    Some("127.0.0.1:24000"),
831                ),
832            ],
833            || {
834                let command = StartCommand {
835                    config_file: Some(file.path().to_str().unwrap().to_string()),
836                    http_addr: Some("127.0.0.1:14000".to_string()),
837                    env_prefix: env_prefix.to_string(),
838                    ..Default::default()
839                };
840
841                let opts = command.load_options(&Default::default()).unwrap().component;
842
843                // Should be read from env, env > default values.
844                assert_eq!(opts.logging.dir, "/other/log/dir");
845
846                // Should be read from config file, config file > env > default values.
847                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
848
849                // Should be read from cli, cli > config file > env > default values.
850                let fe_opts = opts.frontend_options();
851                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
852                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);
853
854                // Should be default value.
855                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
856            },
857        );
858    }
859
860    #[test]
861    fn test_load_default_standalone_options() {
862        let options =
863            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
864        let default_options = StandaloneOptions::default();
865        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
866        assert_eq!(options.http, default_options.http);
867        assert_eq!(options.grpc, default_options.grpc);
868        assert_eq!(options.mysql, default_options.mysql);
869        assert_eq!(options.postgres, default_options.postgres);
870        assert_eq!(options.opentsdb, default_options.opentsdb);
871        assert_eq!(options.influxdb, default_options.influxdb);
872        assert_eq!(options.prom_store, default_options.prom_store);
873        assert_eq!(options.wal, default_options.wal);
874        assert_eq!(options.metadata_store, default_options.metadata_store);
875        assert_eq!(options.procedure, default_options.procedure);
876        assert_eq!(options.logging, default_options.logging);
877        assert_eq!(options.region_engine, default_options.region_engine);
878    }
879
880    #[test]
881    fn test_cache_config() {
882        let toml_str = r#"
883            [storage]
884            data_home = "test_data_home"
885            type = "S3"
886            [storage.cache_config]
887            enable_read_cache = true
888        "#;
889        let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
890        opts.sanitize();
891        assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
892        assert_eq!(
893            opts.storage.store.cache_config().unwrap().cache_path,
894            "test_data_home"
895        );
896    }
897}