cmd/standalone.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::net::SocketAddr;
use std::path::Path;
use std::sync::Arc;
use std::{fs, path};

use async_trait::async_trait;
use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
use catalog::information_schema::InformationExtensionRef;
use catalog::kvbackend::KvBackendCatalogManagerBuilder;
use catalog::process_manager::ProcessManager;
use clap::Parser;
use common_base::Plugins;
use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
use common_config::{Configurable, metadata_store_dir};
use common_error::ext::BoxedError;
use common_meta::cache::LayeredCacheRegistryBuilder;
use common_meta::ddl::flow_meta::FlowMetadataAllocator;
use common_meta::ddl::table_meta::TableMetadataAllocator;
use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
use common_meta::ddl_manager::DdlManager;
use common_meta::key::flow::FlowMetadataManager;
use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::procedure_executor::LocalProcedureExecutor;
use common_meta::region_keeper::MemoryRegionKeeper;
use common_meta::region_registry::LeaderRegionRegistry;
use common_meta::sequence::SequenceBuilder;
use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
use common_procedure::ProcedureManagerRef;
use common_telemetry::info;
use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
use common_time::timezone::set_default_timezone;
use common_version::{short_version, verbose_version};
use datanode::config::DatanodeOptions;
use datanode::datanode::{Datanode, DatanodeBuilder};
use flow::{
    FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
    GrpcQueryHandlerWithBoxedError,
};
use frontend::frontend::Frontend;
use frontend::instance::StandaloneDatanodeManager;
use frontend::instance::builder::FrontendBuilder;
use frontend::server::Services;
use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
use servers::export_metrics::ExportMetricsTask;
use servers::tls::{TlsMode, TlsOption};
use snafu::ResultExt;
use standalone::StandaloneInformationExtension;
use standalone::options::StandaloneOptions;
use tracing_appender::non_blocking::WorkerGuard;

use crate::error::{Result, StartFlownodeSnafu};
use crate::options::{GlobalOptions, GreptimeOptions};
use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};

pub const APP_NAME: &str = "greptime-standalone";

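/// Top-level parser for the standalone command; dispatches to its subcommands.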
#[derive(Parser)]
pub struct Command {
    #[clap(subcommand)]
    subcmd: SubCommand,
}

impl Command {
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        self.subcmd.build(opts).await
    }

    pub fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        self.subcmd.load_options(global_options)
    }
}

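/// Subcommands of the standalone command; currently only `start`.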
#[derive(Parser)]
enum SubCommand {
    Start(StartCommand),
}

impl SubCommand {
    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        match self {
            SubCommand::Start(cmd) => cmd.build(opts).await,
        }
    }

    fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        match self {
            SubCommand::Start(cmd) => cmd.load_options(global_options),
        }
    }
}

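/// A standalone GreptimeDB instance: a datanode, frontend, and flownode
/// running together in a single process.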
pub struct Instance {
    datanode: Datanode,
    frontend: Frontend,
    flownode: FlownodeInstance,
    procedure_manager: ProcedureManagerRef,
    wal_options_allocator: WalOptionsAllocatorRef,

    // The standalone components, exposed so that the instance can be
    // extended with functionality built on top of them.
    #[cfg(feature = "enterprise")]
    components: Components,

    // Keep the logging guard to prevent the worker from being dropped.
    _guard: Vec<WorkerGuard>,
}

#[cfg(feature = "enterprise")]
pub struct Components {
    pub plugins: Plugins,
    pub kv_backend: KvBackendRef,
    pub frontend_client: Arc<FrontendClient>,
    pub catalog_manager: catalog::CatalogManagerRef,
}

impl Instance {
    /// Find the socket addr of a server by its `name`.
    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
        self.frontend.server_handlers().addr(name)
    }

    #[cfg(feature = "enterprise")]
    pub fn components(&self) -> &Components {
        &self.components
    }
}

#[async_trait]
impl App for Instance {
    fn name(&self) -> &str {
        APP_NAME
    }

    async fn start(&mut self) -> Result<()> {
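        // Start the components in order: the procedure manager and WAL options
        // allocator come up before the frontend servers, and the flownode last.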
        self.datanode.start_telemetry();

        self.procedure_manager
            .start()
            .await
            .context(error::StartProcedureManagerSnafu)?;

        self.wal_options_allocator
            .start()
            .await
            .context(error::StartWalOptionsAllocatorSnafu)?;

        plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
            .await
            .context(error::StartFrontendSnafu)?;

        self.frontend
            .start()
            .await
            .context(error::StartFrontendSnafu)?;

        self.flownode.start().await.context(StartFlownodeSnafu)?;

        Ok(())
    }

    async fn stop(&mut self) -> Result<()> {
        self.frontend
            .shutdown()
            .await
            .context(error::ShutdownFrontendSnafu)?;

        self.procedure_manager
            .stop()
            .await
            .context(error::StopProcedureManagerSnafu)?;

        self.datanode
            .shutdown()
            .await
            .context(error::ShutdownDatanodeSnafu)?;

        self.flownode
            .shutdown()
            .await
            .context(error::ShutdownFlownodeSnafu)?;

        info!("Standalone instance stopped.");

        Ok(())
    }
}

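/// Command-line arguments for starting a standalone instance.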
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    #[clap(long)]
    http_addr: Option<String>,
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    #[clap(long)]
    mysql_addr: Option<String>,
    #[clap(long)]
    postgres_addr: Option<String>,
    #[clap(short, long)]
    influxdb_enable: bool,
    #[clap(short, long)]
    pub config_file: Option<String>,
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    #[clap(long)]
    user_provider: Option<String>,
    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
    pub env_prefix: String,
    /// The working home directory of this standalone instance.
    #[clap(long)]
    data_home: Option<String>,
}

impl StartCommand {
    /// Loads the GreptimeDB options from the various sources (command line, config file, or environment variables).
    pub fn load_options(
        &self,
        global_options: &GlobalOptions,
    ) -> Result<GreptimeOptions<StandaloneOptions>> {
        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
            self.config_file.as_deref(),
            self.env_prefix.as_ref(),
        )
        .context(error::LoadLayeredConfigSnafu)?;

        self.merge_with_cli_options(global_options, &mut opts.component)?;
        opts.component.sanitize();

        Ok(opts)
    }

    // The precedence order is: cli > config file > environment variables > default values.
    pub fn merge_with_cli_options(
        &self,
        global_options: &GlobalOptions,
        opts: &mut StandaloneOptions,
    ) -> Result<()> {
        if let Some(dir) = &global_options.log_dir {
            opts.logging.dir.clone_from(dir);
        }

        if global_options.log_level.is_some() {
            opts.logging.level.clone_from(&global_options.log_level);
        }

        opts.tracing = TracingOptions {
            #[cfg(feature = "tokio-console")]
            tokio_console_addr: global_options.tokio_console_addr.clone(),
        };

        let tls_opts = TlsOption::new(
            self.tls_mode.clone(),
            self.tls_cert_path.clone(),
            self.tls_key_path.clone(),
        );

        if let Some(addr) = &self.http_addr {
            opts.http.addr.clone_from(addr);
        }

        if let Some(data_home) = &self.data_home {
            opts.storage.data_home.clone_from(data_home);
        }

        // If the logging dir is not set, use the default logs dir in the data home.
        if opts.logging.dir.is_empty() {
            opts.logging.dir = Path::new(&opts.storage.data_home)
                .join(DEFAULT_LOGGING_DIR)
                .to_string_lossy()
                .to_string();
        }

        if let Some(addr) = &self.rpc_bind_addr {
            // The frontend gRPC addr must not conflict with the datanode's default gRPC addr.
            let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
            if addr.eq(&datanode_grpc_addr) {
                return error::IllegalConfigSnafu {
                    msg: format!(
                        "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
                    ),
                }.fail();
            }
            opts.grpc.bind_addr.clone_from(addr)
        }

        if let Some(addr) = &self.mysql_addr {
            opts.mysql.enable = true;
            opts.mysql.addr.clone_from(addr);
            opts.mysql.tls = tls_opts.clone();
        }

        if let Some(addr) = &self.postgres_addr {
            opts.postgres.enable = true;
            opts.postgres.addr.clone_from(addr);
            opts.postgres.tls = tls_opts;
        }

        if self.influxdb_enable {
            opts.influxdb.enable = self.influxdb_enable;
        }

        if let Some(user_provider) = &self.user_provider {
            opts.user_provider = Some(user_provider.clone());
        }

        Ok(())
    }

    #[allow(unreachable_code)]
    #[allow(unused_variables)]
    #[allow(clippy::diverging_sub_expression)]
    /// Builds the GreptimeDB instance from the loaded options.
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            None,
            Some(&opts.component.slow_query),
        );

        log_versions(verbose_version(), short_version(), APP_NAME);
        maybe_activate_heap_profile(&opts.component.memory);
        create_resource_limit_metrics(APP_NAME);

        info!("Standalone start command: {:#?}", self);
        info!("Standalone options: {opts:#?}");

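        // Split the merged standalone options into frontend and datanode views,
        // then set up the plugins for each role.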
        let mut plugins = Plugins::new();
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        opts.grpc.detect_server_addr();
        let fe_opts = opts.frontend_options();
        let dn_opts = opts.datanode_options();

        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
            .await
            .context(error::StartFrontendSnafu)?;

        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
            .await
            .context(error::StartDatanodeSnafu)?;

        set_default_timezone(fe_opts.default_timezone.as_deref())
            .context(error::InitTimezoneSnafu)?;

        let data_home = &dn_opts.storage.data_home;
        // Ensure the data_home directory exists.
        fs::create_dir_all(path::Path::new(data_home))
            .context(error::CreateDirSnafu { dir: data_home })?;

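        // Standalone keeps metadata in a local KV store under the data home and
        // drives DDL through a local procedure manager instead of a remote metasrv.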
        let metadata_dir = metadata_store_dir(data_home);
        let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store)
            .context(error::BuildMetadataKvbackendSnafu)?;
        let procedure_manager =
            standalone::build_procedure_manager(kv_backend.clone(), opts.procedure);

        plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone())
            .await
            .context(error::SetupStandalonePluginsSnafu)?;

        // Build the cache registry.
        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry.clone());
        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

        let information_extension = Arc::new(StandaloneInformationExtension::new(
            datanode.region_server(),
            procedure_manager.clone(),
        ));

        plugins.insert::<InformationExtensionRef>(information_extension.clone());

        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_procedure_manager(procedure_manager.clone())
        .with_process_manager(process_manager.clone());
        #[cfg(feature = "enterprise")]
        let builder = if let Some(factories) = plugins.get() {
            builder.with_extra_information_table_factories(factories)
        } else {
            builder
        };
        let catalog_manager = builder.build();

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flownode_options = FlownodeOptions {
            flow: opts.flow.clone(),
            ..Default::default()
        };

        // Standalone mode doesn't use gRPC; instead, get a handle to the frontend gRPC
        // client without actually making a connection.
        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler(opts.query.clone());
        let frontend_client = Arc::new(frontend_client);
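        // Build the embedded flownode, sharing the catalog and metadata managers
        // with the rest of the standalone instance.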
        let flow_builder = FlownodeBuilder::new(
            flownode_options,
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            frontend_client.clone(),
        );
        let flownode = flow_builder
            .build()
            .await
            .map_err(BoxedError::new)
            .context(error::OtherSnafu)?;

        // Set the reference used to query the local flow state.
        {
            let flow_streaming_engine = flownode.flow_engine().streaming_engine();
            information_extension
                .set_flow_streaming_engine(flow_streaming_engine)
                .await;
        }

        let node_manager = Arc::new(StandaloneDatanodeManager {
            region_server: datanode.region_server(),
            flow_server: flownode.flow_engine(),
        });

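        // Allocate table and flow IDs from sequences stored in the same KV backend,
        // starting above the reserved system IDs.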
        let table_id_sequence = Arc::new(
            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_TABLE_ID as u64)
                .step(10)
                .build(),
        );
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        let kafka_options = opts
            .wal
            .clone()
            .try_into()
            .context(error::InvalidWalProviderSnafu)?;
        let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
            .await
            .context(error::BuildWalOptionsAllocatorSnafu)?;
        let wal_options_allocator = Arc::new(wal_options_allocator);
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_sequence,
            wal_options_allocator.clone(),
        ));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

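        // Wire up the DDL context with in-process implementations; there is no remote
        // metasrv in standalone mode, so region failure detection is a no-op.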
        let ddl_context = DdlContext {
            node_manager: node_manager.clone(),
            cache_invalidator: layered_cache_registry.clone(),
            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            table_metadata_manager: table_metadata_manager.clone(),
            table_metadata_allocator: table_metadata_allocator.clone(),
            flow_metadata_manager: flow_metadata_manager.clone(),
            flow_metadata_allocator: flow_metadata_allocator.clone(),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        };

        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
            .context(error::InitDdlManagerSnafu)?;
        #[cfg(feature = "enterprise")]
        let ddl_manager = {
            let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
                plugins.get();
            ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
        };

        let procedure_executor = Arc::new(LocalProcedureExecutor::new(
            Arc::new(ddl_manager),
            procedure_manager.clone(),
        ));

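        // Build the frontend instance on top of the same KV backend, cache registry,
        // and catalog manager, so all components share one metadata view.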
        let fe_instance = FrontendBuilder::new(
            fe_opts.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            procedure_executor.clone(),
            process_manager,
        )
        .with_plugin(plugins.clone())
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let fe_instance = Arc::new(fe_instance);

        // Set the frontend client for the flownode.
        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .lock()
            .unwrap()
            .replace(weak_grpc_handler);

        // Set the frontend invoker for the flownode.
        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        // The flow server needs to be able to use the frontend to write insert requests back.
        let invoker = FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            procedure_executor,
            node_manager,
        )
        .await
        .context(StartFlownodeSnafu)?;
        flow_streaming_engine.set_frontend_invoker(invoker).await;

        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
            .context(error::ServersSnafu)?;

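        // Build the frontend protocol servers (HTTP, gRPC, MySQL, PostgreSQL, ...)
        // from the standalone options.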
        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance: fe_instance,
            servers,
            heartbeat_task: None,
            export_metrics_task,
        };

        #[cfg(feature = "enterprise")]
        let components = Components {
            plugins,
            kv_backend,
            frontend_client,
            catalog_manager,
        };

        Ok(Instance {
            datanode,
            frontend,
            flownode,
            procedure_manager,
            wal_options_allocator,
            #[cfg(feature = "enterprise")]
            components,
            _guard: guard,
        })
    }

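    /// Creates the table metadata manager on the given KV backend and initializes it.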
    pub async fn create_table_metadata_manager(
        kv_backend: KvBackendRef,
    ) -> Result<TableMetadataManagerRef> {
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));

        table_metadata_manager
            .init()
            .await
            .context(error::InitMetadataSnafu)?;

        Ok(table_metadata_manager)
    }
}

#[cfg(test)]
mod tests {
    use std::default::Default;
    use std::io::Write;
    use std::time::Duration;

    use auth::{Identity, Password, UserProviderRef};
    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use common_wal::config::DatanodeWalConfig;
    use frontend::frontend::FrontendOptions;
    use object_store::config::{FileConfig, GcsConfig};
    use servers::grpc::GrpcOptions;

    use super::*;
    use crate::options::GlobalOptions;

    #[tokio::test]
    async fn test_try_from_start_command_to_anymap() {
        let fe_opts = FrontendOptions {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
            .await
            .unwrap();

        let provider = plugins.get::<UserProviderRef>().unwrap();
        let result = provider
            .authenticate(
                Identity::UserId("test", None),
                Password::PlainText("test".to_string().into()),
            )
            .await;
        let _ = result.unwrap();
    }

    #[test]
    fn test_toml() {
        let opts = StandaloneOptions::default();
        let toml_string = toml::to_string(&opts).unwrap();
        let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
    }

    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = true

            [wal]
            provider = "raft_engine"
            dir = "./greptimedb_data/test/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            access_key_id = "access_key_id"
            secret_access_key = "secret_access_key"

            [storage.compaction]
            max_inflight_tasks = 3
            max_files_in_level0 = 7
            max_purge_tasks = 32

            [storage.manifest]
            checkpoint_margin = 9
            gc_duration = '7s'

            [http]
            addr = "127.0.0.1:4000"
            timeout = "33s"
            body_limit = "128MB"

            [opentsdb]
            enable = true

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();
        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;
        let fe_opts = options.frontend_options();
        let dn_opts = options.datanode_options();
        let logging_opts = options.logging;
        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
        assert!(fe_opts.mysql.enable);
        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
        assert_eq!(2, fe_opts.mysql.runtime_size);
        assert_eq!(None, fe_opts.mysql.reject_no_database);
        assert!(fe_opts.influxdb.enable);
        assert!(fe_opts.opentsdb.enable);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
            unreachable!()
        };
        assert_eq!(
            "./greptimedb_data/test/wal",
            raft_engine_config.dir.unwrap()
        );

        assert!(matches!(
            &dn_opts.storage.store,
            object_store::config::ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(dn_opts.storage.providers.len(), 2);
        assert!(matches!(
            dn_opts.storage.providers[0],
            object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        match &dn_opts.storage.providers[1] {
            object_store::config::ObjectStoreConfig::S3(s3_config) => {
                assert_eq!(
                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
                    format!("{:?}", s3_config.connection.access_key_id)
                );
            }
            _ => {
                unreachable!()
            }
        }

        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
    }

    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let opts = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
        assert_eq!("debug", opts.logging.level.unwrap());
    }

    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"

            [logging]
            level = "debug"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "STANDALONE_UT";
        temp_env::with_vars(
            [
                (
                    // logging.dir = /other/log/dir
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "dir".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("/other/log/dir"),
                ),
                (
                    // logging.level = info
                    [
                        env_prefix.to_string(),
                        "logging".to_uppercase(),
                        "level".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("info"),
                ),
                (
                    // http.addr = 127.0.0.1:24000
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Should be read from env, env > default values.
                assert_eq!(opts.logging.dir, "/other/log/dir");

                // Should be read from config file, config file > env > default values.
                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");

                // Should be read from cli, cli > config file > env > default values.
                let fe_opts = opts.frontend_options();
                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);

                // Should be default value.
                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
            },
        );
    }

    #[test]
    fn test_load_default_standalone_options() {
        let options =
            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
        let default_options = StandaloneOptions::default();
        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
        assert_eq!(options.http, default_options.http);
        assert_eq!(options.grpc, default_options.grpc);
        assert_eq!(options.mysql, default_options.mysql);
        assert_eq!(options.postgres, default_options.postgres);
        assert_eq!(options.opentsdb, default_options.opentsdb);
        assert_eq!(options.influxdb, default_options.influxdb);
        assert_eq!(options.prom_store, default_options.prom_store);
        assert_eq!(options.wal, default_options.wal);
        assert_eq!(options.metadata_store, default_options.metadata_store);
        assert_eq!(options.procedure, default_options.procedure);
        assert_eq!(options.logging, default_options.logging);
        assert_eq!(options.region_engine, default_options.region_engine);
    }

    #[test]
    fn test_cache_config() {
        let toml_str = r#"
            [storage]
            data_home = "test_data_home"
            type = "S3"
            [storage.cache_config]
            enable_read_cache = true
        "#;
        let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
        opts.sanitize();
        assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
        assert_eq!(
            opts.storage.store.cache_config().unwrap().cache_path,
            "test_data_home"
        );
    }
}
895}