cmd/
standalone.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::path::Path;
17use std::sync::Arc;
18use std::{fs, path};
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::kvbackend::KvBackendCatalogManagerBuilder;
23use catalog::process_manager::ProcessManager;
24use clap::Parser;
25use common_base::Plugins;
26use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
27use common_config::{Configurable, metadata_store_dir};
28use common_error::ext::BoxedError;
29use common_meta::cache::LayeredCacheRegistryBuilder;
30use common_meta::ddl::flow_meta::FlowMetadataAllocator;
31use common_meta::ddl::table_meta::TableMetadataAllocator;
32use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
33use common_meta::ddl_manager::DdlManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
36use common_meta::kv_backend::KvBackendRef;
37use common_meta::procedure_executor::LocalProcedureExecutor;
38use common_meta::region_keeper::MemoryRegionKeeper;
39use common_meta::region_registry::LeaderRegionRegistry;
40use common_meta::sequence::SequenceBuilder;
41use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
42use common_procedure::ProcedureManagerRef;
43use common_telemetry::info;
44use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
45use common_time::timezone::set_default_timezone;
46use common_version::{short_version, verbose_version};
47use datanode::config::DatanodeOptions;
48use datanode::datanode::{Datanode, DatanodeBuilder};
49use flow::{
50    FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
51    GrpcQueryHandlerWithBoxedError,
52};
53use frontend::frontend::Frontend;
54use frontend::instance::StandaloneDatanodeManager;
55use frontend::instance::builder::FrontendBuilder;
56use frontend::server::Services;
57use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
58use servers::export_metrics::ExportMetricsTask;
59use servers::tls::{TlsMode, TlsOption};
60use snafu::ResultExt;
61use standalone::StandaloneInformationExtension;
62use standalone::options::StandaloneOptions;
63use tracing_appender::non_blocking::WorkerGuard;
64
65use crate::error::{Result, StartFlownodeSnafu};
66use crate::options::{GlobalOptions, GreptimeOptions};
67use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
68
69pub const APP_NAME: &str = "greptime-standalone";
70
71#[derive(Parser)]
72pub struct Command {
73    #[clap(subcommand)]
74    subcmd: SubCommand,
75}
76
77impl Command {
78    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
79        self.subcmd.build(opts).await
80    }
81
82    pub fn load_options(
83        &self,
84        global_options: &GlobalOptions,
85    ) -> Result<GreptimeOptions<StandaloneOptions>> {
86        self.subcmd.load_options(global_options)
87    }
88}
89
90#[derive(Parser)]
91enum SubCommand {
92    Start(StartCommand),
93}
94
95impl SubCommand {
96    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
97        match self {
98            SubCommand::Start(cmd) => cmd.build(opts).await,
99        }
100    }
101
102    fn load_options(
103        &self,
104        global_options: &GlobalOptions,
105    ) -> Result<GreptimeOptions<StandaloneOptions>> {
106        match self {
107            SubCommand::Start(cmd) => cmd.load_options(global_options),
108        }
109    }
110}
111
112pub struct Instance {
113    datanode: Datanode,
114    frontend: Frontend,
115    flownode: FlownodeInstance,
116    procedure_manager: ProcedureManagerRef,
117    wal_options_allocator: WalOptionsAllocatorRef,
118
119    // The components of standalone, which make it easier to expand based
120    // on the components.
121    #[cfg(feature = "enterprise")]
122    components: Components,
123
124    // Keep the logging guard to prevent the worker from being dropped.
125    _guard: Vec<WorkerGuard>,
126}
127
128#[cfg(feature = "enterprise")]
129pub struct Components {
130    pub plugins: Plugins,
131    pub kv_backend: KvBackendRef,
132    pub frontend_client: Arc<FrontendClient>,
133    pub catalog_manager: catalog::CatalogManagerRef,
134}
135
136impl Instance {
137    /// Find the socket addr of a server by its `name`.
138    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
139        self.frontend.server_handlers().addr(name)
140    }
141
142    #[cfg(feature = "enterprise")]
143    pub fn components(&self) -> &Components {
144        &self.components
145    }
146}
147
148#[async_trait]
149impl App for Instance {
150    fn name(&self) -> &str {
151        APP_NAME
152    }
153
154    async fn start(&mut self) -> Result<()> {
155        self.datanode.start_telemetry();
156
157        self.procedure_manager
158            .start()
159            .await
160            .context(error::StartProcedureManagerSnafu)?;
161
162        self.wal_options_allocator
163            .start()
164            .await
165            .context(error::StartWalOptionsAllocatorSnafu)?;
166
167        plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
168            .await
169            .context(error::StartFrontendSnafu)?;
170
171        self.frontend
172            .start()
173            .await
174            .context(error::StartFrontendSnafu)?;
175
176        self.flownode.start().await.context(StartFlownodeSnafu)?;
177
178        Ok(())
179    }
180
181    async fn stop(&mut self) -> Result<()> {
182        self.frontend
183            .shutdown()
184            .await
185            .context(error::ShutdownFrontendSnafu)?;
186
187        self.procedure_manager
188            .stop()
189            .await
190            .context(error::StopProcedureManagerSnafu)?;
191
192        self.datanode
193            .shutdown()
194            .await
195            .context(error::ShutdownDatanodeSnafu)?;
196
197        self.flownode
198            .shutdown()
199            .await
200            .context(error::ShutdownFlownodeSnafu)?;
201
202        info!("Datanode instance stopped.");
203
204        Ok(())
205    }
206}
207
208#[derive(Debug, Default, Parser)]
209pub struct StartCommand {
210    #[clap(long)]
211    http_addr: Option<String>,
212    #[clap(long, alias = "rpc-addr")]
213    rpc_bind_addr: Option<String>,
214    #[clap(long)]
215    mysql_addr: Option<String>,
216    #[clap(long)]
217    postgres_addr: Option<String>,
218    #[clap(short, long)]
219    influxdb_enable: bool,
220    #[clap(short, long)]
221    pub config_file: Option<String>,
222    #[clap(long)]
223    tls_mode: Option<TlsMode>,
224    #[clap(long)]
225    tls_cert_path: Option<String>,
226    #[clap(long)]
227    tls_key_path: Option<String>,
228    #[clap(long)]
229    user_provider: Option<String>,
230    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
231    pub env_prefix: String,
232    /// The working home directory of this standalone instance.
233    #[clap(long)]
234    data_home: Option<String>,
235}
236
237impl StartCommand {
238    /// Load the GreptimeDB options from various sources (command line, config file or env).
239    pub fn load_options(
240        &self,
241        global_options: &GlobalOptions,
242    ) -> Result<GreptimeOptions<StandaloneOptions>> {
243        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
244            self.config_file.as_deref(),
245            self.env_prefix.as_ref(),
246        )
247        .context(error::LoadLayeredConfigSnafu)?;
248
249        self.merge_with_cli_options(global_options, &mut opts.component)?;
250        opts.component.sanitize();
251
252        Ok(opts)
253    }
254
255    // The precedence order is: cli > config file > environment variables > default values.
256    pub fn merge_with_cli_options(
257        &self,
258        global_options: &GlobalOptions,
259        opts: &mut StandaloneOptions,
260    ) -> Result<()> {
261        if let Some(dir) = &global_options.log_dir {
262            opts.logging.dir.clone_from(dir);
263        }
264
265        if global_options.log_level.is_some() {
266            opts.logging.level.clone_from(&global_options.log_level);
267        }
268
269        opts.tracing = TracingOptions {
270            #[cfg(feature = "tokio-console")]
271            tokio_console_addr: global_options.tokio_console_addr.clone(),
272        };
273
274        let tls_opts = TlsOption::new(
275            self.tls_mode.clone(),
276            self.tls_cert_path.clone(),
277            self.tls_key_path.clone(),
278        );
279
280        if let Some(addr) = &self.http_addr {
281            opts.http.addr.clone_from(addr);
282        }
283
284        if let Some(data_home) = &self.data_home {
285            opts.storage.data_home.clone_from(data_home);
286        }
287
288        // If the logging dir is not set, use the default logs dir in the data home.
289        if opts.logging.dir.is_empty() {
290            opts.logging.dir = Path::new(&opts.storage.data_home)
291                .join(DEFAULT_LOGGING_DIR)
292                .to_string_lossy()
293                .to_string();
294        }
295
296        if let Some(addr) = &self.rpc_bind_addr {
297            // frontend grpc addr conflict with datanode default grpc addr
298            let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
299            if addr.eq(&datanode_grpc_addr) {
300                return error::IllegalConfigSnafu {
301                    msg: format!(
302                        "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
303                    ),
304                }.fail();
305            }
306            opts.grpc.bind_addr.clone_from(addr)
307        }
308
309        if let Some(addr) = &self.mysql_addr {
310            opts.mysql.enable = true;
311            opts.mysql.addr.clone_from(addr);
312            opts.mysql.tls = tls_opts.clone();
313        }
314
315        if let Some(addr) = &self.postgres_addr {
316            opts.postgres.enable = true;
317            opts.postgres.addr.clone_from(addr);
318            opts.postgres.tls = tls_opts;
319        }
320
321        if self.influxdb_enable {
322            opts.influxdb.enable = self.influxdb_enable;
323        }
324
325        if let Some(user_provider) = &self.user_provider {
326            opts.user_provider = Some(user_provider.clone());
327        }
328
329        Ok(())
330    }
331
332    #[allow(unreachable_code)]
333    #[allow(unused_variables)]
334    #[allow(clippy::diverging_sub_expression)]
335    /// Build GreptimeDB instance with the loaded options.
336    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
337        common_runtime::init_global_runtimes(&opts.runtime);
338
339        let guard = common_telemetry::init_global_logging(
340            APP_NAME,
341            &opts.component.logging,
342            &opts.component.tracing,
343            None,
344            Some(&opts.component.slow_query),
345        );
346
347        log_versions(verbose_version(), short_version(), APP_NAME);
348        maybe_activate_heap_profile(&opts.component.memory);
349        create_resource_limit_metrics(APP_NAME);
350
351        info!("Standalone start command: {:#?}", self);
352        info!("Standalone options: {opts:#?}");
353
354        let mut plugins = Plugins::new();
355        let plugin_opts = opts.plugins;
356        let mut opts = opts.component;
357        opts.grpc.detect_server_addr();
358        let fe_opts = opts.frontend_options();
359        let dn_opts = opts.datanode_options();
360
361        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
362            .await
363            .context(error::StartFrontendSnafu)?;
364
365        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
366            .await
367            .context(error::StartDatanodeSnafu)?;
368
369        set_default_timezone(fe_opts.default_timezone.as_deref())
370            .context(error::InitTimezoneSnafu)?;
371
372        let data_home = &dn_opts.storage.data_home;
373        // Ensure the data_home directory exists.
374        fs::create_dir_all(path::Path::new(data_home))
375            .context(error::CreateDirSnafu { dir: data_home })?;
376
377        let metadata_dir = metadata_store_dir(data_home);
378        let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store)
379            .context(error::BuildMetadataKvbackendSnafu)?;
380        let procedure_manager =
381            standalone::build_procedure_manager(kv_backend.clone(), opts.procedure);
382
383        plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone())
384            .await
385            .context(error::SetupStandalonePluginsSnafu)?;
386
387        // Builds cache registry
388        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
389        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
390        let layered_cache_registry = Arc::new(
391            with_default_composite_cache_registry(
392                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
393            )
394            .context(error::BuildCacheRegistrySnafu)?
395            .build(),
396        );
397
398        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
399        builder.with_cache_registry(layered_cache_registry.clone());
400        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;
401
402        let information_extension = Arc::new(StandaloneInformationExtension::new(
403            datanode.region_server(),
404            procedure_manager.clone(),
405        ));
406
407        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
408        let builder = KvBackendCatalogManagerBuilder::new(
409            information_extension.clone(),
410            kv_backend.clone(),
411            layered_cache_registry.clone(),
412        )
413        .with_procedure_manager(procedure_manager.clone())
414        .with_process_manager(process_manager.clone());
415        #[cfg(feature = "enterprise")]
416        let builder = if let Some(factories) = plugins.get() {
417            builder.with_extra_information_table_factories(factories)
418        } else {
419            builder
420        };
421        let catalog_manager = builder.build();
422
423        let table_metadata_manager =
424            Self::create_table_metadata_manager(kv_backend.clone()).await?;
425
426        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
427        let flownode_options = FlownodeOptions {
428            flow: opts.flow.clone(),
429            ..Default::default()
430        };
431
432        // for standalone not use grpc, but get a handler to frontend grpc client without
433        // actually make a connection
434        let (frontend_client, frontend_instance_handler) =
435            FrontendClient::from_empty_grpc_handler(opts.query.clone());
436        let frontend_client = Arc::new(frontend_client);
437        let flow_builder = FlownodeBuilder::new(
438            flownode_options,
439            plugins.clone(),
440            table_metadata_manager.clone(),
441            catalog_manager.clone(),
442            flow_metadata_manager.clone(),
443            frontend_client.clone(),
444        );
445        let flownode = flow_builder
446            .build()
447            .await
448            .map_err(BoxedError::new)
449            .context(error::OtherSnafu)?;
450
451        // set the ref to query for the local flow state
452        {
453            let flow_streaming_engine = flownode.flow_engine().streaming_engine();
454            information_extension
455                .set_flow_streaming_engine(flow_streaming_engine)
456                .await;
457        }
458
459        let node_manager = Arc::new(StandaloneDatanodeManager {
460            region_server: datanode.region_server(),
461            flow_server: flownode.flow_engine(),
462        });
463
464        let table_id_sequence = Arc::new(
465            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
466                .initial(MIN_USER_TABLE_ID as u64)
467                .step(10)
468                .build(),
469        );
470        let flow_id_sequence = Arc::new(
471            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
472                .initial(MIN_USER_FLOW_ID as u64)
473                .step(10)
474                .build(),
475        );
476        let kafka_options = opts.wal.clone().into();
477        let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
478            .await
479            .context(error::BuildWalOptionsAllocatorSnafu)?;
480        let wal_options_allocator = Arc::new(wal_options_allocator);
481        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
482            table_id_sequence,
483            wal_options_allocator.clone(),
484        ));
485        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
486            flow_id_sequence,
487        ));
488
489        let ddl_context = DdlContext {
490            node_manager: node_manager.clone(),
491            cache_invalidator: layered_cache_registry.clone(),
492            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
493            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
494            table_metadata_manager: table_metadata_manager.clone(),
495            table_metadata_allocator: table_metadata_allocator.clone(),
496            flow_metadata_manager: flow_metadata_manager.clone(),
497            flow_metadata_allocator: flow_metadata_allocator.clone(),
498            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
499        };
500
501        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
502            .context(error::InitDdlManagerSnafu)?;
503        #[cfg(feature = "enterprise")]
504        let ddl_manager = {
505            let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
506                plugins.get();
507            ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
508        };
509
510        let procedure_executor = Arc::new(LocalProcedureExecutor::new(
511            Arc::new(ddl_manager),
512            procedure_manager.clone(),
513        ));
514
515        let fe_instance = FrontendBuilder::new(
516            fe_opts.clone(),
517            kv_backend.clone(),
518            layered_cache_registry.clone(),
519            catalog_manager.clone(),
520            node_manager.clone(),
521            procedure_executor.clone(),
522            process_manager,
523        )
524        .with_plugin(plugins.clone())
525        .try_build()
526        .await
527        .context(error::StartFrontendSnafu)?;
528        let fe_instance = Arc::new(fe_instance);
529
530        // set the frontend client for flownode
531        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
532        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
533        frontend_instance_handler
534            .lock()
535            .unwrap()
536            .replace(weak_grpc_handler);
537
538        // set the frontend invoker for flownode
539        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
540        // flow server need to be able to use frontend to write insert requests back
541        let invoker = FrontendInvoker::build_from(
542            flow_streaming_engine.clone(),
543            catalog_manager.clone(),
544            kv_backend.clone(),
545            layered_cache_registry.clone(),
546            procedure_executor,
547            node_manager,
548        )
549        .await
550        .context(StartFlownodeSnafu)?;
551        flow_streaming_engine.set_frontend_invoker(invoker).await;
552
553        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
554            .context(error::ServersSnafu)?;
555
556        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
557            .build()
558            .context(error::StartFrontendSnafu)?;
559
560        let frontend = Frontend {
561            instance: fe_instance,
562            servers,
563            heartbeat_task: None,
564            export_metrics_task,
565        };
566
567        #[cfg(feature = "enterprise")]
568        let components = Components {
569            plugins,
570            kv_backend,
571            frontend_client,
572            catalog_manager,
573        };
574
575        Ok(Instance {
576            datanode,
577            frontend,
578            flownode,
579            procedure_manager,
580            wal_options_allocator,
581            #[cfg(feature = "enterprise")]
582            components,
583            _guard: guard,
584        })
585    }
586
587    pub async fn create_table_metadata_manager(
588        kv_backend: KvBackendRef,
589    ) -> Result<TableMetadataManagerRef> {
590        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
591
592        table_metadata_manager
593            .init()
594            .await
595            .context(error::InitMetadataSnafu)?;
596
597        Ok(table_metadata_manager)
598    }
599}
600
601#[cfg(test)]
602mod tests {
603    use std::default::Default;
604    use std::io::Write;
605    use std::time::Duration;
606
607    use auth::{Identity, Password, UserProviderRef};
608    use common_base::readable_size::ReadableSize;
609    use common_config::ENV_VAR_SEP;
610    use common_test_util::temp_dir::create_named_temp_file;
611    use common_wal::config::DatanodeWalConfig;
612    use frontend::frontend::FrontendOptions;
613    use object_store::config::{FileConfig, GcsConfig};
614    use servers::grpc::GrpcOptions;
615
616    use super::*;
617    use crate::options::GlobalOptions;
618
619    #[tokio::test]
620    async fn test_try_from_start_command_to_anymap() {
621        let fe_opts = FrontendOptions {
622            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
623            ..Default::default()
624        };
625
626        let mut plugins = Plugins::new();
627        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
628            .await
629            .unwrap();
630
631        let provider = plugins.get::<UserProviderRef>().unwrap();
632        let result = provider
633            .authenticate(
634                Identity::UserId("test", None),
635                Password::PlainText("test".to_string().into()),
636            )
637            .await;
638        let _ = result.unwrap();
639    }
640
641    #[test]
642    fn test_toml() {
643        let opts = StandaloneOptions::default();
644        let toml_string = toml::to_string(&opts).unwrap();
645        let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
646    }
647
648    #[test]
649    fn test_read_from_config_file() {
650        let mut file = create_named_temp_file();
651        let toml_str = r#"
652            enable_memory_catalog = true
653
654            [wal]
655            provider = "raft_engine"
656            dir = "./greptimedb_data/test/wal"
657            file_size = "1GB"
658            purge_threshold = "50GB"
659            purge_interval = "10m"
660            read_batch_size = 128
661            sync_write = false
662
663            [storage]
664            data_home = "./greptimedb_data/"
665            type = "File"
666
667            [[storage.providers]]
668            type = "Gcs"
669            bucket = "foo"
670            endpoint = "bar"
671
672            [[storage.providers]]
673            type = "S3"
674            access_key_id = "access_key_id"
675            secret_access_key = "secret_access_key"
676
677            [storage.compaction]
678            max_inflight_tasks = 3
679            max_files_in_level0 = 7
680            max_purge_tasks = 32
681
682            [storage.manifest]
683            checkpoint_margin = 9
684            gc_duration = '7s'
685
686            [http]
687            addr = "127.0.0.1:4000"
688            timeout = "33s"
689            body_limit = "128MB"
690
691            [opentsdb]
692            enable = true
693
694            [logging]
695            level = "debug"
696            dir = "./greptimedb_data/test/logs"
697        "#;
698        write!(file, "{}", toml_str).unwrap();
699        let cmd = StartCommand {
700            config_file: Some(file.path().to_str().unwrap().to_string()),
701            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
702            ..Default::default()
703        };
704
705        let options = cmd
706            .load_options(&GlobalOptions::default())
707            .unwrap()
708            .component;
709        let fe_opts = options.frontend_options();
710        let dn_opts = options.datanode_options();
711        let logging_opts = options.logging;
712        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
713        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
714        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
715        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
716        assert!(fe_opts.mysql.enable);
717        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
718        assert_eq!(2, fe_opts.mysql.runtime_size);
719        assert_eq!(None, fe_opts.mysql.reject_no_database);
720        assert!(fe_opts.influxdb.enable);
721        assert!(fe_opts.opentsdb.enable);
722
723        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
724            unreachable!()
725        };
726        assert_eq!(
727            "./greptimedb_data/test/wal",
728            raft_engine_config.dir.unwrap()
729        );
730
731        assert!(matches!(
732            &dn_opts.storage.store,
733            object_store::config::ObjectStoreConfig::File(FileConfig { .. })
734        ));
735        assert_eq!(dn_opts.storage.providers.len(), 2);
736        assert!(matches!(
737            dn_opts.storage.providers[0],
738            object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
739        ));
740        match &dn_opts.storage.providers[1] {
741            object_store::config::ObjectStoreConfig::S3(s3_config) => {
742                assert_eq!(
743                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
744                    format!("{:?}", s3_config.connection.access_key_id)
745                );
746            }
747            _ => {
748                unreachable!()
749            }
750        }
751
752        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
753        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
754    }
755
756    #[test]
757    fn test_load_log_options_from_cli() {
758        let cmd = StartCommand {
759            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
760            ..Default::default()
761        };
762
763        let opts = cmd
764            .load_options(&GlobalOptions {
765                log_dir: Some("./greptimedb_data/test/logs".to_string()),
766                log_level: Some("debug".to_string()),
767
768                #[cfg(feature = "tokio-console")]
769                tokio_console_addr: None,
770            })
771            .unwrap()
772            .component;
773
774        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
775        assert_eq!("debug", opts.logging.level.unwrap());
776    }
777
778    #[test]
779    fn test_config_precedence_order() {
780        let mut file = create_named_temp_file();
781        let toml_str = r#"
782            [http]
783            addr = "127.0.0.1:4000"
784
785            [logging]
786            level = "debug"
787        "#;
788        write!(file, "{}", toml_str).unwrap();
789
790        let env_prefix = "STANDALONE_UT";
791        temp_env::with_vars(
792            [
793                (
794                    // logging.dir = /other/log/dir
795                    [
796                        env_prefix.to_string(),
797                        "logging".to_uppercase(),
798                        "dir".to_uppercase(),
799                    ]
800                    .join(ENV_VAR_SEP),
801                    Some("/other/log/dir"),
802                ),
803                (
804                    // logging.level = info
805                    [
806                        env_prefix.to_string(),
807                        "logging".to_uppercase(),
808                        "level".to_uppercase(),
809                    ]
810                    .join(ENV_VAR_SEP),
811                    Some("info"),
812                ),
813                (
814                    // http.addr = 127.0.0.1:24000
815                    [
816                        env_prefix.to_string(),
817                        "http".to_uppercase(),
818                        "addr".to_uppercase(),
819                    ]
820                    .join(ENV_VAR_SEP),
821                    Some("127.0.0.1:24000"),
822                ),
823            ],
824            || {
825                let command = StartCommand {
826                    config_file: Some(file.path().to_str().unwrap().to_string()),
827                    http_addr: Some("127.0.0.1:14000".to_string()),
828                    env_prefix: env_prefix.to_string(),
829                    ..Default::default()
830                };
831
832                let opts = command.load_options(&Default::default()).unwrap().component;
833
834                // Should be read from env, env > default values.
835                assert_eq!(opts.logging.dir, "/other/log/dir");
836
837                // Should be read from config file, config file > env > default values.
838                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
839
840                // Should be read from cli, cli > config file > env > default values.
841                let fe_opts = opts.frontend_options();
842                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
843                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);
844
845                // Should be default value.
846                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
847            },
848        );
849    }
850
851    #[test]
852    fn test_load_default_standalone_options() {
853        let options =
854            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
855        let default_options = StandaloneOptions::default();
856        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
857        assert_eq!(options.http, default_options.http);
858        assert_eq!(options.grpc, default_options.grpc);
859        assert_eq!(options.mysql, default_options.mysql);
860        assert_eq!(options.postgres, default_options.postgres);
861        assert_eq!(options.opentsdb, default_options.opentsdb);
862        assert_eq!(options.influxdb, default_options.influxdb);
863        assert_eq!(options.prom_store, default_options.prom_store);
864        assert_eq!(options.wal, default_options.wal);
865        assert_eq!(options.metadata_store, default_options.metadata_store);
866        assert_eq!(options.procedure, default_options.procedure);
867        assert_eq!(options.logging, default_options.logging);
868        assert_eq!(options.region_engine, default_options.region_engine);
869    }
870
871    #[test]
872    fn test_cache_config() {
873        let toml_str = r#"
874            [storage]
875            data_home = "test_data_home"
876            type = "S3"
877            [storage.cache_config]
878            enable_read_cache = true
879        "#;
880        let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
881        opts.sanitize();
882        assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
883        assert_eq!(
884            opts.storage.store.cache_config().unwrap().cache_path,
885            "test_data_home"
886        );
887    }
888}