cmd/
frontend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21use catalog::information_extension::DistributedInformationExtension;
22use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
23use catalog::process_manager::ProcessManager;
24use clap::Parser;
25use client::client_manager::NodeClients;
26use common_base::Plugins;
27use common_config::{Configurable, DEFAULT_DATA_HOME};
28use common_grpc::channel_manager::ChannelConfig;
29use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
30use common_meta::heartbeat::handler::HandlerGroupExecutor;
31use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
32use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
33use common_stat::ResourceStatImpl;
34use common_telemetry::info;
35use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
36use common_time::timezone::set_default_timezone;
37use common_version::{short_version, verbose_version};
38use frontend::frontend::Frontend;
39use frontend::heartbeat::HeartbeatTask;
40use frontend::instance::builder::FrontendBuilder;
41use frontend::server::Services;
42use meta_client::{MetaClientOptions, MetaClientType};
43use servers::addrs;
44use servers::export_metrics::ExportMetricsTask;
45use servers::grpc::GrpcOptions;
46use servers::tls::{TlsMode, TlsOption};
47use snafu::{OptionExt, ResultExt};
48use tracing_appender::non_blocking::WorkerGuard;
49
50use crate::error::{self, Result};
51use crate::options::{GlobalOptions, GreptimeOptions};
52use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
53
/// Frontend-specific instantiation of the layered [`GreptimeOptions`] wrapper.
///
/// The frontend's own settings are reached through the `component` field; this
/// file also reads the wrapper's `runtime` and `plugins` fields when building
/// the instance.
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
55
/// A runnable frontend application: the [`Frontend`] itself together with the
/// logging guards that are kept alive next to it.
pub struct Instance {
    /// The wrapped frontend; the `App` implementation forwards start/stop to it.
    frontend: Frontend,
    // Keep the logging guard to prevent the worker from being dropped.
    _guard: Vec<WorkerGuard>,
}
61
/// Application name of the frontend. It is what `App::name` returns and what
/// `StartCommand::build` passes to the logging, version-logging and
/// resource-limit-metrics initializers.
pub const APP_NAME: &str = "greptime-frontend";
63
64impl Instance {
65    pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
66        Self { frontend, _guard }
67    }
68
69    pub fn inner(&self) -> &Frontend {
70        &self.frontend
71    }
72
73    pub fn mut_inner(&mut self) -> &mut Frontend {
74        &mut self.frontend
75    }
76}
77
78#[async_trait]
79impl App for Instance {
80    fn name(&self) -> &str {
81        APP_NAME
82    }
83
84    async fn start(&mut self) -> Result<()> {
85        let plugins = self.frontend.instance.plugins().clone();
86        plugins::start_frontend_plugins(plugins)
87            .await
88            .context(error::StartFrontendSnafu)?;
89
90        self.frontend
91            .start()
92            .await
93            .context(error::StartFrontendSnafu)
94    }
95
96    async fn stop(&mut self) -> Result<()> {
97        self.frontend
98            .shutdown()
99            .await
100            .context(error::ShutdownFrontendSnafu)
101    }
102}
103
// Command-line entry of the frontend; every operation is delegated to the
// selected subcommand.
//
// NOTE: plain `//` comments are used on purpose. `///` doc comments on a
// clap-derived type are picked up by clap as help text, which would change the
// program's `--help` output.
#[derive(Parser)]
pub struct Command {
    // The subcommand chosen on the command line.
    #[clap(subcommand)]
    pub subcmd: SubCommand,
}
109
110impl Command {
111    pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
112        self.subcmd.build(opts).await
113    }
114
115    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
116        self.subcmd.load_options(global_options)
117    }
118}
119
// Subcommands of the frontend command. `Start` is currently the only one.
//
// NOTE: plain `//` comments are used on purpose; clap turns `///` doc comments
// on derived items into help text.
#[derive(Parser)]
pub enum SubCommand {
    // Carries the flags used to load options and build the frontend instance.
    Start(StartCommand),
}
124
125impl SubCommand {
126    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
127        match self {
128            SubCommand::Start(cmd) => cmd.build(opts).await,
129        }
130    }
131
132    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
133        match self {
134            SubCommand::Start(cmd) => cmd.load_options(global_options),
135        }
136    }
137}
138
// Flags of the `Start` subcommand. Each optional flag that is set is applied on
// top of the loaded options by `merge_with_cli_options`.
//
// NOTE: the existing `///` comments are left untouched and every added note
// uses plain `//`, because clap turns `///` doc comments on derived arguments
// into help text; rewording or adding them would change `--help` output.
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    /// The address to bind the gRPC server.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    /// The address advertised to the metasrv, and used for connections from outside the host.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    /// The address to bind the internal gRPC server.
    #[clap(long, alias = "internal-rpc-addr")]
    internal_rpc_bind_addr: Option<String>,
    /// The address advertised to the metasrv, and used for connections from outside the host.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `internal_rpc_bind_addr`.
    #[clap(long, alias = "internal-rpc-hostname")]
    internal_rpc_server_addr: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // HTTP timeout in whole seconds; converted with `Duration::from_secs`.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Overrides `mysql.addr`. Setting it also enables the MySQL server and
    // assigns the TLS option built from the `tls_*` flags to it.
    #[clap(long)]
    mysql_addr: Option<String>,
    // Overrides `postgres.addr`. Setting it also enables the PostgreSQL server
    // and assigns the TLS option built from the `tls_*` flags to it.
    #[clap(long)]
    postgres_addr: Option<String>,
    // Path of the config file handed to `load_layered_options`.
    #[clap(short, long)]
    pub config_file: Option<String>,
    // Overrides `influxdb.enable`.
    #[clap(short, long)]
    influxdb_enable: Option<bool>,
    // Metasrv addresses, comma-delimited. Replaces `meta_client.metasrv_addrs`,
    // creating default meta client options first when none are configured.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // `tls_mode`, `tls_cert_path` and `tls_key_path` are combined into a single
    // `TlsOption`. It is only assigned to the gRPC, MySQL and PostgreSQL
    // servers whose address flag is given as well.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    // Overrides `user_provider`.
    #[clap(long)]
    user_provider: Option<String>,
    // Overrides `http.disable_dashboard`.
    #[clap(long)]
    disable_dashboard: Option<bool>,
    // Prefix passed to `load_layered_options` together with the config file.
    // The clap default below applies to parsed command lines only; a value
    // built through the derived `Default` impl has an empty prefix.
    #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
    pub env_prefix: String,
}
184
185impl StartCommand {
186    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
187        let mut opts = FrontendOptions::load_layered_options(
188            self.config_file.as_deref(),
189            self.env_prefix.as_ref(),
190        )
191        .context(error::LoadLayeredConfigSnafu)?;
192
193        self.merge_with_cli_options(global_options, &mut opts)?;
194
195        Ok(opts)
196    }
197
198    // The precedence order is: cli > config file > environment variables > default values.
199    fn merge_with_cli_options(
200        &self,
201        global_options: &GlobalOptions,
202        opts: &mut FrontendOptions,
203    ) -> Result<()> {
204        let opts = &mut opts.component;
205
206        if let Some(dir) = &global_options.log_dir {
207            opts.logging.dir.clone_from(dir);
208        }
209
210        // If the logging dir is not set, use the default logs dir in the data home.
211        if opts.logging.dir.is_empty() {
212            opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
213                .join(DEFAULT_LOGGING_DIR)
214                .to_string_lossy()
215                .to_string();
216        }
217
218        if global_options.log_level.is_some() {
219            opts.logging.level.clone_from(&global_options.log_level);
220        }
221
222        opts.tracing = TracingOptions {
223            #[cfg(feature = "tokio-console")]
224            tokio_console_addr: global_options.tokio_console_addr.clone(),
225        };
226
227        let tls_opts = TlsOption::new(
228            self.tls_mode.clone(),
229            self.tls_cert_path.clone(),
230            self.tls_key_path.clone(),
231        );
232
233        if let Some(addr) = &self.http_addr {
234            opts.http.addr.clone_from(addr);
235        }
236
237        if let Some(http_timeout) = self.http_timeout {
238            opts.http.timeout = Duration::from_secs(http_timeout)
239        }
240
241        if let Some(disable_dashboard) = self.disable_dashboard {
242            opts.http.disable_dashboard = disable_dashboard;
243        }
244
245        if let Some(addr) = &self.rpc_bind_addr {
246            opts.grpc.bind_addr.clone_from(addr);
247            opts.grpc.tls = tls_opts.clone();
248        }
249
250        if let Some(addr) = &self.rpc_server_addr {
251            opts.grpc.server_addr.clone_from(addr);
252        }
253
254        if let Some(addr) = &self.internal_rpc_bind_addr {
255            if let Some(internal_grpc) = &mut opts.internal_grpc {
256                internal_grpc.bind_addr = addr.clone();
257            } else {
258                let grpc_options = GrpcOptions {
259                    bind_addr: addr.clone(),
260                    ..Default::default()
261                };
262
263                opts.internal_grpc = Some(grpc_options);
264            }
265        }
266
267        if let Some(addr) = &self.internal_rpc_server_addr {
268            if let Some(internal_grpc) = &mut opts.internal_grpc {
269                internal_grpc.server_addr = addr.clone();
270            } else {
271                let grpc_options = GrpcOptions {
272                    server_addr: addr.clone(),
273                    ..Default::default()
274                };
275                opts.internal_grpc = Some(grpc_options);
276            }
277        }
278
279        if let Some(addr) = &self.mysql_addr {
280            opts.mysql.enable = true;
281            opts.mysql.addr.clone_from(addr);
282            opts.mysql.tls = tls_opts.clone();
283        }
284
285        if let Some(addr) = &self.postgres_addr {
286            opts.postgres.enable = true;
287            opts.postgres.addr.clone_from(addr);
288            opts.postgres.tls = tls_opts;
289        }
290
291        if let Some(enable) = self.influxdb_enable {
292            opts.influxdb.enable = enable;
293        }
294
295        if let Some(metasrv_addrs) = &self.metasrv_addrs {
296            opts.meta_client
297                .get_or_insert_with(MetaClientOptions::default)
298                .metasrv_addrs
299                .clone_from(metasrv_addrs);
300        }
301
302        if let Some(user_provider) = &self.user_provider {
303            opts.user_provider = Some(user_provider.clone());
304        }
305
306        Ok(())
307    }
308
309    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
310        common_runtime::init_global_runtimes(&opts.runtime);
311
312        let guard = common_telemetry::init_global_logging(
313            APP_NAME,
314            &opts.component.logging,
315            &opts.component.tracing,
316            opts.component.node_id.clone(),
317            Some(&opts.component.slow_query),
318        );
319
320        log_versions(verbose_version(), short_version(), APP_NAME);
321        maybe_activate_heap_profile(&opts.component.memory);
322        create_resource_limit_metrics(APP_NAME);
323
324        info!("Frontend start command: {:#?}", self);
325        info!("Frontend options: {:#?}", opts);
326
327        let plugin_opts = opts.plugins;
328        let mut opts = opts.component;
329        opts.grpc.detect_server_addr();
330        let mut plugins = Plugins::new();
331        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
332            .await
333            .context(error::StartFrontendSnafu)?;
334
335        set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
336
337        let meta_client_options = opts
338            .meta_client
339            .as_ref()
340            .context(error::MissingConfigSnafu {
341                msg: "'meta_client'",
342            })?;
343
344        let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
345        let cache_ttl = meta_client_options.metadata_cache_ttl;
346        let cache_tti = meta_client_options.metadata_cache_tti;
347
348        let meta_client = meta_client::create_meta_client(
349            MetaClientType::Frontend,
350            meta_client_options,
351            Some(&plugins),
352            None,
353        )
354        .await
355        .context(error::MetaClientInitSnafu)?;
356
357        // TODO(discord9): add helper function to ease the creation of cache registry&such
358        let cached_meta_backend =
359            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
360                .cache_max_capacity(cache_max_capacity)
361                .cache_ttl(cache_ttl)
362                .cache_tti(cache_tti)
363                .build();
364        let cached_meta_backend = Arc::new(cached_meta_backend);
365
366        // Builds cache registry
367        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
368            CacheRegistryBuilder::default()
369                .add_cache(cached_meta_backend.clone())
370                .build(),
371        );
372        let fundamental_cache_registry =
373            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
374        let layered_cache_registry = Arc::new(
375            with_default_composite_cache_registry(
376                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
377            )
378            .context(error::BuildCacheRegistrySnafu)?
379            .build(),
380        );
381
382        // frontend to datanode need not timeout.
383        // Some queries are expected to take long time.
384        let mut channel_config = ChannelConfig {
385            timeout: None,
386            tcp_nodelay: opts.datanode.client.tcp_nodelay,
387            connect_timeout: Some(opts.datanode.client.connect_timeout),
388            ..Default::default()
389        };
390        if opts.grpc.flight_compression.transport_compression() {
391            channel_config.accept_compression = true;
392            channel_config.send_compression = true;
393        }
394        let client = Arc::new(NodeClients::new(channel_config));
395
396        let information_extension = Arc::new(DistributedInformationExtension::new(
397            meta_client.clone(),
398            client.clone(),
399        ));
400
401        let process_manager = Arc::new(ProcessManager::new(
402            addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
403            Some(meta_client.clone()),
404        ));
405
406        let builder = KvBackendCatalogManagerBuilder::new(
407            information_extension,
408            cached_meta_backend.clone(),
409            layered_cache_registry.clone(),
410        )
411        .with_process_manager(process_manager.clone());
412        #[cfg(feature = "enterprise")]
413        let builder = if let Some(factories) = plugins.get() {
414            builder.with_extra_information_table_factories(factories)
415        } else {
416            builder
417        };
418        let catalog_manager = builder.build();
419
420        let executor = HandlerGroupExecutor::new(vec![
421            Arc::new(ParseMailboxMessageHandler),
422            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
423        ]);
424
425        let mut resource_stat = ResourceStatImpl::default();
426        resource_stat.start_collect_cpu_usage();
427
428        let heartbeat_task = HeartbeatTask::new(
429            &opts,
430            meta_client.clone(),
431            opts.heartbeat.clone(),
432            Arc::new(executor),
433            Arc::new(resource_stat),
434        );
435        let heartbeat_task = Some(heartbeat_task);
436
437        let instance = FrontendBuilder::new(
438            opts.clone(),
439            cached_meta_backend.clone(),
440            layered_cache_registry.clone(),
441            catalog_manager,
442            client,
443            meta_client,
444            process_manager,
445        )
446        .with_plugin(plugins.clone())
447        .with_local_cache_invalidator(layered_cache_registry)
448        .try_build()
449        .await
450        .context(error::StartFrontendSnafu)?;
451        let instance = Arc::new(instance);
452
453        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
454            .context(error::ServersSnafu)?;
455
456        let servers = Services::new(opts, instance.clone(), plugins)
457            .build()
458            .context(error::StartFrontendSnafu)?;
459
460        let frontend = Frontend {
461            instance,
462            servers,
463            heartbeat_task,
464            export_metrics_task,
465        };
466
467        Ok(Instance::new(frontend, guard))
468    }
469}
470
#[cfg(test)]
mod tests {
    use std::io::Write;
    use std::time::Duration;

    use auth::{Identity, Password, UserProviderRef};
    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use servers::grpc::GrpcOptions;
    use servers::http::HttpOptions;

    use super::*;
    use crate::options::GlobalOptions;

    /// Address flags given on the command line end up in the merged options,
    /// while everything that was not given keeps its default value.
    #[test]
    fn test_try_from_start_command() {
        let cmd = StartCommand {
            http_addr: Some("127.0.0.1:1234".to_string()),
            mysql_addr: Some("127.0.0.1:5678".to_string()),
            postgres_addr: Some("127.0.0.1:5432".to_string()),
            internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
            internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
            influxdb_enable: Some(false),
            disable_dashboard: Some(false),
            ..Default::default()
        };

        let merged = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;
        let defaults = FrontendOptions::default().component;

        // Values taken from the command line.
        assert_eq!(merged.http.addr, "127.0.0.1:1234");
        assert_eq!(ReadableSize::mb(64), merged.http.body_limit);
        assert_eq!(merged.mysql.addr, "127.0.0.1:5678");
        assert_eq!(merged.postgres.addr, "127.0.0.1:5432");

        // Both internal gRPC flags land in the same (freshly created) options.
        let internal_grpc = merged.internal_grpc.as_ref().unwrap();
        assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
        assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");

        // Untouched settings stay at their defaults; giving an address enables
        // the MySQL and PostgreSQL servers.
        assert_eq!(merged.grpc.bind_addr, defaults.grpc.bind_addr);
        assert!(merged.mysql.enable);
        assert_eq!(merged.mysql.runtime_size, defaults.mysql.runtime_size);
        assert!(merged.postgres.enable);
        assert_eq!(
            merged.postgres.runtime_size,
            defaults.postgres.runtime_size
        );
        assert!(merged.opentsdb.enable);

        assert!(!merged.influxdb.enable);
    }

    /// Settings written to a TOML config file are picked up by `load_options`.
    #[test]
    fn test_read_from_config_file() {
        let mut config = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"
            timeout = "0s"
            body_limit = "2GB"

            [opentsdb]
            enable = false

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        config.write_all(toml_str.as_bytes()).unwrap();

        let cmd = StartCommand {
            config_file: Some(config.path().to_str().unwrap().to_string()),
            disable_dashboard: Some(false),
            ..Default::default()
        };

        let loaded = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;

        assert_eq!("127.0.0.1:4000".to_string(), loaded.http.addr);
        assert_eq!(Duration::from_secs(0), loaded.http.timeout);

        assert_eq!(ReadableSize::gb(2), loaded.http.body_limit);

        assert_eq!("debug", loaded.logging.level.as_ref().unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            loaded.logging.dir
        );
        assert!(!loaded.opentsdb.enable);
    }

    /// A `user_provider` option makes plugin setup register a user provider
    /// that accepts the configured credentials.
    #[tokio::test]
    async fn test_try_from_start_command_to_anymap() {
        let http = HttpOptions {
            disable_dashboard: false,
            ..Default::default()
        };
        let fe_opts = frontend::frontend::FrontendOptions {
            http,
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
            .await
            .unwrap();

        let user_provider = plugins.get::<UserProviderRef>().unwrap();
        let identity = Identity::UserId("test", None);
        let password = Password::PlainText("test".to_string().into());
        let auth_result = user_provider.authenticate(identity, password).await;
        let _ = auth_result.unwrap();
    }

    /// The global `log_dir` / `log_level` flags override the logging options.
    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            disable_dashboard: Some(false),
            ..Default::default()
        };
        let global_options = GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        };

        let logging = cmd
            .load_options(&global_options)
            .unwrap()
            .component
            .logging;

        assert_eq!("./greptimedb_data/test/logs", logging.dir);
        assert_eq!("debug", logging.level.as_ref().unwrap());
    }

    /// Checks the precedence cli > config file > env > default values.
    #[test]
    fn test_config_precedence_order() {
        let mut config = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [mysql]
            addr = "127.0.0.1:4002"
        "#;
        config.write_all(toml_str.as_bytes()).unwrap();

        let env_prefix = "FRONTEND_UT";
        // Builds the environment variable name for `<section>.<field>`:
        // prefix, upper-cased section and upper-cased field joined by the separator.
        let env_key = |section: &str, field: &str| {
            [
                env_prefix.to_string(),
                section.to_uppercase(),
                field.to_uppercase(),
            ]
            .join(ENV_VAR_SEP)
        };

        temp_env::with_vars(
            [
                // mysql.addr = 127.0.0.1:14002
                (env_key("mysql", "addr"), Some("127.0.0.1:14002")),
                // mysql.runtime_size = 11
                (env_key("mysql", "runtime_size"), Some("11")),
                // http.addr = 127.0.0.1:24000
                (env_key("http", "addr"), Some("127.0.0.1:24000")),
                // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
                (
                    env_key("meta_client", "metasrv_addrs"),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let cmd = StartCommand {
                    config_file: Some(config.path().to_str().unwrap().to_string()),
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let loaded = cmd
                    .load_options(&GlobalOptions::default())
                    .unwrap()
                    .component;

                // Only set through the environment: env > default values.
                assert_eq!(loaded.mysql.runtime_size, 11);
                assert_eq!(
                    loaded.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                // Set in both file and environment: config file > env > default values.
                assert_eq!(loaded.mysql.addr, "127.0.0.1:4002");

                // Set everywhere: cli > config file > env > default values.
                assert_eq!(loaded.http.addr, "127.0.0.1:14000");

                // Set nowhere: the default value is kept.
                assert_eq!(loaded.grpc.bind_addr, GrpcOptions::default().bind_addr);
            },
        );
    }
}