cmd/
frontend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::information_extension::DistributedInformationExtension;
23use catalog::information_schema::InformationExtensionRef;
24use catalog::kvbackend::{
25    CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
26    MetaKvBackend,
27};
28use catalog::process_manager::ProcessManager;
29use clap::Parser;
30use client::client_manager::NodeClients;
31use common_base::Plugins;
32use common_config::{Configurable, DEFAULT_DATA_HOME};
33use common_error::ext::BoxedError;
34use common_grpc::channel_manager::ChannelConfig;
35use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
36use common_meta::heartbeat::handler::HandlerGroupExecutor;
37use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
38use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
39use common_meta::heartbeat::handler::suspend::SuspendHandler;
40use common_query::prelude::set_default_prefix;
41use common_stat::ResourceStatImpl;
42use common_telemetry::info;
43use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
44use common_time::timezone::set_default_timezone;
45use common_version::{short_version, verbose_version};
46use frontend::frontend::Frontend;
47use frontend::heartbeat::HeartbeatTask;
48use frontend::instance::builder::FrontendBuilder;
49use frontend::server::Services;
50use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
51use plugins::PluginOptions;
52use plugins::frontend::context::{
53    CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
54};
55use plugins::frontend::setup_frontend_dynamic_plugins;
56use plugins::options::PluginOptionsDeserializerImpl;
57use servers::addrs;
58use servers::grpc::GrpcOptions;
59use servers::tls::{TlsMode, TlsOption, merge_tls_option};
60use snafu::{OptionExt, ResultExt};
61use tracing_appender::non_blocking::WorkerGuard;
62
63use crate::error::{self, OtherSnafu, Result};
64use crate::options::{GlobalOptions, GreptimeOptions};
65use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
66
67type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
68
69pub struct Instance {
70    frontend: Frontend,
71    // Keep the logging guard to prevent the worker from being dropped.
72    _guard: Vec<WorkerGuard>,
73}
74
75pub const APP_NAME: &str = "greptime-frontend";
76
77impl Instance {
78    pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
79        Self { frontend, _guard }
80    }
81
82    pub fn inner(&self) -> &Frontend {
83        &self.frontend
84    }
85
86    pub fn mut_inner(&mut self) -> &mut Frontend {
87        &mut self.frontend
88    }
89}
90
91#[async_trait]
92impl App for Instance {
93    fn name(&self) -> &str {
94        APP_NAME
95    }
96
97    async fn start(&mut self) -> Result<()> {
98        let plugins = self.frontend.instance.plugins().clone();
99        plugins::start_frontend_plugins(plugins)
100            .await
101            .context(error::StartFrontendSnafu)?;
102
103        self.frontend
104            .start()
105            .await
106            .context(error::StartFrontendSnafu)
107    }
108
109    async fn stop(&mut self) -> Result<()> {
110        self.frontend
111            .shutdown()
112            .await
113            .context(error::ShutdownFrontendSnafu)
114    }
115}
116
117#[derive(Parser)]
118pub struct Command {
119    #[clap(subcommand)]
120    pub subcmd: SubCommand,
121}
122
123impl Command {
124    pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
125        self.subcmd.build(opts).await
126    }
127
128    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
129        self.subcmd.load_options(global_options)
130    }
131}
132
133#[derive(Parser)]
134pub enum SubCommand {
135    Start(StartCommand),
136}
137
138impl SubCommand {
139    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
140        match self {
141            SubCommand::Start(cmd) => cmd.build(opts).await,
142        }
143    }
144
145    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
146        match self {
147            SubCommand::Start(cmd) => cmd.load_options(global_options),
148        }
149    }
150}
151
152#[derive(Debug, Default, Parser)]
153pub struct StartCommand {
154    /// The address to bind the gRPC server.
155    #[clap(long, alias = "rpc-addr")]
156    rpc_bind_addr: Option<String>,
157    /// The address advertised to the metasrv, and used for connections from outside the host.
158    /// If left empty or unset, the server will automatically use the IP address of the first network interface
159    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
160    #[clap(long, alias = "rpc-hostname")]
161    rpc_server_addr: Option<String>,
162    /// The address to bind the internal gRPC server.
163    #[clap(long, alias = "internal-rpc-addr")]
164    internal_rpc_bind_addr: Option<String>,
165    /// The address advertised to the metasrv, and used for connections from outside the host.
166    /// If left empty or unset, the server will automatically use the IP address of the first network interface
167    /// on the host, with the same port number as the one specified in `internal_rpc_bind_addr`.
168    #[clap(long, alias = "internal-rpc-hostname")]
169    internal_rpc_server_addr: Option<String>,
170    #[clap(long)]
171    http_addr: Option<String>,
172    #[clap(long)]
173    http_timeout: Option<u64>,
174    #[clap(long)]
175    mysql_addr: Option<String>,
176    #[clap(long)]
177    postgres_addr: Option<String>,
178    #[clap(short, long)]
179    pub config_file: Option<String>,
180    #[clap(short, long)]
181    influxdb_enable: Option<bool>,
182    #[clap(long, value_delimiter = ',', num_args = 1..)]
183    metasrv_addrs: Option<Vec<String>>,
184    #[clap(long)]
185    tls_mode: Option<TlsMode>,
186    #[clap(long)]
187    tls_cert_path: Option<String>,
188    #[clap(long)]
189    tls_key_path: Option<String>,
190    #[clap(long)]
191    tls_watch: bool,
192    #[clap(long)]
193    user_provider: Option<String>,
194    #[clap(long)]
195    disable_dashboard: Option<bool>,
196    #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
197    pub env_prefix: String,
198}
199
200impl StartCommand {
201    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
202        let mut opts = FrontendOptions::load_layered_options(
203            self.config_file.as_deref(),
204            self.env_prefix.as_ref(),
205        )
206        .context(error::LoadLayeredConfigSnafu)?;
207
208        self.merge_with_cli_options(global_options, &mut opts)?;
209
210        Ok(opts)
211    }
212
213    // The precedence order is: cli > config file > environment variables > default values.
214    fn merge_with_cli_options(
215        &self,
216        global_options: &GlobalOptions,
217        opts: &mut FrontendOptions,
218    ) -> Result<()> {
219        let opts = &mut opts.component;
220
221        if let Some(dir) = &global_options.log_dir {
222            opts.logging.dir.clone_from(dir);
223        }
224
225        // If the logging dir is not set, use the default logs dir in the data home.
226        if opts.logging.dir.is_empty() {
227            opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
228                .join(DEFAULT_LOGGING_DIR)
229                .to_string_lossy()
230                .to_string();
231        }
232
233        if global_options.log_level.is_some() {
234            opts.logging.level.clone_from(&global_options.log_level);
235        }
236
237        opts.tracing = TracingOptions {
238            #[cfg(feature = "tokio-console")]
239            tokio_console_addr: global_options.tokio_console_addr.clone(),
240        };
241
242        let tls_opts = TlsOption::new(
243            self.tls_mode,
244            self.tls_cert_path.clone(),
245            self.tls_key_path.clone(),
246            self.tls_watch,
247        );
248
249        if let Some(addr) = &self.http_addr {
250            opts.http.addr.clone_from(addr);
251        }
252
253        if let Some(http_timeout) = self.http_timeout {
254            opts.http.timeout = Duration::from_secs(http_timeout)
255        }
256
257        if let Some(disable_dashboard) = self.disable_dashboard {
258            opts.http.disable_dashboard = disable_dashboard;
259        }
260
261        if let Some(addr) = &self.rpc_bind_addr {
262            opts.grpc.bind_addr.clone_from(addr);
263            opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
264        }
265
266        if let Some(addr) = &self.rpc_server_addr {
267            opts.grpc.server_addr.clone_from(addr);
268        }
269
270        if let Some(addr) = &self.internal_rpc_bind_addr {
271            if let Some(internal_grpc) = &mut opts.internal_grpc {
272                internal_grpc.bind_addr = addr.clone();
273            } else {
274                let grpc_options = GrpcOptions {
275                    bind_addr: addr.clone(),
276                    ..Default::default()
277                };
278
279                opts.internal_grpc = Some(grpc_options);
280            }
281        }
282
283        if let Some(addr) = &self.internal_rpc_server_addr {
284            if let Some(internal_grpc) = &mut opts.internal_grpc {
285                internal_grpc.server_addr = addr.clone();
286            } else {
287                let grpc_options = GrpcOptions {
288                    server_addr: addr.clone(),
289                    ..Default::default()
290                };
291                opts.internal_grpc = Some(grpc_options);
292            }
293        }
294
295        if let Some(addr) = &self.mysql_addr {
296            opts.mysql.enable = true;
297            opts.mysql.addr.clone_from(addr);
298            opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
299        }
300
301        if let Some(addr) = &self.postgres_addr {
302            opts.postgres.enable = true;
303            opts.postgres.addr.clone_from(addr);
304            opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
305        }
306
307        if let Some(enable) = self.influxdb_enable {
308            opts.influxdb.enable = enable;
309        }
310
311        if let Some(metasrv_addrs) = &self.metasrv_addrs {
312            opts.meta_client
313                .get_or_insert_with(MetaClientOptions::default)
314                .metasrv_addrs
315                .clone_from(metasrv_addrs);
316        }
317
318        if let Some(user_provider) = &self.user_provider {
319            opts.user_provider = Some(user_provider.clone());
320        }
321
322        Ok(())
323    }
324
325    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
326        common_runtime::init_global_runtimes(&opts.runtime);
327
328        let guard = common_telemetry::init_global_logging(
329            APP_NAME,
330            &opts.component.logging,
331            &opts.component.tracing,
332            opts.component.node_id.clone(),
333            Some(&opts.component.slow_query),
334        );
335
336        log_versions(verbose_version(), short_version(), APP_NAME);
337        maybe_activate_heap_profile(&opts.component.memory);
338        create_resource_limit_metrics(APP_NAME);
339
340        info!("Frontend start command: {:#?}", self);
341        info!("Frontend options: {:#?}", opts);
342
343        let plugin_opts = opts.plugins;
344        let mut opts = opts.component;
345        opts.grpc.detect_server_addr();
346        let mut plugins = Plugins::new();
347        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
348            .await
349            .context(error::StartFrontendSnafu)?;
350
351        set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
352        set_default_prefix(opts.default_column_prefix.as_deref())
353            .map_err(BoxedError::new)
354            .context(error::BuildCliSnafu)?;
355
356        let meta_client_options = opts
357            .meta_client
358            .as_ref()
359            .context(error::MissingConfigSnafu {
360                msg: "'meta_client'",
361            })?;
362
363        let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
364        let cache_ttl = meta_client_options.metadata_cache_ttl;
365        let cache_tti = meta_client_options.metadata_cache_tti;
366
367        let meta_client = meta_client::create_meta_client(
368            MetaClientType::Frontend,
369            meta_client_options,
370            Some(&plugins),
371            None,
372        )
373        .await
374        .context(error::MetaClientInitSnafu)?;
375
376        let meta_config: Vec<PluginOptions> = meta_client
377            .pull_config(PluginOptionsDeserializerImpl)
378            .await
379            .context(error::MetaClientInitSnafu)?;
380        setup_frontend_dynamic_plugins(meta_config, &mut plugins)
381            .await
382            .context(error::StartFrontendSnafu)?;
383
384        // TODO(discord9): add helper function to ease the creation of cache registry&such
385        let cached_meta_backend =
386            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
387                .cache_max_capacity(cache_max_capacity)
388                .cache_ttl(cache_ttl)
389                .cache_tti(cache_tti)
390                .build();
391        let cached_meta_backend = Arc::new(cached_meta_backend);
392
393        // Builds cache registry
394        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
395            CacheRegistryBuilder::default()
396                .add_cache(cached_meta_backend.clone())
397                .build(),
398        );
399        let fundamental_cache_registry =
400            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
401        let layered_cache_registry = Arc::new(
402            with_default_composite_cache_registry(
403                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
404            )
405            .context(error::BuildCacheRegistrySnafu)?
406            .build(),
407        );
408
409        // frontend to datanode need not timeout.
410        // Some queries are expected to take long time.
411        let mut channel_config = ChannelConfig {
412            timeout: None,
413            tcp_nodelay: opts.datanode.client.tcp_nodelay,
414            connect_timeout: Some(opts.datanode.client.connect_timeout),
415            ..Default::default()
416        };
417        if opts.grpc.flight_compression.transport_compression() {
418            channel_config.accept_compression = true;
419            channel_config.send_compression = true;
420        }
421        let client = Arc::new(NodeClients::new(channel_config));
422
423        let information_extension = Arc::new(DistributedInformationExtension::new(
424            meta_client.clone(),
425            client.clone(),
426        ));
427        plugins.insert::<InformationExtensionRef>(information_extension.clone());
428
429        let process_manager = Arc::new(ProcessManager::new(
430            addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
431            Some(meta_client.clone()),
432        ));
433
434        let builder = KvBackendCatalogManagerBuilder::new(
435            information_extension,
436            cached_meta_backend.clone(),
437            layered_cache_registry.clone(),
438        )
439        .with_process_manager(process_manager.clone());
440        let builder = if let Some(configurator) =
441            plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
442        {
443            let ctx = DistributedCatalogManagerConfigureContext {
444                meta_client: meta_client.clone(),
445            };
446            let ctx = CatalogManagerConfigureContext::Distributed(ctx);
447
448            configurator
449                .configure(builder, ctx)
450                .await
451                .context(OtherSnafu)?
452        } else {
453            builder
454        };
455        let catalog_manager = builder.build();
456
457        let instance = FrontendBuilder::new(
458            opts.clone(),
459            cached_meta_backend.clone(),
460            layered_cache_registry.clone(),
461            catalog_manager,
462            client,
463            meta_client.clone(),
464            process_manager,
465        )
466        .with_plugin(plugins.clone())
467        .with_local_cache_invalidator(layered_cache_registry)
468        .try_build()
469        .await
470        .context(error::StartFrontendSnafu)?;
471
472        let heartbeat_task = Some(create_heartbeat_task(&opts, meta_client, &instance));
473
474        let instance = Arc::new(instance);
475
476        let servers = Services::new(opts, instance.clone(), plugins)
477            .build()
478            .context(error::StartFrontendSnafu)?;
479
480        let frontend = Frontend {
481            instance,
482            servers,
483            heartbeat_task,
484        };
485
486        Ok(Instance::new(frontend, guard))
487    }
488}
489
490pub fn create_heartbeat_task(
491    options: &frontend::frontend::FrontendOptions,
492    meta_client: MetaClientRef,
493    instance: &frontend::instance::Instance,
494) -> HeartbeatTask {
495    let executor = Arc::new(HandlerGroupExecutor::new(vec![
496        Arc::new(ParseMailboxMessageHandler),
497        Arc::new(SuspendHandler::new(instance.suspend_state())),
498        Arc::new(InvalidateCacheHandler::new(
499            instance.cache_invalidator().clone(),
500        )),
501    ]));
502
503    let stat = {
504        let mut stat = ResourceStatImpl::default();
505        stat.start_collect_cpu_usage();
506        Arc::new(stat)
507    };
508
509    HeartbeatTask::new(options, meta_client, executor, stat)
510}
511
512#[cfg(test)]
513mod tests {
514    use std::io::Write;
515    use std::time::Duration;
516
517    use auth::{Identity, Password, UserProviderRef};
518    use common_base::readable_size::ReadableSize;
519    use common_config::ENV_VAR_SEP;
520    use common_test_util::temp_dir::create_named_temp_file;
521    use servers::grpc::GrpcOptions;
522    use servers::http::HttpOptions;
523
524    use super::*;
525    use crate::options::GlobalOptions;
526
527    #[test]
528    fn test_try_from_start_command() {
529        let command = StartCommand {
530            http_addr: Some("127.0.0.1:1234".to_string()),
531            mysql_addr: Some("127.0.0.1:5678".to_string()),
532            postgres_addr: Some("127.0.0.1:5432".to_string()),
533            internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
534            internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
535            influxdb_enable: Some(false),
536            disable_dashboard: Some(false),
537            ..Default::default()
538        };
539
540        let opts = command.load_options(&Default::default()).unwrap().component;
541
542        assert_eq!(opts.http.addr, "127.0.0.1:1234");
543        assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
544        assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
545        assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
546
547        let internal_grpc = opts.internal_grpc.as_ref().unwrap();
548        assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
549        assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");
550
551        let default_opts = FrontendOptions::default().component;
552
553        assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
554        assert!(opts.mysql.enable);
555        assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
556        assert!(opts.postgres.enable);
557        assert_eq!(
558            opts.postgres.runtime_size,
559            default_opts.postgres.runtime_size
560        );
561        assert!(opts.opentsdb.enable);
562
563        assert!(!opts.influxdb.enable);
564    }
565
566    #[test]
567    fn test_read_from_config_file() {
568        let mut file = create_named_temp_file();
569        let toml_str = r#"
570            [http]
571            addr = "127.0.0.1:4000"
572            timeout = "0s"
573            body_limit = "2GB"
574
575            [opentsdb]
576            enable = false
577
578            [logging]
579            level = "debug"
580            dir = "./greptimedb_data/test/logs"
581        "#;
582        write!(file, "{}", toml_str).unwrap();
583
584        let command = StartCommand {
585            config_file: Some(file.path().to_str().unwrap().to_string()),
586            disable_dashboard: Some(false),
587            ..Default::default()
588        };
589
590        let fe_opts = command.load_options(&Default::default()).unwrap().component;
591
592        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
593        assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
594
595        assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
596
597        assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
598        assert_eq!(
599            "./greptimedb_data/test/logs".to_string(),
600            fe_opts.logging.dir
601        );
602        assert!(!fe_opts.opentsdb.enable);
603    }
604
605    #[tokio::test]
606    async fn test_try_from_start_command_to_anymap() {
607        let fe_opts = frontend::frontend::FrontendOptions {
608            http: HttpOptions {
609                disable_dashboard: false,
610                ..Default::default()
611            },
612            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
613            ..Default::default()
614        };
615
616        let mut plugins = Plugins::new();
617        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
618            .await
619            .unwrap();
620
621        let provider = plugins.get::<UserProviderRef>().unwrap();
622        let result = provider
623            .authenticate(
624                Identity::UserId("test", None),
625                Password::PlainText("test".to_string().into()),
626            )
627            .await;
628        let _ = result.unwrap();
629    }
630
631    #[test]
632    fn test_load_log_options_from_cli() {
633        let cmd = StartCommand {
634            disable_dashboard: Some(false),
635            ..Default::default()
636        };
637
638        let options = cmd
639            .load_options(&GlobalOptions {
640                log_dir: Some("./greptimedb_data/test/logs".to_string()),
641                log_level: Some("debug".to_string()),
642
643                #[cfg(feature = "tokio-console")]
644                tokio_console_addr: None,
645            })
646            .unwrap()
647            .component;
648
649        let logging_opt = options.logging;
650        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
651        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
652    }
653
654    #[test]
655    fn test_config_precedence_order() {
656        let mut file = create_named_temp_file();
657        let toml_str = r#"
658            [http]
659            addr = "127.0.0.1:4000"
660
661            [meta_client]
662            timeout = "3s"
663            connect_timeout = "5s"
664            tcp_nodelay = true
665
666            [mysql]
667            addr = "127.0.0.1:4002"
668        "#;
669        write!(file, "{}", toml_str).unwrap();
670
671        let env_prefix = "FRONTEND_UT";
672        temp_env::with_vars(
673            [
674                (
675                    // mysql.addr = 127.0.0.1:14002
676                    [
677                        env_prefix.to_string(),
678                        "mysql".to_uppercase(),
679                        "addr".to_uppercase(),
680                    ]
681                    .join(ENV_VAR_SEP),
682                    Some("127.0.0.1:14002"),
683                ),
684                (
685                    // mysql.runtime_size = 11
686                    [
687                        env_prefix.to_string(),
688                        "mysql".to_uppercase(),
689                        "runtime_size".to_uppercase(),
690                    ]
691                    .join(ENV_VAR_SEP),
692                    Some("11"),
693                ),
694                (
695                    // http.addr = 127.0.0.1:24000
696                    [
697                        env_prefix.to_string(),
698                        "http".to_uppercase(),
699                        "addr".to_uppercase(),
700                    ]
701                    .join(ENV_VAR_SEP),
702                    Some("127.0.0.1:24000"),
703                ),
704                (
705                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
706                    [
707                        env_prefix.to_string(),
708                        "meta_client".to_uppercase(),
709                        "metasrv_addrs".to_uppercase(),
710                    ]
711                    .join(ENV_VAR_SEP),
712                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
713                ),
714            ],
715            || {
716                let command = StartCommand {
717                    config_file: Some(file.path().to_str().unwrap().to_string()),
718                    http_addr: Some("127.0.0.1:14000".to_string()),
719                    env_prefix: env_prefix.to_string(),
720                    ..Default::default()
721                };
722
723                let fe_opts = command.load_options(&Default::default()).unwrap().component;
724
725                // Should be read from env, env > default values.
726                assert_eq!(fe_opts.mysql.runtime_size, 11);
727                assert_eq!(
728                    fe_opts.meta_client.unwrap().metasrv_addrs,
729                    vec![
730                        "127.0.0.1:3001".to_string(),
731                        "127.0.0.1:3002".to_string(),
732                        "127.0.0.1:3003".to_string()
733                    ]
734                );
735
736                // Should be read from config file, config file > env > default values.
737                assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
738
739                // Should be read from cli, cli > config file > env > default values.
740                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
741
742                // Should be default value.
743                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
744            },
745        );
746    }
747}