cmd/
frontend.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::information_extension::DistributedInformationExtension;
23use catalog::kvbackend::{
24    CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
25    MetaKvBackend,
26};
27use catalog::process_manager::ProcessManager;
28use clap::Parser;
29use client::client_manager::NodeClients;
30use common_base::Plugins;
31use common_config::{Configurable, DEFAULT_DATA_HOME};
32use common_error::ext::BoxedError;
33use common_grpc::channel_manager::ChannelConfig;
34use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
35use common_meta::heartbeat::handler::HandlerGroupExecutor;
36use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
37use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
38use common_meta::heartbeat::handler::suspend::SuspendHandler;
39use common_query::prelude::set_default_prefix;
40use common_stat::ResourceStatImpl;
41use common_telemetry::info;
42use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
43use common_time::timezone::set_default_timezone;
44use common_version::{short_version, verbose_version};
45use frontend::frontend::Frontend;
46use frontend::heartbeat::HeartbeatTask;
47use frontend::instance::builder::FrontendBuilder;
48use frontend::server::Services;
49use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
50use plugins::frontend::context::{
51    CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
52};
53use servers::addrs;
54use servers::grpc::GrpcOptions;
55use servers::tls::{TlsMode, TlsOption, merge_tls_option};
56use snafu::{OptionExt, ResultExt};
57use tracing_appender::non_blocking::WorkerGuard;
58
59use crate::error::{self, OtherSnafu, Result};
60use crate::options::{GlobalOptions, GreptimeOptions};
61use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
62
63type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
64
65pub struct Instance {
66    frontend: Frontend,
67    // Keep the logging guard to prevent the worker from being dropped.
68    _guard: Vec<WorkerGuard>,
69}
70
71pub const APP_NAME: &str = "greptime-frontend";
72
73impl Instance {
74    pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
75        Self { frontend, _guard }
76    }
77
78    pub fn inner(&self) -> &Frontend {
79        &self.frontend
80    }
81
82    pub fn mut_inner(&mut self) -> &mut Frontend {
83        &mut self.frontend
84    }
85}
86
87#[async_trait]
88impl App for Instance {
89    fn name(&self) -> &str {
90        APP_NAME
91    }
92
93    async fn start(&mut self) -> Result<()> {
94        let plugins = self.frontend.instance.plugins().clone();
95        plugins::start_frontend_plugins(plugins)
96            .await
97            .context(error::StartFrontendSnafu)?;
98
99        self.frontend
100            .start()
101            .await
102            .context(error::StartFrontendSnafu)
103    }
104
105    async fn stop(&mut self) -> Result<()> {
106        self.frontend
107            .shutdown()
108            .await
109            .context(error::ShutdownFrontendSnafu)
110    }
111}
112
113#[derive(Parser)]
114pub struct Command {
115    #[clap(subcommand)]
116    pub subcmd: SubCommand,
117}
118
119impl Command {
120    pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
121        self.subcmd.build(opts).await
122    }
123
124    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
125        self.subcmd.load_options(global_options)
126    }
127}
128
129#[derive(Parser)]
130pub enum SubCommand {
131    Start(StartCommand),
132}
133
134impl SubCommand {
135    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
136        match self {
137            SubCommand::Start(cmd) => cmd.build(opts).await,
138        }
139    }
140
141    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
142        match self {
143            SubCommand::Start(cmd) => cmd.load_options(global_options),
144        }
145    }
146}
147
// Command-line flags of `frontend start`.
//
// Every optional flag overrides the matching loaded option only when it is
// given (see `merge_with_cli_options`). New notes below are plain `//`
// comments on purpose: `///` on a clap-derived field becomes its help text.
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    /// The address to bind the gRPC server.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    /// The address advertised to the metasrv, and used for connections from outside the host.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    /// The address to bind the internal gRPC server.
    #[clap(long, alias = "internal-rpc-addr")]
    internal_rpc_bind_addr: Option<String>,
    /// The address advertised to the metasrv, and used for connections from outside the host.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `internal_rpc_bind_addr`.
    #[clap(long, alias = "internal-rpc-hostname")]
    internal_rpc_server_addr: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`; the value is interpreted as whole seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Overrides `mysql.addr` and also sets `mysql.enable = true`.
    #[clap(long)]
    mysql_addr: Option<String>,
    // Overrides `postgres.addr` and also sets `postgres.enable = true`.
    #[clap(long)]
    postgres_addr: Option<String>,
    // Configuration file handed to `load_layered_options`.
    #[clap(short, long)]
    pub config_file: Option<String>,
    // Overrides `influxdb.enable`.
    #[clap(short, long)]
    influxdb_enable: Option<bool>,
    // Comma-separated list; replaces `meta_client.metasrv_addrs`, creating
    // default meta client options first if none were loaded.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // The four `tls_*` flags are combined into one `TlsOption`. It is merged
    // into the gRPC / MySQL / PostgreSQL TLS options only when the matching
    // address flag (`rpc_bind_addr`, `mysql_addr`, `postgres_addr`) is given.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    #[clap(long)]
    tls_watch: bool,
    // Overrides `user_provider`.
    #[clap(long)]
    user_provider: Option<String>,
    // Overrides `http.disable_dashboard`.
    #[clap(long)]
    disable_dashboard: Option<bool>,
    // Environment-variable prefix handed to `load_layered_options`.
    #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
    pub env_prefix: String,
}
195
196impl StartCommand {
197    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
198        let mut opts = FrontendOptions::load_layered_options(
199            self.config_file.as_deref(),
200            self.env_prefix.as_ref(),
201        )
202        .context(error::LoadLayeredConfigSnafu)?;
203
204        self.merge_with_cli_options(global_options, &mut opts)?;
205
206        Ok(opts)
207    }
208
209    // The precedence order is: cli > config file > environment variables > default values.
210    fn merge_with_cli_options(
211        &self,
212        global_options: &GlobalOptions,
213        opts: &mut FrontendOptions,
214    ) -> Result<()> {
215        let opts = &mut opts.component;
216
217        if let Some(dir) = &global_options.log_dir {
218            opts.logging.dir.clone_from(dir);
219        }
220
221        // If the logging dir is not set, use the default logs dir in the data home.
222        if opts.logging.dir.is_empty() {
223            opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
224                .join(DEFAULT_LOGGING_DIR)
225                .to_string_lossy()
226                .to_string();
227        }
228
229        if global_options.log_level.is_some() {
230            opts.logging.level.clone_from(&global_options.log_level);
231        }
232
233        opts.tracing = TracingOptions {
234            #[cfg(feature = "tokio-console")]
235            tokio_console_addr: global_options.tokio_console_addr.clone(),
236        };
237
238        let tls_opts = TlsOption::new(
239            self.tls_mode.clone(),
240            self.tls_cert_path.clone(),
241            self.tls_key_path.clone(),
242            self.tls_watch,
243        );
244
245        if let Some(addr) = &self.http_addr {
246            opts.http.addr.clone_from(addr);
247        }
248
249        if let Some(http_timeout) = self.http_timeout {
250            opts.http.timeout = Duration::from_secs(http_timeout)
251        }
252
253        if let Some(disable_dashboard) = self.disable_dashboard {
254            opts.http.disable_dashboard = disable_dashboard;
255        }
256
257        if let Some(addr) = &self.rpc_bind_addr {
258            opts.grpc.bind_addr.clone_from(addr);
259            opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
260        }
261
262        if let Some(addr) = &self.rpc_server_addr {
263            opts.grpc.server_addr.clone_from(addr);
264        }
265
266        if let Some(addr) = &self.internal_rpc_bind_addr {
267            if let Some(internal_grpc) = &mut opts.internal_grpc {
268                internal_grpc.bind_addr = addr.clone();
269            } else {
270                let grpc_options = GrpcOptions {
271                    bind_addr: addr.clone(),
272                    ..Default::default()
273                };
274
275                opts.internal_grpc = Some(grpc_options);
276            }
277        }
278
279        if let Some(addr) = &self.internal_rpc_server_addr {
280            if let Some(internal_grpc) = &mut opts.internal_grpc {
281                internal_grpc.server_addr = addr.clone();
282            } else {
283                let grpc_options = GrpcOptions {
284                    server_addr: addr.clone(),
285                    ..Default::default()
286                };
287                opts.internal_grpc = Some(grpc_options);
288            }
289        }
290
291        if let Some(addr) = &self.mysql_addr {
292            opts.mysql.enable = true;
293            opts.mysql.addr.clone_from(addr);
294            opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
295        }
296
297        if let Some(addr) = &self.postgres_addr {
298            opts.postgres.enable = true;
299            opts.postgres.addr.clone_from(addr);
300            opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
301        }
302
303        if let Some(enable) = self.influxdb_enable {
304            opts.influxdb.enable = enable;
305        }
306
307        if let Some(metasrv_addrs) = &self.metasrv_addrs {
308            opts.meta_client
309                .get_or_insert_with(MetaClientOptions::default)
310                .metasrv_addrs
311                .clone_from(metasrv_addrs);
312        }
313
314        if let Some(user_provider) = &self.user_provider {
315            opts.user_provider = Some(user_provider.clone());
316        }
317
318        Ok(())
319    }
320
    /// Builds the frontend [`Instance`] from fully resolved options.
    ///
    /// In order, this initializes the global runtimes and logging, sets up the
    /// frontend plugins, creates the meta client, the metadata caches, the
    /// node clients and the catalog manager, builds the frontend instance and
    /// its heartbeat task, and finally builds the servers.
    ///
    /// # Errors
    ///
    /// Fails when the `meta_client` options are missing, or when plugin setup,
    /// the default timezone, the default column prefix, the meta client, the
    /// cache registry, the catalog manager configurator, the frontend instance
    /// or the servers cannot be set up.
    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        // The returned guards are handed to the `Instance` at the very end so
        // they live as long as the application.
        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            opts.component.node_id.clone(),
            Some(&opts.component.slow_query),
        );

        log_versions(verbose_version(), short_version(), APP_NAME);
        maybe_activate_heap_profile(&opts.component.memory);
        create_resource_limit_metrics(APP_NAME);

        // NOTE(review): both dumps use `{:#?}`; `user_provider` can embed
        // credentials (e.g. `static_user_provider:cmd:<user>=<password>`) —
        // confirm this is acceptable in the logs.
        info!("Frontend start command: {:#?}", self);
        info!("Frontend options: {:#?}", opts);

        // From here on `opts` is shadowed by the frontend component options.
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        opts.grpc.detect_server_addr();
        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
            .await
            .context(error::StartFrontendSnafu)?;

        set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
        set_default_prefix(opts.default_column_prefix.as_deref())
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;

        // The meta client options are mandatory for building the frontend.
        let meta_client_options = opts
            .meta_client
            .as_ref()
            .context(error::MissingConfigSnafu {
                msg: "'meta_client'",
            })?;

        let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
        let cache_ttl = meta_client_options.metadata_cache_ttl;
        let cache_tti = meta_client_options.metadata_cache_tti;

        let meta_client = meta_client::create_meta_client(
            MetaClientType::Frontend,
            meta_client_options,
            Some(&plugins),
            None,
        )
        .await
        .context(error::MetaClientInitSnafu)?;

        // TODO(discord9): add helper function to ease the creation of cache registry & such
        let cached_meta_backend =
            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
                .cache_max_capacity(cache_max_capacity)
                .cache_ttl(cache_ttl)
                .cache_tti(cache_tti)
                .build();
        let cached_meta_backend = Arc::new(cached_meta_backend);

        // Builds the cache registry in layers: the cached meta backend first,
        // then the fundamental registry, then the default composite registry.
        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
            CacheRegistryBuilder::default()
                .add_cache(cached_meta_backend.clone())
                .build(),
        );
        // The fundamental registry gets its own `MetaKvBackend` rather than
        // `cached_meta_backend`.
        let fundamental_cache_registry =
            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        // Frontend-to-datanode requests get no timeout:
        // some queries are expected to take a long time.
        let mut channel_config = ChannelConfig {
            timeout: None,
            tcp_nodelay: opts.datanode.client.tcp_nodelay,
            connect_timeout: Some(opts.datanode.client.connect_timeout),
            ..Default::default()
        };
        // Compression is switched on in both directions together.
        if opts.grpc.flight_compression.transport_compression() {
            channel_config.accept_compression = true;
            channel_config.send_compression = true;
        }
        let client = Arc::new(NodeClients::new(channel_config));

        let information_extension = Arc::new(DistributedInformationExtension::new(
            meta_client.clone(),
            client.clone(),
        ));

        let process_manager = Arc::new(ProcessManager::new(
            addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
            Some(meta_client.clone()),
        ));

        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension,
            cached_meta_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_process_manager(process_manager.clone());
        // A plugin may register a configurator that adjusts the catalog
        // manager builder; without one the builder is used as is.
        let builder = if let Some(configurator) =
            plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
        {
            let ctx = DistributedCatalogManagerConfigureContext {
                meta_client: meta_client.clone(),
            };
            let ctx = CatalogManagerConfigureContext::Distributed(ctx);

            configurator
                .configure(builder, ctx)
                .await
                .context(OtherSnafu)?
        } else {
            builder
        };
        let catalog_manager = builder.build();

        let instance = FrontendBuilder::new(
            opts.clone(),
            cached_meta_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager,
            client,
            meta_client.clone(),
            process_manager,
        )
        .with_plugin(plugins.clone())
        .with_local_cache_invalidator(layered_cache_registry)
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;

        // The heartbeat task is created from the instance before the instance
        // is moved into an `Arc`; this consumes `meta_client`.
        let heartbeat_task = Some(create_heartbeat_task(&opts, meta_client, &instance));

        let instance = Arc::new(instance);

        // `opts` and `plugins` are moved into the servers here.
        let servers = Services::new(opts, instance.clone(), plugins)
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance,
            servers,
            heartbeat_task,
        };

        Ok(Instance::new(frontend, guard))
    }
475}
476
477pub fn create_heartbeat_task(
478    options: &frontend::frontend::FrontendOptions,
479    meta_client: MetaClientRef,
480    instance: &frontend::instance::Instance,
481) -> HeartbeatTask {
482    let executor = Arc::new(HandlerGroupExecutor::new(vec![
483        Arc::new(ParseMailboxMessageHandler),
484        Arc::new(SuspendHandler::new(instance.suspend_state())),
485        Arc::new(InvalidateCacheHandler::new(
486            instance.cache_invalidator().clone(),
487        )),
488    ]));
489
490    let stat = {
491        let mut stat = ResourceStatImpl::default();
492        stat.start_collect_cpu_usage();
493        Arc::new(stat)
494    };
495
496    HeartbeatTask::new(options, meta_client, executor, stat)
497}
498
499#[cfg(test)]
500mod tests {
501    use std::io::Write;
502    use std::time::Duration;
503
504    use auth::{Identity, Password, UserProviderRef};
505    use common_base::readable_size::ReadableSize;
506    use common_config::ENV_VAR_SEP;
507    use common_test_util::temp_dir::create_named_temp_file;
508    use servers::grpc::GrpcOptions;
509    use servers::http::HttpOptions;
510
511    use super::*;
512    use crate::options::GlobalOptions;
513
514    #[test]
515    fn test_try_from_start_command() {
516        let command = StartCommand {
517            http_addr: Some("127.0.0.1:1234".to_string()),
518            mysql_addr: Some("127.0.0.1:5678".to_string()),
519            postgres_addr: Some("127.0.0.1:5432".to_string()),
520            internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
521            internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
522            influxdb_enable: Some(false),
523            disable_dashboard: Some(false),
524            ..Default::default()
525        };
526
527        let opts = command.load_options(&Default::default()).unwrap().component;
528
529        assert_eq!(opts.http.addr, "127.0.0.1:1234");
530        assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
531        assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
532        assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
533
534        let internal_grpc = opts.internal_grpc.as_ref().unwrap();
535        assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
536        assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");
537
538        let default_opts = FrontendOptions::default().component;
539
540        assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
541        assert!(opts.mysql.enable);
542        assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
543        assert!(opts.postgres.enable);
544        assert_eq!(
545            opts.postgres.runtime_size,
546            default_opts.postgres.runtime_size
547        );
548        assert!(opts.opentsdb.enable);
549
550        assert!(!opts.influxdb.enable);
551    }
552
553    #[test]
554    fn test_read_from_config_file() {
555        let mut file = create_named_temp_file();
556        let toml_str = r#"
557            [http]
558            addr = "127.0.0.1:4000"
559            timeout = "0s"
560            body_limit = "2GB"
561
562            [opentsdb]
563            enable = false
564
565            [logging]
566            level = "debug"
567            dir = "./greptimedb_data/test/logs"
568        "#;
569        write!(file, "{}", toml_str).unwrap();
570
571        let command = StartCommand {
572            config_file: Some(file.path().to_str().unwrap().to_string()),
573            disable_dashboard: Some(false),
574            ..Default::default()
575        };
576
577        let fe_opts = command.load_options(&Default::default()).unwrap().component;
578
579        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
580        assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
581
582        assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
583
584        assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
585        assert_eq!(
586            "./greptimedb_data/test/logs".to_string(),
587            fe_opts.logging.dir
588        );
589        assert!(!fe_opts.opentsdb.enable);
590    }
591
592    #[tokio::test]
593    async fn test_try_from_start_command_to_anymap() {
594        let fe_opts = frontend::frontend::FrontendOptions {
595            http: HttpOptions {
596                disable_dashboard: false,
597                ..Default::default()
598            },
599            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
600            ..Default::default()
601        };
602
603        let mut plugins = Plugins::new();
604        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
605            .await
606            .unwrap();
607
608        let provider = plugins.get::<UserProviderRef>().unwrap();
609        let result = provider
610            .authenticate(
611                Identity::UserId("test", None),
612                Password::PlainText("test".to_string().into()),
613            )
614            .await;
615        let _ = result.unwrap();
616    }
617
618    #[test]
619    fn test_load_log_options_from_cli() {
620        let cmd = StartCommand {
621            disable_dashboard: Some(false),
622            ..Default::default()
623        };
624
625        let options = cmd
626            .load_options(&GlobalOptions {
627                log_dir: Some("./greptimedb_data/test/logs".to_string()),
628                log_level: Some("debug".to_string()),
629
630                #[cfg(feature = "tokio-console")]
631                tokio_console_addr: None,
632            })
633            .unwrap()
634            .component;
635
636        let logging_opt = options.logging;
637        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
638        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
639    }
640
641    #[test]
642    fn test_config_precedence_order() {
643        let mut file = create_named_temp_file();
644        let toml_str = r#"
645            [http]
646            addr = "127.0.0.1:4000"
647
648            [meta_client]
649            timeout = "3s"
650            connect_timeout = "5s"
651            tcp_nodelay = true
652
653            [mysql]
654            addr = "127.0.0.1:4002"
655        "#;
656        write!(file, "{}", toml_str).unwrap();
657
658        let env_prefix = "FRONTEND_UT";
659        temp_env::with_vars(
660            [
661                (
662                    // mysql.addr = 127.0.0.1:14002
663                    [
664                        env_prefix.to_string(),
665                        "mysql".to_uppercase(),
666                        "addr".to_uppercase(),
667                    ]
668                    .join(ENV_VAR_SEP),
669                    Some("127.0.0.1:14002"),
670                ),
671                (
672                    // mysql.runtime_size = 11
673                    [
674                        env_prefix.to_string(),
675                        "mysql".to_uppercase(),
676                        "runtime_size".to_uppercase(),
677                    ]
678                    .join(ENV_VAR_SEP),
679                    Some("11"),
680                ),
681                (
682                    // http.addr = 127.0.0.1:24000
683                    [
684                        env_prefix.to_string(),
685                        "http".to_uppercase(),
686                        "addr".to_uppercase(),
687                    ]
688                    .join(ENV_VAR_SEP),
689                    Some("127.0.0.1:24000"),
690                ),
691                (
692                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
693                    [
694                        env_prefix.to_string(),
695                        "meta_client".to_uppercase(),
696                        "metasrv_addrs".to_uppercase(),
697                    ]
698                    .join(ENV_VAR_SEP),
699                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
700                ),
701            ],
702            || {
703                let command = StartCommand {
704                    config_file: Some(file.path().to_str().unwrap().to_string()),
705                    http_addr: Some("127.0.0.1:14000".to_string()),
706                    env_prefix: env_prefix.to_string(),
707                    ..Default::default()
708                };
709
710                let fe_opts = command.load_options(&Default::default()).unwrap().component;
711
712                // Should be read from env, env > default values.
713                assert_eq!(fe_opts.mysql.runtime_size, 11);
714                assert_eq!(
715                    fe_opts.meta_client.unwrap().metasrv_addrs,
716                    vec![
717                        "127.0.0.1:3001".to_string(),
718                        "127.0.0.1:3002".to_string(),
719                        "127.0.0.1:3003".to_string()
720                    ]
721                );
722
723                // Should be read from config file, config file > env > default values.
724                assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
725
726                // Should be read from cli, cli > config file > env > default values.
727                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
728
729                // Should be default value.
730                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
731            },
732        );
733    }
734}