cmd/
standalone.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::net::SocketAddr;
16use std::path::Path;
17use std::sync::Arc;
18use std::{fs, path};
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::information_schema::InformationExtensionRef;
23use catalog::kvbackend::KvBackendCatalogManagerBuilder;
24use catalog::process_manager::ProcessManager;
25use clap::Parser;
26use common_base::Plugins;
27use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
28use common_config::{Configurable, metadata_store_dir};
29use common_error::ext::BoxedError;
30use common_meta::cache::LayeredCacheRegistryBuilder;
31use common_meta::ddl::flow_meta::FlowMetadataAllocator;
32use common_meta::ddl::table_meta::TableMetadataAllocator;
33use common_meta::ddl::{DdlContext, NoopRegionFailureDetectorControl};
34use common_meta::ddl_manager::DdlManager;
35use common_meta::key::flow::FlowMetadataManager;
36use common_meta::key::{TableMetadataManager, TableMetadataManagerRef};
37use common_meta::kv_backend::KvBackendRef;
38use common_meta::procedure_executor::LocalProcedureExecutor;
39use common_meta::region_keeper::MemoryRegionKeeper;
40use common_meta::region_registry::LeaderRegionRegistry;
41use common_meta::sequence::SequenceBuilder;
42use common_meta::wal_options_allocator::{WalOptionsAllocatorRef, build_wal_options_allocator};
43use common_procedure::ProcedureManagerRef;
44use common_query::prelude::set_default_prefix;
45use common_telemetry::info;
46use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
47use common_time::timezone::set_default_timezone;
48use common_version::{short_version, verbose_version};
49use datanode::config::DatanodeOptions;
50use datanode::datanode::{Datanode, DatanodeBuilder};
51use flow::{
52    FlownodeBuilder, FlownodeInstance, FlownodeOptions, FrontendClient, FrontendInvoker,
53    GrpcQueryHandlerWithBoxedError,
54};
55use frontend::frontend::Frontend;
56use frontend::instance::StandaloneDatanodeManager;
57use frontend::instance::builder::FrontendBuilder;
58use frontend::server::Services;
59use meta_srv::metasrv::{FLOW_ID_SEQ, TABLE_ID_SEQ};
60use servers::tls::{TlsMode, TlsOption};
61use snafu::ResultExt;
62use standalone::StandaloneInformationExtension;
63use standalone::options::StandaloneOptions;
64use tracing_appender::non_blocking::WorkerGuard;
65
66use crate::error::{Result, StartFlownodeSnafu};
67use crate::options::{GlobalOptions, GreptimeOptions};
68use crate::{App, create_resource_limit_metrics, error, log_versions, maybe_activate_heap_profile};
69
/// Application name of the standalone binary.
///
/// Returned by `App::name` for [`Instance`] and passed to logging initialization,
/// version logging and resource-limit metrics in `StartCommand::build`.
pub const APP_NAME: &str = "greptime-standalone";
71
// Top-level command of the standalone binary; it only wraps the selected `SubCommand`.
// (Plain `//` comments are used on purpose: `///` on clap-derive items becomes help text.)
#[derive(Parser)]
pub struct Command {
    // The subcommand chosen on the command line.
    #[clap(subcommand)]
    subcmd: SubCommand,
}
77
78impl Command {
79    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
80        self.subcmd.build(opts).await
81    }
82
83    pub fn load_options(
84        &self,
85        global_options: &GlobalOptions,
86    ) -> Result<GreptimeOptions<StandaloneOptions>> {
87        self.subcmd.load_options(global_options)
88    }
89}
90
// Subcommands understood by the standalone command. `Start` is currently the only one.
#[derive(Parser)]
enum SubCommand {
    // Start a standalone instance; see `StartCommand` for the accepted flags.
    Start(StartCommand),
}
95
96impl SubCommand {
97    async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
98        match self {
99            SubCommand::Start(cmd) => cmd.build(opts).await,
100        }
101    }
102
103    fn load_options(
104        &self,
105        global_options: &GlobalOptions,
106    ) -> Result<GreptimeOptions<StandaloneOptions>> {
107        match self {
108            SubCommand::Start(cmd) => cmd.load_options(global_options),
109        }
110    }
111}
112
/// A fully wired standalone instance, as produced by `StartCommand::build`.
///
/// It owns every component that the [`App`] implementation starts and stops:
/// the datanode, the frontend, the flownode, the procedure manager and the
/// WAL options allocator.
pub struct Instance {
    /// Datanode component; its telemetry is started in `start` and it is shut down in `stop`.
    datanode: Datanode,
    /// Frontend component (instance plus servers); also used to look up server addresses.
    frontend: Frontend,
    /// Flownode component, started last and shut down last.
    flownode: FlownodeInstance,
    /// Procedure manager, started before the frontend and stopped right after it.
    procedure_manager: ProcedureManagerRef,
    /// WAL options allocator, started in `start`.
    wal_options_allocator: WalOptionsAllocatorRef,

    // Handles to the building blocks of the standalone instance, kept so that
    // enterprise code can extend the instance on top of them.
    #[cfg(feature = "enterprise")]
    components: Components,

    // Keep the logging guards alive for the lifetime of the instance so that the
    // logging workers are not dropped.
    _guard: Vec<WorkerGuard>,
}
128
/// Handles to objects created while building the standalone instance.
///
/// Only available with the `enterprise` feature; obtained through `Instance::components`.
#[cfg(feature = "enterprise")]
pub struct Components {
    /// The plugin registry populated during `StartCommand::build`.
    pub plugins: Plugins,
    /// The metadata kv backend shared by the standalone components.
    pub kv_backend: KvBackendRef,
    /// The frontend client handed to the flownode builder.
    pub frontend_client: Arc<FrontendClient>,
    /// The catalog manager built on top of the kv backend.
    pub catalog_manager: catalog::CatalogManagerRef,
}
136
137impl Instance {
138    /// Find the socket addr of a server by its `name`.
139    pub fn server_addr(&self, name: &str) -> Option<SocketAddr> {
140        self.frontend.server_handlers().addr(name)
141    }
142
143    #[cfg(feature = "enterprise")]
144    pub fn components(&self) -> &Components {
145        &self.components
146    }
147}
148
149#[async_trait]
150impl App for Instance {
151    fn name(&self) -> &str {
152        APP_NAME
153    }
154
155    async fn start(&mut self) -> Result<()> {
156        self.datanode.start_telemetry();
157
158        self.procedure_manager
159            .start()
160            .await
161            .context(error::StartProcedureManagerSnafu)?;
162
163        self.wal_options_allocator
164            .start()
165            .await
166            .context(error::StartWalOptionsAllocatorSnafu)?;
167
168        plugins::start_frontend_plugins(self.frontend.instance.plugins().clone())
169            .await
170            .context(error::StartFrontendSnafu)?;
171
172        self.frontend
173            .start()
174            .await
175            .context(error::StartFrontendSnafu)?;
176
177        self.flownode.start().await.context(StartFlownodeSnafu)?;
178
179        Ok(())
180    }
181
182    async fn stop(&mut self) -> Result<()> {
183        self.frontend
184            .shutdown()
185            .await
186            .context(error::ShutdownFrontendSnafu)?;
187
188        self.procedure_manager
189            .stop()
190            .await
191            .context(error::StopProcedureManagerSnafu)?;
192
193        self.datanode
194            .shutdown()
195            .await
196            .context(error::ShutdownDatanodeSnafu)?;
197
198        self.flownode
199            .shutdown()
200            .await
201            .context(error::ShutdownFlownodeSnafu)?;
202
203        info!("Datanode instance stopped.");
204
205        Ok(())
206    }
207}
208
// Command-line flags of `standalone start`.
//
// NOTE: plain `//` comments are used for the additions below because clap turns `///`
// doc comments on derive items into user-visible help text.
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    // Overrides `http.addr` when given.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `grpc.bind_addr` when given; rejected if it equals the default datanode
    // gRPC bind address. Also accepted under the alias `--rpc-addr`.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    // When given, enables the MySQL server, sets its address and applies the CLI TLS options.
    #[clap(long)]
    mysql_addr: Option<String>,
    // When given, enables the PostgreSQL server, sets its address and applies the CLI TLS options.
    #[clap(long)]
    postgres_addr: Option<String>,
    // When set, enables the InfluxDB protocol; leaving it unset does not disable it.
    #[clap(short, long)]
    influxdb_enable: bool,
    // Optional path of the config file fed into the layered option loading.
    #[clap(short, long)]
    pub config_file: Option<String>,
    // TLS settings. They are only applied to the MySQL/PostgreSQL options, and only when
    // the corresponding `*_addr` flag is given.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    #[clap(long)]
    tls_watch: bool,
    // Overrides `user_provider` when given.
    #[clap(long)]
    user_provider: Option<String>,
    // Prefix of the environment variables consulted by the layered option loading.
    #[clap(long, default_value = "GREPTIMEDB_STANDALONE")]
    pub env_prefix: String,
    /// The working home directory of this standalone instance.
    #[clap(long)]
    data_home: Option<String>,
}
239
240impl StartCommand {
241    /// Load the GreptimeDB options from various sources (command line, config file or env).
242    pub fn load_options(
243        &self,
244        global_options: &GlobalOptions,
245    ) -> Result<GreptimeOptions<StandaloneOptions>> {
246        let mut opts = GreptimeOptions::<StandaloneOptions>::load_layered_options(
247            self.config_file.as_deref(),
248            self.env_prefix.as_ref(),
249        )
250        .context(error::LoadLayeredConfigSnafu)?;
251
252        self.merge_with_cli_options(global_options, &mut opts.component)?;
253        opts.component.sanitize();
254
255        Ok(opts)
256    }
257
258    // The precedence order is: cli > config file > environment variables > default values.
259    pub fn merge_with_cli_options(
260        &self,
261        global_options: &GlobalOptions,
262        opts: &mut StandaloneOptions,
263    ) -> Result<()> {
264        if let Some(dir) = &global_options.log_dir {
265            opts.logging.dir.clone_from(dir);
266        }
267
268        if global_options.log_level.is_some() {
269            opts.logging.level.clone_from(&global_options.log_level);
270        }
271
272        opts.tracing = TracingOptions {
273            #[cfg(feature = "tokio-console")]
274            tokio_console_addr: global_options.tokio_console_addr.clone(),
275        };
276
277        let tls_opts = TlsOption::new(
278            self.tls_mode.clone(),
279            self.tls_cert_path.clone(),
280            self.tls_key_path.clone(),
281            self.tls_watch,
282        );
283
284        if let Some(addr) = &self.http_addr {
285            opts.http.addr.clone_from(addr);
286        }
287
288        if let Some(data_home) = &self.data_home {
289            opts.storage.data_home.clone_from(data_home);
290        }
291
292        // If the logging dir is not set, use the default logs dir in the data home.
293        if opts.logging.dir.is_empty() {
294            opts.logging.dir = Path::new(&opts.storage.data_home)
295                .join(DEFAULT_LOGGING_DIR)
296                .to_string_lossy()
297                .to_string();
298        }
299
300        if let Some(addr) = &self.rpc_bind_addr {
301            // frontend grpc addr conflict with datanode default grpc addr
302            let datanode_grpc_addr = DatanodeOptions::default().grpc.bind_addr;
303            if addr.eq(&datanode_grpc_addr) {
304                return error::IllegalConfigSnafu {
305                    msg: format!(
306                        "gRPC listen address conflicts with datanode reserved gRPC addr: {datanode_grpc_addr}",
307                    ),
308                }.fail();
309            }
310            opts.grpc.bind_addr.clone_from(addr)
311        }
312
313        if let Some(addr) = &self.mysql_addr {
314            opts.mysql.enable = true;
315            opts.mysql.addr.clone_from(addr);
316            opts.mysql.tls = tls_opts.clone();
317        }
318
319        if let Some(addr) = &self.postgres_addr {
320            opts.postgres.enable = true;
321            opts.postgres.addr.clone_from(addr);
322            opts.postgres.tls = tls_opts;
323        }
324
325        if self.influxdb_enable {
326            opts.influxdb.enable = self.influxdb_enable;
327        }
328
329        if let Some(user_provider) = &self.user_provider {
330            opts.user_provider = Some(user_provider.clone());
331        }
332
333        Ok(())
334    }
335
    // NOTE(review): the reason for the three `allow`s below is not visible in this
    // function as shown — confirm they are still needed.
    #[allow(unreachable_code)]
    #[allow(unused_variables)]
    #[allow(clippy::diverging_sub_expression)]
    /// Build GreptimeDB instance with the loaded options.
    ///
    /// Performs process-wide initialization (runtimes, logging, default column prefix,
    /// default timezone), then constructs and wires together the metadata kv backend,
    /// the datanode, the catalog manager, the flownode and the frontend, and returns
    /// them bundled in an [`Instance`]. Nothing is started here; see `App::start`.
    ///
    /// # Errors
    ///
    /// Returns the error of the first construction step that fails.
    pub async fn build(&self, opts: GreptimeOptions<StandaloneOptions>) -> Result<Instance> {
        // Process-wide setup: runtimes first, then logging.
        common_runtime::init_global_runtimes(&opts.runtime);

        // The returned guards are stored in `Instance::_guard` at the end of this function.
        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            None,
            Some(&opts.component.slow_query),
        );

        log_versions(verbose_version(), short_version(), APP_NAME);
        maybe_activate_heap_profile(&opts.component.memory);
        create_resource_limit_metrics(APP_NAME);

        info!("Standalone start command: {:#?}", self);
        info!("Standalone options: {opts:#?}");

        // Split the loaded options into plugin options and the standalone component
        // options; from here on `opts` refers to the latter.
        let mut plugins = Plugins::new();
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        set_default_prefix(opts.default_column_prefix.as_deref())
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;

        // Derive the frontend and datanode option sets from the standalone options.
        opts.grpc.detect_server_addr();
        let fe_opts = opts.frontend_options();
        let dn_opts = opts.datanode_options();

        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &fe_opts)
            .await
            .context(error::StartFrontendSnafu)?;

        plugins::setup_datanode_plugins(&mut plugins, &plugin_opts, &dn_opts)
            .await
            .context(error::StartDatanodeSnafu)?;

        set_default_timezone(fe_opts.default_timezone.as_deref())
            .context(error::InitTimezoneSnafu)?;

        let data_home = &dn_opts.storage.data_home;
        // Ensure the data_home directory exists.
        fs::create_dir_all(path::Path::new(data_home))
            .context(error::CreateDirSnafu { dir: data_home })?;

        // The metadata kv backend lives under the data home and is shared by most of the
        // components constructed below.
        let metadata_dir = metadata_store_dir(data_home);
        let kv_backend = standalone::build_metadata_kvbackend(metadata_dir, opts.metadata_store)
            .context(error::BuildMetadataKvbackendSnafu)?;
        let procedure_manager =
            standalone::build_procedure_manager(kv_backend.clone(), opts.procedure);

        plugins::setup_standalone_plugins(&mut plugins, &plugin_opts, &opts, kv_backend.clone())
            .await
            .context(error::SetupStandalonePluginsSnafu)?;

        // Builds cache registry
        let layered_cache_builder = LayeredCacheRegistryBuilder::default();
        let fundamental_cache_registry = build_fundamental_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        // Build the datanode on top of the kv backend and the cache registry.
        let mut builder = DatanodeBuilder::new(dn_opts, plugins.clone(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry.clone());
        let datanode = builder.build().await.context(error::StartDatanodeSnafu)?;

        // The information extension is created from the datanode's region server and the
        // procedure manager, and is also registered as a plugin.
        let information_extension = Arc::new(StandaloneInformationExtension::new(
            datanode.region_server(),
            procedure_manager.clone(),
        ));

        plugins.insert::<InformationExtensionRef>(information_extension.clone());

        // Catalog manager, wired with the procedure manager and the process manager.
        let process_manager = Arc::new(ProcessManager::new(opts.grpc.server_addr.clone(), None));
        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_procedure_manager(procedure_manager.clone())
        .with_process_manager(process_manager.clone());
        // Enterprise only: add extra information table factories if a plugin provides them.
        #[cfg(feature = "enterprise")]
        let builder = if let Some(factories) = plugins.get() {
            builder.with_extra_information_table_factories(factories)
        } else {
            builder
        };
        let catalog_manager = builder.build();

        let table_metadata_manager =
            Self::create_table_metadata_manager(kv_backend.clone()).await?;

        // Flownode options: only the `flow` section comes from the standalone options,
        // everything else uses the defaults.
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(kv_backend.clone()));
        let flownode_options = FlownodeOptions {
            flow: opts.flow.clone(),
            ..Default::default()
        };

        // Standalone mode does not use gRPC between flownode and frontend. Instead, create
        // a frontend client with an empty handler slot (no connection is made); the slot is
        // filled below once the frontend instance exists.
        let (frontend_client, frontend_instance_handler) =
            FrontendClient::from_empty_grpc_handler(opts.query.clone());
        let frontend_client = Arc::new(frontend_client);
        let flow_builder = FlownodeBuilder::new(
            flownode_options,
            plugins.clone(),
            table_metadata_manager.clone(),
            catalog_manager.clone(),
            flow_metadata_manager.clone(),
            frontend_client.clone(),
        );
        let flownode = flow_builder
            .build()
            .await
            .map_err(BoxedError::new)
            .context(error::OtherSnafu)?;

        // Give the information extension a reference to the streaming engine so it can
        // query the local flow state.
        {
            let flow_streaming_engine = flownode.flow_engine().streaming_engine();
            information_extension
                .set_flow_streaming_engine(flow_streaming_engine)
                .await;
        }

        // In standalone mode the "node manager" is backed directly by the local region
        // server and flow engine.
        let node_manager = Arc::new(StandaloneDatanodeManager {
            region_server: datanode.region_server(),
            flow_server: flownode.flow_engine(),
        });

        // ID sequences for tables and flows, built on the kv backend; both start at the
        // minimum user ID and are configured with a step of 10.
        let table_id_sequence = Arc::new(
            SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_TABLE_ID as u64)
                .step(10)
                .build(),
        );
        let flow_id_sequence = Arc::new(
            SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                .initial(MIN_USER_FLOW_ID as u64)
                .step(10)
                .build(),
        );
        // Convert the standalone WAL config into the form `build_wal_options_allocator`
        // takes. NOTE(review): the local is named `kafka_options` but is derived from
        // whatever `opts.wal` holds — confirm the name is still accurate.
        let kafka_options = opts
            .wal
            .clone()
            .try_into()
            .context(error::InvalidWalProviderSnafu)?;
        let wal_options_allocator = build_wal_options_allocator(&kafka_options, kv_backend.clone())
            .await
            .context(error::BuildWalOptionsAllocatorSnafu)?;
        let wal_options_allocator = Arc::new(wal_options_allocator);
        let table_metadata_allocator = Arc::new(TableMetadataAllocator::new(
            table_id_sequence,
            wal_options_allocator.clone(),
        ));
        let flow_metadata_allocator = Arc::new(FlowMetadataAllocator::with_noop_peer_allocator(
            flow_id_sequence,
        ));

        // Everything the DDL manager needs; region failure detection is a no-op here.
        let ddl_context = DdlContext {
            node_manager: node_manager.clone(),
            cache_invalidator: layered_cache_registry.clone(),
            memory_region_keeper: Arc::new(MemoryRegionKeeper::default()),
            leader_region_registry: Arc::new(LeaderRegionRegistry::default()),
            table_metadata_manager: table_metadata_manager.clone(),
            table_metadata_allocator: table_metadata_allocator.clone(),
            flow_metadata_manager: flow_metadata_manager.clone(),
            flow_metadata_allocator: flow_metadata_allocator.clone(),
            region_failure_detector_controller: Arc::new(NoopRegionFailureDetectorControl),
        };

        // NOTE(review): the meaning of the trailing `true` is defined by
        // `DdlManager::try_new` and is not visible from this file.
        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager.clone(), true)
            .context(error::InitDdlManagerSnafu)?;
        // Enterprise only: attach the trigger DDL manager if a plugin provides one.
        #[cfg(feature = "enterprise")]
        let ddl_manager = {
            let trigger_ddl_manager: Option<common_meta::ddl_manager::TriggerDdlManagerRef> =
                plugins.get();
            ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
        };

        let procedure_executor = Arc::new(LocalProcedureExecutor::new(
            Arc::new(ddl_manager),
            procedure_manager.clone(),
        ));

        // Build the frontend instance from the pieces assembled above.
        let fe_instance = FrontendBuilder::new(
            fe_opts.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager.clone(),
            node_manager.clone(),
            procedure_executor.clone(),
            process_manager,
        )
        .with_plugin(plugins.clone())
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let fe_instance = Arc::new(fe_instance);

        // Fill the handler slot of the flownode's frontend client. Only a `Weak` handle
        // is stored, so the slot does not own the frontend instance. `unwrap` panics if
        // `lock()` returns an error (presumably a poisoned mutex).
        let grpc_handler = fe_instance.clone() as Arc<dyn GrpcQueryHandlerWithBoxedError>;
        let weak_grpc_handler = Arc::downgrade(&grpc_handler);
        frontend_instance_handler
            .lock()
            .unwrap()
            .replace(weak_grpc_handler);

        // set the frontend invoker for flownode
        let flow_streaming_engine = flownode.flow_engine().streaming_engine();
        // flow server need to be able to use frontend to write insert requests back
        let invoker = FrontendInvoker::build_from(
            flow_streaming_engine.clone(),
            catalog_manager.clone(),
            kv_backend.clone(),
            layered_cache_registry.clone(),
            procedure_executor,
            node_manager,
        )
        .await
        .context(StartFlownodeSnafu)?;
        flow_streaming_engine.set_frontend_invoker(invoker).await;

        // Build the protocol servers that expose the frontend instance; `opts` is consumed here.
        let servers = Services::new(opts, fe_instance.clone(), plugins.clone())
            .build()
            .context(error::StartFrontendSnafu)?;

        // No heartbeat task is configured for the standalone frontend.
        let frontend = Frontend {
            instance: fe_instance,
            servers,
            heartbeat_task: None,
        };

        #[cfg(feature = "enterprise")]
        let components = Components {
            plugins,
            kv_backend,
            frontend_client,
            catalog_manager,
        };

        Ok(Instance {
            datanode,
            frontend,
            flownode,
            procedure_manager,
            wal_options_allocator,
            #[cfg(feature = "enterprise")]
            components,
            _guard: guard,
        })
    }
596
597    pub async fn create_table_metadata_manager(
598        kv_backend: KvBackendRef,
599    ) -> Result<TableMetadataManagerRef> {
600        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend));
601
602        table_metadata_manager
603            .init()
604            .await
605            .context(error::InitMetadataSnafu)?;
606
607        Ok(table_metadata_manager)
608    }
609}
610
611#[cfg(test)]
612mod tests {
613    use std::default::Default;
614    use std::io::Write;
615    use std::time::Duration;
616
617    use auth::{Identity, Password, UserProviderRef};
618    use common_base::readable_size::ReadableSize;
619    use common_config::ENV_VAR_SEP;
620    use common_test_util::temp_dir::create_named_temp_file;
621    use common_wal::config::DatanodeWalConfig;
622    use frontend::frontend::FrontendOptions;
623    use object_store::config::{FileConfig, GcsConfig};
624    use servers::grpc::GrpcOptions;
625
626    use super::*;
627    use crate::options::GlobalOptions;
628
629    #[tokio::test]
630    async fn test_try_from_start_command_to_anymap() {
631        let fe_opts = FrontendOptions {
632            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
633            ..Default::default()
634        };
635
636        let mut plugins = Plugins::new();
637        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
638            .await
639            .unwrap();
640
641        let provider = plugins.get::<UserProviderRef>().unwrap();
642        let result = provider
643            .authenticate(
644                Identity::UserId("test", None),
645                Password::PlainText("test".to_string().into()),
646            )
647            .await;
648        let _ = result.unwrap();
649    }
650
651    #[test]
652    fn test_toml() {
653        let opts = StandaloneOptions::default();
654        let toml_string = toml::to_string(&opts).unwrap();
655        let _parsed: StandaloneOptions = toml::from_str(&toml_string).unwrap();
656    }
657
658    #[test]
659    fn test_read_from_config_file() {
660        let mut file = create_named_temp_file();
661        let toml_str = r#"
662            enable_memory_catalog = true
663
664            [wal]
665            provider = "raft_engine"
666            dir = "./greptimedb_data/test/wal"
667            file_size = "1GB"
668            purge_threshold = "50GB"
669            purge_interval = "10m"
670            read_batch_size = 128
671            sync_write = false
672
673            [storage]
674            data_home = "./greptimedb_data/"
675            type = "File"
676
677            [[storage.providers]]
678            type = "Gcs"
679            bucket = "foo"
680            endpoint = "bar"
681
682            [[storage.providers]]
683            type = "S3"
684            access_key_id = "access_key_id"
685            secret_access_key = "secret_access_key"
686
687            [storage.compaction]
688            max_inflight_tasks = 3
689            max_files_in_level0 = 7
690            max_purge_tasks = 32
691
692            [storage.manifest]
693            checkpoint_margin = 9
694            gc_duration = '7s'
695
696            [http]
697            addr = "127.0.0.1:4000"
698            timeout = "33s"
699            body_limit = "128MB"
700
701            [opentsdb]
702            enable = true
703
704            [logging]
705            level = "debug"
706            dir = "./greptimedb_data/test/logs"
707        "#;
708        write!(file, "{}", toml_str).unwrap();
709        let cmd = StartCommand {
710            config_file: Some(file.path().to_str().unwrap().to_string()),
711            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
712            ..Default::default()
713        };
714
715        let options = cmd
716            .load_options(&GlobalOptions::default())
717            .unwrap()
718            .component;
719        let fe_opts = options.frontend_options();
720        let dn_opts = options.datanode_options();
721        let logging_opts = options.logging;
722        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
723        assert_eq!(Duration::from_secs(33), fe_opts.http.timeout);
724        assert_eq!(ReadableSize::mb(128), fe_opts.http.body_limit);
725        assert_eq!("127.0.0.1:4001".to_string(), fe_opts.grpc.bind_addr);
726        assert!(fe_opts.mysql.enable);
727        assert_eq!("127.0.0.1:4002", fe_opts.mysql.addr);
728        assert_eq!(2, fe_opts.mysql.runtime_size);
729        assert_eq!(None, fe_opts.mysql.reject_no_database);
730        assert!(fe_opts.influxdb.enable);
731        assert!(fe_opts.opentsdb.enable);
732
733        let DatanodeWalConfig::RaftEngine(raft_engine_config) = dn_opts.wal else {
734            unreachable!()
735        };
736        assert_eq!(
737            "./greptimedb_data/test/wal",
738            raft_engine_config.dir.unwrap()
739        );
740
741        assert!(matches!(
742            &dn_opts.storage.store,
743            object_store::config::ObjectStoreConfig::File(FileConfig { .. })
744        ));
745        assert_eq!(dn_opts.storage.providers.len(), 2);
746        assert!(matches!(
747            dn_opts.storage.providers[0],
748            object_store::config::ObjectStoreConfig::Gcs(GcsConfig { .. })
749        ));
750        match &dn_opts.storage.providers[1] {
751            object_store::config::ObjectStoreConfig::S3(s3_config) => {
752                assert_eq!(
753                    "SecretBox<alloc::string::String>([REDACTED])".to_string(),
754                    format!("{:?}", s3_config.connection.access_key_id)
755                );
756            }
757            _ => {
758                unreachable!()
759            }
760        }
761
762        assert_eq!("debug", logging_opts.level.as_ref().unwrap());
763        assert_eq!("./greptimedb_data/test/logs".to_string(), logging_opts.dir);
764    }
765
766    #[test]
767    fn test_load_log_options_from_cli() {
768        let cmd = StartCommand {
769            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
770            mysql_addr: Some("127.0.0.1:4002".to_string()),
771            postgres_addr: Some("127.0.0.1:4003".to_string()),
772            tls_watch: true,
773            ..Default::default()
774        };
775
776        let opts = cmd
777            .load_options(&GlobalOptions {
778                log_dir: Some("./greptimedb_data/test/logs".to_string()),
779                log_level: Some("debug".to_string()),
780
781                #[cfg(feature = "tokio-console")]
782                tokio_console_addr: None,
783            })
784            .unwrap()
785            .component;
786
787        assert_eq!("./greptimedb_data/test/logs", opts.logging.dir);
788        assert_eq!("debug", opts.logging.level.unwrap());
789        assert!(opts.mysql.tls.watch);
790        assert!(opts.postgres.tls.watch);
791    }
792
793    #[test]
794    fn test_config_precedence_order() {
795        let mut file = create_named_temp_file();
796        let toml_str = r#"
797            [http]
798            addr = "127.0.0.1:4000"
799
800            [logging]
801            level = "debug"
802        "#;
803        write!(file, "{}", toml_str).unwrap();
804
805        let env_prefix = "STANDALONE_UT";
806        temp_env::with_vars(
807            [
808                (
809                    // logging.dir = /other/log/dir
810                    [
811                        env_prefix.to_string(),
812                        "logging".to_uppercase(),
813                        "dir".to_uppercase(),
814                    ]
815                    .join(ENV_VAR_SEP),
816                    Some("/other/log/dir"),
817                ),
818                (
819                    // logging.level = info
820                    [
821                        env_prefix.to_string(),
822                        "logging".to_uppercase(),
823                        "level".to_uppercase(),
824                    ]
825                    .join(ENV_VAR_SEP),
826                    Some("info"),
827                ),
828                (
829                    // http.addr = 127.0.0.1:24000
830                    [
831                        env_prefix.to_string(),
832                        "http".to_uppercase(),
833                        "addr".to_uppercase(),
834                    ]
835                    .join(ENV_VAR_SEP),
836                    Some("127.0.0.1:24000"),
837                ),
838            ],
839            || {
840                let command = StartCommand {
841                    config_file: Some(file.path().to_str().unwrap().to_string()),
842                    http_addr: Some("127.0.0.1:14000".to_string()),
843                    env_prefix: env_prefix.to_string(),
844                    ..Default::default()
845                };
846
847                let opts = command.load_options(&Default::default()).unwrap().component;
848
849                // Should be read from env, env > default values.
850                assert_eq!(opts.logging.dir, "/other/log/dir");
851
852                // Should be read from config file, config file > env > default values.
853                assert_eq!(opts.logging.level.as_ref().unwrap(), "debug");
854
855                // Should be read from cli, cli > config file > env > default values.
856                let fe_opts = opts.frontend_options();
857                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
858                assert_eq!(ReadableSize::mb(64), fe_opts.http.body_limit);
859
860                // Should be default value.
861                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
862            },
863        );
864    }
865
866    #[test]
867    fn test_load_default_standalone_options() {
868        let options =
869            StandaloneOptions::load_layered_options(None, "GREPTIMEDB_STANDALONE").unwrap();
870        let default_options = StandaloneOptions::default();
871        assert_eq!(options.enable_telemetry, default_options.enable_telemetry);
872        assert_eq!(options.http, default_options.http);
873        assert_eq!(options.grpc, default_options.grpc);
874        assert_eq!(options.mysql, default_options.mysql);
875        assert_eq!(options.postgres, default_options.postgres);
876        assert_eq!(options.opentsdb, default_options.opentsdb);
877        assert_eq!(options.influxdb, default_options.influxdb);
878        assert_eq!(options.prom_store, default_options.prom_store);
879        assert_eq!(options.wal, default_options.wal);
880        assert_eq!(options.metadata_store, default_options.metadata_store);
881        assert_eq!(options.procedure, default_options.procedure);
882        assert_eq!(options.logging, default_options.logging);
883        assert_eq!(options.region_engine, default_options.region_engine);
884    }
885
886    #[test]
887    fn test_cache_config() {
888        let toml_str = r#"
889            [storage]
890            data_home = "test_data_home"
891            type = "S3"
892            [storage.cache_config]
893            enable_read_cache = true
894        "#;
895        let mut opts: StandaloneOptions = toml::from_str(toml_str).unwrap();
896        opts.sanitize();
897        assert!(opts.storage.store.cache_config().unwrap().enable_read_cache);
898        assert_eq!(
899            opts.storage.store.cache_config().unwrap().cache_path,
900            "test_data_home"
901        );
902    }
903}