cmd/
frontend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::information_extension::DistributedInformationExtension;
23use catalog::information_schema::InformationExtensionRef;
24use catalog::kvbackend::{
25    CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
26    MetaKvBackend,
27};
28use catalog::process_manager::ProcessManager;
29use clap::Parser;
30use client::client_manager::NodeClients;
31use common_base::Plugins;
32use common_config::{Configurable, DEFAULT_DATA_HOME};
33use common_error::ext::BoxedError;
34use common_grpc::channel_manager::ChannelConfig;
35use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
36use common_meta::heartbeat::handler::HandlerGroupExecutor;
37use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
38use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
39use common_meta::heartbeat::handler::suspend::SuspendHandler;
40use common_query::prelude::set_default_prefix;
41use common_stat::ResourceStatImpl;
42use common_telemetry::info;
43use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
44use common_time::timezone::set_default_timezone;
45use common_version::{short_version, verbose_version};
46use frontend::frontend::Frontend;
47use frontend::heartbeat::HeartbeatTask;
48use frontend::instance::builder::FrontendBuilder;
49use frontend::server::Services;
50use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
51use plugins::frontend::context::{
52    CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
53};
54use servers::addrs;
55use servers::grpc::GrpcOptions;
56use servers::tls::{TlsMode, TlsOption, merge_tls_option};
57use snafu::{OptionExt, ResultExt};
58use tracing_appender::non_blocking::WorkerGuard;
59
60use crate::error::{self, OtherSnafu, Result};
61use crate::options::{GlobalOptions, GreptimeOptions};
62use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
63
64type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
65
66pub struct Instance {
67    frontend: Frontend,
68    // Keep the logging guard to prevent the worker from being dropped.
69    _guard: Vec<WorkerGuard>,
70}
71
72pub const APP_NAME: &str = "greptime-frontend";
73
74impl Instance {
75    pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
76        Self { frontend, _guard }
77    }
78
79    pub fn inner(&self) -> &Frontend {
80        &self.frontend
81    }
82
83    pub fn mut_inner(&mut self) -> &mut Frontend {
84        &mut self.frontend
85    }
86}
87
88#[async_trait]
89impl App for Instance {
90    fn name(&self) -> &str {
91        APP_NAME
92    }
93
94    async fn start(&mut self) -> Result<()> {
95        let plugins = self.frontend.instance.plugins().clone();
96        plugins::start_frontend_plugins(plugins)
97            .await
98            .context(error::StartFrontendSnafu)?;
99
100        self.frontend
101            .start()
102            .await
103            .context(error::StartFrontendSnafu)
104    }
105
106    async fn stop(&mut self) -> Result<()> {
107        self.frontend
108            .shutdown()
109            .await
110            .context(error::ShutdownFrontendSnafu)
111    }
112}
113
114#[derive(Parser)]
115pub struct Command {
116    #[clap(subcommand)]
117    pub subcmd: SubCommand,
118}
119
120impl Command {
121    pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
122        self.subcmd.build(opts).await
123    }
124
125    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
126        self.subcmd.load_options(global_options)
127    }
128}
129
130#[derive(Parser)]
131pub enum SubCommand {
132    Start(StartCommand),
133}
134
135impl SubCommand {
136    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
137        match self {
138            SubCommand::Start(cmd) => cmd.build(opts).await,
139        }
140    }
141
142    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
143        match self {
144            SubCommand::Start(cmd) => cmd.load_options(global_options),
145        }
146    }
147}
148
149#[derive(Debug, Default, Parser)]
150pub struct StartCommand {
151    /// The address to bind the gRPC server.
152    #[clap(long, alias = "rpc-addr")]
153    rpc_bind_addr: Option<String>,
154    /// The address advertised to the metasrv, and used for connections from outside the host.
155    /// If left empty or unset, the server will automatically use the IP address of the first network interface
156    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
157    #[clap(long, alias = "rpc-hostname")]
158    rpc_server_addr: Option<String>,
159    /// The address to bind the internal gRPC server.
160    #[clap(long, alias = "internal-rpc-addr")]
161    internal_rpc_bind_addr: Option<String>,
162    /// The address advertised to the metasrv, and used for connections from outside the host.
163    /// If left empty or unset, the server will automatically use the IP address of the first network interface
164    /// on the host, with the same port number as the one specified in `internal_rpc_bind_addr`.
165    #[clap(long, alias = "internal-rpc-hostname")]
166    internal_rpc_server_addr: Option<String>,
167    #[clap(long)]
168    http_addr: Option<String>,
169    #[clap(long)]
170    http_timeout: Option<u64>,
171    #[clap(long)]
172    mysql_addr: Option<String>,
173    #[clap(long)]
174    postgres_addr: Option<String>,
175    #[clap(short, long)]
176    pub config_file: Option<String>,
177    #[clap(short, long)]
178    influxdb_enable: Option<bool>,
179    #[clap(long, value_delimiter = ',', num_args = 1..)]
180    metasrv_addrs: Option<Vec<String>>,
181    #[clap(long)]
182    tls_mode: Option<TlsMode>,
183    #[clap(long)]
184    tls_cert_path: Option<String>,
185    #[clap(long)]
186    tls_key_path: Option<String>,
187    #[clap(long)]
188    tls_watch: bool,
189    #[clap(long)]
190    user_provider: Option<String>,
191    #[clap(long)]
192    disable_dashboard: Option<bool>,
193    #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
194    pub env_prefix: String,
195}
196
197impl StartCommand {
198    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
199        let mut opts = FrontendOptions::load_layered_options(
200            self.config_file.as_deref(),
201            self.env_prefix.as_ref(),
202        )
203        .context(error::LoadLayeredConfigSnafu)?;
204
205        self.merge_with_cli_options(global_options, &mut opts)?;
206
207        Ok(opts)
208    }
209
210    // The precedence order is: cli > config file > environment variables > default values.
211    fn merge_with_cli_options(
212        &self,
213        global_options: &GlobalOptions,
214        opts: &mut FrontendOptions,
215    ) -> Result<()> {
216        let opts = &mut opts.component;
217
218        if let Some(dir) = &global_options.log_dir {
219            opts.logging.dir.clone_from(dir);
220        }
221
222        // If the logging dir is not set, use the default logs dir in the data home.
223        if opts.logging.dir.is_empty() {
224            opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
225                .join(DEFAULT_LOGGING_DIR)
226                .to_string_lossy()
227                .to_string();
228        }
229
230        if global_options.log_level.is_some() {
231            opts.logging.level.clone_from(&global_options.log_level);
232        }
233
234        opts.tracing = TracingOptions {
235            #[cfg(feature = "tokio-console")]
236            tokio_console_addr: global_options.tokio_console_addr.clone(),
237        };
238
239        let tls_opts = TlsOption::new(
240            self.tls_mode,
241            self.tls_cert_path.clone(),
242            self.tls_key_path.clone(),
243            self.tls_watch,
244        );
245
246        if let Some(addr) = &self.http_addr {
247            opts.http.addr.clone_from(addr);
248        }
249
250        if let Some(http_timeout) = self.http_timeout {
251            opts.http.timeout = Duration::from_secs(http_timeout)
252        }
253
254        if let Some(disable_dashboard) = self.disable_dashboard {
255            opts.http.disable_dashboard = disable_dashboard;
256        }
257
258        if let Some(addr) = &self.rpc_bind_addr {
259            opts.grpc.bind_addr.clone_from(addr);
260            opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
261        }
262
263        if let Some(addr) = &self.rpc_server_addr {
264            opts.grpc.server_addr.clone_from(addr);
265        }
266
267        if let Some(addr) = &self.internal_rpc_bind_addr {
268            if let Some(internal_grpc) = &mut opts.internal_grpc {
269                internal_grpc.bind_addr = addr.clone();
270            } else {
271                let grpc_options = GrpcOptions {
272                    bind_addr: addr.clone(),
273                    ..Default::default()
274                };
275
276                opts.internal_grpc = Some(grpc_options);
277            }
278        }
279
280        if let Some(addr) = &self.internal_rpc_server_addr {
281            if let Some(internal_grpc) = &mut opts.internal_grpc {
282                internal_grpc.server_addr = addr.clone();
283            } else {
284                let grpc_options = GrpcOptions {
285                    server_addr: addr.clone(),
286                    ..Default::default()
287                };
288                opts.internal_grpc = Some(grpc_options);
289            }
290        }
291
292        if let Some(addr) = &self.mysql_addr {
293            opts.mysql.enable = true;
294            opts.mysql.addr.clone_from(addr);
295            opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
296        }
297
298        if let Some(addr) = &self.postgres_addr {
299            opts.postgres.enable = true;
300            opts.postgres.addr.clone_from(addr);
301            opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
302        }
303
304        if let Some(enable) = self.influxdb_enable {
305            opts.influxdb.enable = enable;
306        }
307
308        if let Some(metasrv_addrs) = &self.metasrv_addrs {
309            opts.meta_client
310                .get_or_insert_with(MetaClientOptions::default)
311                .metasrv_addrs
312                .clone_from(metasrv_addrs);
313        }
314
315        if let Some(user_provider) = &self.user_provider {
316            opts.user_provider = Some(user_provider.clone());
317        }
318
319        Ok(())
320    }
321
322    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
323        common_runtime::init_global_runtimes(&opts.runtime);
324
325        let guard = common_telemetry::init_global_logging(
326            APP_NAME,
327            &opts.component.logging,
328            &opts.component.tracing,
329            opts.component.node_id.clone(),
330            Some(&opts.component.slow_query),
331        );
332
333        log_versions(verbose_version(), short_version(), APP_NAME);
334        maybe_activate_heap_profile(&opts.component.memory);
335        create_resource_limit_metrics(APP_NAME);
336
337        info!("Frontend start command: {:#?}", self);
338        info!("Frontend options: {:#?}", opts);
339
340        let plugin_opts = opts.plugins;
341        let mut opts = opts.component;
342        opts.grpc.detect_server_addr();
343        let mut plugins = Plugins::new();
344        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
345            .await
346            .context(error::StartFrontendSnafu)?;
347
348        set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
349        set_default_prefix(opts.default_column_prefix.as_deref())
350            .map_err(BoxedError::new)
351            .context(error::BuildCliSnafu)?;
352
353        let meta_client_options = opts
354            .meta_client
355            .as_ref()
356            .context(error::MissingConfigSnafu {
357                msg: "'meta_client'",
358            })?;
359
360        let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
361        let cache_ttl = meta_client_options.metadata_cache_ttl;
362        let cache_tti = meta_client_options.metadata_cache_tti;
363
364        let meta_client = meta_client::create_meta_client(
365            MetaClientType::Frontend,
366            meta_client_options,
367            Some(&plugins),
368            None,
369        )
370        .await
371        .context(error::MetaClientInitSnafu)?;
372
373        // TODO(discord9): add helper function to ease the creation of cache registry&such
374        let cached_meta_backend =
375            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
376                .cache_max_capacity(cache_max_capacity)
377                .cache_ttl(cache_ttl)
378                .cache_tti(cache_tti)
379                .build();
380        let cached_meta_backend = Arc::new(cached_meta_backend);
381
382        // Builds cache registry
383        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
384            CacheRegistryBuilder::default()
385                .add_cache(cached_meta_backend.clone())
386                .build(),
387        );
388        let fundamental_cache_registry =
389            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
390        let layered_cache_registry = Arc::new(
391            with_default_composite_cache_registry(
392                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
393            )
394            .context(error::BuildCacheRegistrySnafu)?
395            .build(),
396        );
397
398        // frontend to datanode need not timeout.
399        // Some queries are expected to take long time.
400        let mut channel_config = ChannelConfig {
401            timeout: None,
402            tcp_nodelay: opts.datanode.client.tcp_nodelay,
403            connect_timeout: Some(opts.datanode.client.connect_timeout),
404            ..Default::default()
405        };
406        if opts.grpc.flight_compression.transport_compression() {
407            channel_config.accept_compression = true;
408            channel_config.send_compression = true;
409        }
410        let client = Arc::new(NodeClients::new(channel_config));
411
412        let information_extension = Arc::new(DistributedInformationExtension::new(
413            meta_client.clone(),
414            client.clone(),
415        ));
416        plugins.insert::<InformationExtensionRef>(information_extension.clone());
417
418        let process_manager = Arc::new(ProcessManager::new(
419            addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
420            Some(meta_client.clone()),
421        ));
422
423        let builder = KvBackendCatalogManagerBuilder::new(
424            information_extension,
425            cached_meta_backend.clone(),
426            layered_cache_registry.clone(),
427        )
428        .with_process_manager(process_manager.clone());
429        let builder = if let Some(configurator) =
430            plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
431        {
432            let ctx = DistributedCatalogManagerConfigureContext {
433                meta_client: meta_client.clone(),
434            };
435            let ctx = CatalogManagerConfigureContext::Distributed(ctx);
436
437            configurator
438                .configure(builder, ctx)
439                .await
440                .context(OtherSnafu)?
441        } else {
442            builder
443        };
444        let catalog_manager = builder.build();
445
446        let instance = FrontendBuilder::new(
447            opts.clone(),
448            cached_meta_backend.clone(),
449            layered_cache_registry.clone(),
450            catalog_manager,
451            client,
452            meta_client.clone(),
453            process_manager,
454        )
455        .with_plugin(plugins.clone())
456        .with_local_cache_invalidator(layered_cache_registry)
457        .try_build()
458        .await
459        .context(error::StartFrontendSnafu)?;
460
461        let heartbeat_task = Some(create_heartbeat_task(&opts, meta_client, &instance));
462
463        let instance = Arc::new(instance);
464
465        let servers = Services::new(opts, instance.clone(), plugins)
466            .build()
467            .context(error::StartFrontendSnafu)?;
468
469        let frontend = Frontend {
470            instance,
471            servers,
472            heartbeat_task,
473        };
474
475        Ok(Instance::new(frontend, guard))
476    }
477}
478
479pub fn create_heartbeat_task(
480    options: &frontend::frontend::FrontendOptions,
481    meta_client: MetaClientRef,
482    instance: &frontend::instance::Instance,
483) -> HeartbeatTask {
484    let executor = Arc::new(HandlerGroupExecutor::new(vec![
485        Arc::new(ParseMailboxMessageHandler),
486        Arc::new(SuspendHandler::new(instance.suspend_state())),
487        Arc::new(InvalidateCacheHandler::new(
488            instance.cache_invalidator().clone(),
489        )),
490    ]));
491
492    let stat = {
493        let mut stat = ResourceStatImpl::default();
494        stat.start_collect_cpu_usage();
495        Arc::new(stat)
496    };
497
498    HeartbeatTask::new(options, meta_client, executor, stat)
499}
500
501#[cfg(test)]
502mod tests {
503    use std::io::Write;
504    use std::time::Duration;
505
506    use auth::{Identity, Password, UserProviderRef};
507    use common_base::readable_size::ReadableSize;
508    use common_config::ENV_VAR_SEP;
509    use common_test_util::temp_dir::create_named_temp_file;
510    use servers::grpc::GrpcOptions;
511    use servers::http::HttpOptions;
512
513    use super::*;
514    use crate::options::GlobalOptions;
515
516    #[test]
517    fn test_try_from_start_command() {
518        let command = StartCommand {
519            http_addr: Some("127.0.0.1:1234".to_string()),
520            mysql_addr: Some("127.0.0.1:5678".to_string()),
521            postgres_addr: Some("127.0.0.1:5432".to_string()),
522            internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
523            internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
524            influxdb_enable: Some(false),
525            disable_dashboard: Some(false),
526            ..Default::default()
527        };
528
529        let opts = command.load_options(&Default::default()).unwrap().component;
530
531        assert_eq!(opts.http.addr, "127.0.0.1:1234");
532        assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
533        assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
534        assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
535
536        let internal_grpc = opts.internal_grpc.as_ref().unwrap();
537        assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
538        assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");
539
540        let default_opts = FrontendOptions::default().component;
541
542        assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
543        assert!(opts.mysql.enable);
544        assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
545        assert!(opts.postgres.enable);
546        assert_eq!(
547            opts.postgres.runtime_size,
548            default_opts.postgres.runtime_size
549        );
550        assert!(opts.opentsdb.enable);
551
552        assert!(!opts.influxdb.enable);
553    }
554
555    #[test]
556    fn test_read_from_config_file() {
557        let mut file = create_named_temp_file();
558        let toml_str = r#"
559            [http]
560            addr = "127.0.0.1:4000"
561            timeout = "0s"
562            body_limit = "2GB"
563
564            [opentsdb]
565            enable = false
566
567            [logging]
568            level = "debug"
569            dir = "./greptimedb_data/test/logs"
570        "#;
571        write!(file, "{}", toml_str).unwrap();
572
573        let command = StartCommand {
574            config_file: Some(file.path().to_str().unwrap().to_string()),
575            disable_dashboard: Some(false),
576            ..Default::default()
577        };
578
579        let fe_opts = command.load_options(&Default::default()).unwrap().component;
580
581        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
582        assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
583
584        assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
585
586        assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
587        assert_eq!(
588            "./greptimedb_data/test/logs".to_string(),
589            fe_opts.logging.dir
590        );
591        assert!(!fe_opts.opentsdb.enable);
592    }
593
594    #[tokio::test]
595    async fn test_try_from_start_command_to_anymap() {
596        let fe_opts = frontend::frontend::FrontendOptions {
597            http: HttpOptions {
598                disable_dashboard: false,
599                ..Default::default()
600            },
601            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
602            ..Default::default()
603        };
604
605        let mut plugins = Plugins::new();
606        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
607            .await
608            .unwrap();
609
610        let provider = plugins.get::<UserProviderRef>().unwrap();
611        let result = provider
612            .authenticate(
613                Identity::UserId("test", None),
614                Password::PlainText("test".to_string().into()),
615            )
616            .await;
617        let _ = result.unwrap();
618    }
619
620    #[test]
621    fn test_load_log_options_from_cli() {
622        let cmd = StartCommand {
623            disable_dashboard: Some(false),
624            ..Default::default()
625        };
626
627        let options = cmd
628            .load_options(&GlobalOptions {
629                log_dir: Some("./greptimedb_data/test/logs".to_string()),
630                log_level: Some("debug".to_string()),
631
632                #[cfg(feature = "tokio-console")]
633                tokio_console_addr: None,
634            })
635            .unwrap()
636            .component;
637
638        let logging_opt = options.logging;
639        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
640        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
641    }
642
643    #[test]
644    fn test_config_precedence_order() {
645        let mut file = create_named_temp_file();
646        let toml_str = r#"
647            [http]
648            addr = "127.0.0.1:4000"
649
650            [meta_client]
651            timeout = "3s"
652            connect_timeout = "5s"
653            tcp_nodelay = true
654
655            [mysql]
656            addr = "127.0.0.1:4002"
657        "#;
658        write!(file, "{}", toml_str).unwrap();
659
660        let env_prefix = "FRONTEND_UT";
661        temp_env::with_vars(
662            [
663                (
664                    // mysql.addr = 127.0.0.1:14002
665                    [
666                        env_prefix.to_string(),
667                        "mysql".to_uppercase(),
668                        "addr".to_uppercase(),
669                    ]
670                    .join(ENV_VAR_SEP),
671                    Some("127.0.0.1:14002"),
672                ),
673                (
674                    // mysql.runtime_size = 11
675                    [
676                        env_prefix.to_string(),
677                        "mysql".to_uppercase(),
678                        "runtime_size".to_uppercase(),
679                    ]
680                    .join(ENV_VAR_SEP),
681                    Some("11"),
682                ),
683                (
684                    // http.addr = 127.0.0.1:24000
685                    [
686                        env_prefix.to_string(),
687                        "http".to_uppercase(),
688                        "addr".to_uppercase(),
689                    ]
690                    .join(ENV_VAR_SEP),
691                    Some("127.0.0.1:24000"),
692                ),
693                (
694                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
695                    [
696                        env_prefix.to_string(),
697                        "meta_client".to_uppercase(),
698                        "metasrv_addrs".to_uppercase(),
699                    ]
700                    .join(ENV_VAR_SEP),
701                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
702                ),
703            ],
704            || {
705                let command = StartCommand {
706                    config_file: Some(file.path().to_str().unwrap().to_string()),
707                    http_addr: Some("127.0.0.1:14000".to_string()),
708                    env_prefix: env_prefix.to_string(),
709                    ..Default::default()
710                };
711
712                let fe_opts = command.load_options(&Default::default()).unwrap().component;
713
714                // Should be read from env, env > default values.
715                assert_eq!(fe_opts.mysql.runtime_size, 11);
716                assert_eq!(
717                    fe_opts.meta_client.unwrap().metasrv_addrs,
718                    vec![
719                        "127.0.0.1:3001".to_string(),
720                        "127.0.0.1:3002".to_string(),
721                        "127.0.0.1:3003".to_string()
722                    ]
723                );
724
725                // Should be read from config file, config file > env > default values.
726                assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
727
728                // Should be read from cli, cli > config file > env > default values.
729                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
730
731                // Should be default value.
732                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
733            },
734        );
735    }
736}