cmd/
frontend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21use catalog::information_extension::DistributedInformationExtension;
22use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
23use catalog::process_manager::ProcessManager;
24use clap::Parser;
25use client::client_manager::NodeClients;
26use common_base::Plugins;
27use common_config::{Configurable, DEFAULT_DATA_HOME};
28use common_grpc::channel_manager::ChannelConfig;
29use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
30use common_meta::heartbeat::handler::HandlerGroupExecutor;
31use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
32use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
33use common_telemetry::info;
34use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
35use common_time::timezone::set_default_timezone;
36use common_version::{short_version, verbose_version};
37use frontend::frontend::Frontend;
38use frontend::heartbeat::HeartbeatTask;
39use frontend::instance::builder::FrontendBuilder;
40use frontend::server::Services;
41use meta_client::{MetaClientOptions, MetaClientType};
42use servers::addrs;
43use servers::export_metrics::ExportMetricsTask;
44use servers::grpc::GrpcOptions;
45use servers::tls::{TlsMode, TlsOption};
46use snafu::{OptionExt, ResultExt};
47use tracing_appender::non_blocking::WorkerGuard;
48
49use crate::error::{self, Result};
50use crate::options::{GlobalOptions, GreptimeOptions};
51use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
52
53type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
54
/// A frontend application instance: the [`Frontend`] itself plus the logging
/// worker guards that are held for as long as the instance exists.
pub struct Instance {
    // The wrapped frontend; exposed through `inner` / `mut_inner`.
    frontend: Frontend,
    // Keep the logging guard to prevent the worker from being dropped.
    _guard: Vec<WorkerGuard>,
}
60
/// Application name of the frontend. It is returned by `App::name` and passed to the
/// logging, version-logging and resource-limit-metrics setup in `StartCommand::build`.
pub const APP_NAME: &str = "greptime-frontend";
62
63impl Instance {
64    pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
65        Self { frontend, _guard }
66    }
67
68    pub fn inner(&self) -> &Frontend {
69        &self.frontend
70    }
71
72    pub fn mut_inner(&mut self) -> &mut Frontend {
73        &mut self.frontend
74    }
75}
76
77#[async_trait]
78impl App for Instance {
79    fn name(&self) -> &str {
80        APP_NAME
81    }
82
83    async fn start(&mut self) -> Result<()> {
84        let plugins = self.frontend.instance.plugins().clone();
85        plugins::start_frontend_plugins(plugins)
86            .await
87            .context(error::StartFrontendSnafu)?;
88
89        self.frontend
90            .start()
91            .await
92            .context(error::StartFrontendSnafu)
93    }
94
95    async fn stop(&mut self) -> Result<()> {
96        self.frontend
97            .shutdown()
98            .await
99            .context(error::ShutdownFrontendSnafu)
100    }
101}
102
// Top-level CLI entry of the frontend; all work is delegated to `subcmd`.
// NOTE: regular comments are used on purpose here, because doc comments on
// clap-derived items are picked up as help text.
#[derive(Parser)]
pub struct Command {
    // The selected frontend subcommand.
    #[clap(subcommand)]
    pub subcmd: SubCommand,
}
108
109impl Command {
110    pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
111        self.subcmd.build(opts).await
112    }
113
114    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
115        self.subcmd.load_options(global_options)
116    }
117}
118
// Subcommands accepted by the frontend CLI.
// NOTE: regular comments are used on purpose here, because doc comments on
// clap-derived items are picked up as help text.
#[derive(Parser)]
pub enum SubCommand {
    // Loads the options and builds a frontend instance (see `StartCommand`).
    Start(StartCommand),
}
123
124impl SubCommand {
125    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
126        match self {
127            SubCommand::Start(cmd) => cmd.build(opts).await,
128        }
129    }
130
131    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
132        match self {
133            SubCommand::Start(cmd) => cmd.load_options(global_options),
134        }
135    }
136}
137
// CLI flags of the frontend `start` subcommand. Every flag that is provided is
// written over the layered (config file / environment / default) options in
// `merge_with_cli_options`.
// NOTE: new remarks below are regular comments on purpose, because doc comments
// on clap-derived fields are picked up as help text.
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    /// The address to bind the gRPC server.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    /// The address advertised to the metasrv, and used for connections from outside the host.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    /// The address to bind the internal gRPC server.
    #[clap(long, alias = "internal-rpc-addr")]
    internal_rpc_bind_addr: Option<String>,
    /// The address advertised to the metasrv, and used for connections from outside the host.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `internal_rpc_bind_addr`.
    #[clap(long, alias = "internal-rpc-hostname")]
    internal_rpc_server_addr: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`; the value is interpreted as whole seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // When set, enables the MySQL server, overrides its address and applies the TLS flags to it.
    #[clap(long)]
    mysql_addr: Option<String>,
    // When set, enables the PostgreSQL server, overrides its address and applies the TLS flags to it.
    #[clap(long)]
    postgres_addr: Option<String>,
    // Path of the config file handed to the layered options loader.
    #[clap(short, long)]
    pub config_file: Option<String>,
    // Overrides `influxdb.enable`.
    #[clap(short, long)]
    influxdb_enable: Option<bool>,
    // Comma-separated metasrv addresses; overrides `meta_client.metasrv_addrs`
    // (default meta client options are created if none were configured).
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // The three TLS flags are combined into one `TlsOption`, which is only applied to the
    // gRPC / MySQL / PostgreSQL servers whose address flag is given as well.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    // Overrides `user_provider`.
    #[clap(long)]
    user_provider: Option<String>,
    // Overrides `http.disable_dashboard`.
    #[clap(long)]
    disable_dashboard: Option<bool>,
    // Prefix handed to the layered options loader for environment variables.
    #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
    pub env_prefix: String,
}
183
184impl StartCommand {
185    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
186        let mut opts = FrontendOptions::load_layered_options(
187            self.config_file.as_deref(),
188            self.env_prefix.as_ref(),
189        )
190        .context(error::LoadLayeredConfigSnafu)?;
191
192        self.merge_with_cli_options(global_options, &mut opts)?;
193
194        Ok(opts)
195    }
196
197    // The precedence order is: cli > config file > environment variables > default values.
198    fn merge_with_cli_options(
199        &self,
200        global_options: &GlobalOptions,
201        opts: &mut FrontendOptions,
202    ) -> Result<()> {
203        let opts = &mut opts.component;
204
205        if let Some(dir) = &global_options.log_dir {
206            opts.logging.dir.clone_from(dir);
207        }
208
209        // If the logging dir is not set, use the default logs dir in the data home.
210        if opts.logging.dir.is_empty() {
211            opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
212                .join(DEFAULT_LOGGING_DIR)
213                .to_string_lossy()
214                .to_string();
215        }
216
217        if global_options.log_level.is_some() {
218            opts.logging.level.clone_from(&global_options.log_level);
219        }
220
221        opts.tracing = TracingOptions {
222            #[cfg(feature = "tokio-console")]
223            tokio_console_addr: global_options.tokio_console_addr.clone(),
224        };
225
226        let tls_opts = TlsOption::new(
227            self.tls_mode.clone(),
228            self.tls_cert_path.clone(),
229            self.tls_key_path.clone(),
230        );
231
232        if let Some(addr) = &self.http_addr {
233            opts.http.addr.clone_from(addr);
234        }
235
236        if let Some(http_timeout) = self.http_timeout {
237            opts.http.timeout = Duration::from_secs(http_timeout)
238        }
239
240        if let Some(disable_dashboard) = self.disable_dashboard {
241            opts.http.disable_dashboard = disable_dashboard;
242        }
243
244        if let Some(addr) = &self.rpc_bind_addr {
245            opts.grpc.bind_addr.clone_from(addr);
246            opts.grpc.tls = tls_opts.clone();
247        }
248
249        if let Some(addr) = &self.rpc_server_addr {
250            opts.grpc.server_addr.clone_from(addr);
251        }
252
253        if let Some(addr) = &self.internal_rpc_bind_addr {
254            if let Some(internal_grpc) = &mut opts.internal_grpc {
255                internal_grpc.bind_addr = addr.to_string();
256            } else {
257                let grpc_options = GrpcOptions {
258                    bind_addr: addr.to_string(),
259                    ..Default::default()
260                };
261
262                opts.internal_grpc = Some(grpc_options);
263            }
264        }
265
266        if let Some(addr) = &self.internal_rpc_server_addr {
267            if let Some(internal_grpc) = &mut opts.internal_grpc {
268                internal_grpc.server_addr = addr.to_string();
269            } else {
270                let grpc_options = GrpcOptions {
271                    server_addr: addr.to_string(),
272                    ..Default::default()
273                };
274                opts.internal_grpc = Some(grpc_options);
275            }
276        }
277
278        if let Some(addr) = &self.mysql_addr {
279            opts.mysql.enable = true;
280            opts.mysql.addr.clone_from(addr);
281            opts.mysql.tls = tls_opts.clone();
282        }
283
284        if let Some(addr) = &self.postgres_addr {
285            opts.postgres.enable = true;
286            opts.postgres.addr.clone_from(addr);
287            opts.postgres.tls = tls_opts;
288        }
289
290        if let Some(enable) = self.influxdb_enable {
291            opts.influxdb.enable = enable;
292        }
293
294        if let Some(metasrv_addrs) = &self.metasrv_addrs {
295            opts.meta_client
296                .get_or_insert_with(MetaClientOptions::default)
297                .metasrv_addrs
298                .clone_from(metasrv_addrs);
299        }
300
301        if let Some(user_provider) = &self.user_provider {
302            opts.user_provider = Some(user_provider.clone());
303        }
304
305        Ok(())
306    }
307
    /// Builds a frontend [`Instance`] from fully resolved options.
    ///
    /// In order, this initializes the global runtimes and logging, sets up the frontend
    /// plugins and the default timezone, creates the meta client, builds the cached kv
    /// backend and the layered cache registry, the node client, the catalog manager, the
    /// heartbeat task, the frontend instance, the export-metrics task and the servers.
    ///
    /// # Errors
    ///
    /// Returns an error if the `meta_client` options are missing, or if the plugin setup,
    /// timezone initialization, meta client creation, cache registry construction,
    /// frontend instance build, export-metrics task creation or server build fails.
    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
        // Process-wide setup. The returned logging `guard` is handed to the `Instance`
        // at the end of this function.
        common_runtime::init_global_runtimes(&opts.runtime);

        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            opts.component.node_id.clone(),
            Some(&opts.component.slow_query),
        );

        log_versions(verbose_version(), short_version(), APP_NAME);
        maybe_activate_heap_profile(&opts.component.memory);
        create_resource_limit_metrics(APP_NAME);

        // NOTE(review): both values are logged with their full Debug output, which includes
        // `user_provider`; that string can embed credentials (e.g. the
        // `static_user_provider:cmd:test=test` form used in the tests) — confirm this is acceptable.
        info!("Frontend start command: {:#?}", self);
        info!("Frontend options: {:#?}", opts);

        // Split the options: plugin options vs. the frontend component options used from here on.
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        // Presumably fills in `grpc.server_addr` when it was left unset (cf. the help text of
        // the `rpc_server_addr` flag) — confirm in `GrpcOptions`.
        opts.grpc.detect_server_addr();
        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
            .await
            .context(error::StartFrontendSnafu)?;

        set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;

        // The `meta_client` options are mandatory here; fail with `MissingConfig` otherwise.
        let meta_client_options = opts
            .meta_client
            .as_ref()
            .context(error::MissingConfigSnafu {
                msg: "'meta_client'",
            })?;

        // Metadata cache settings used for the cached kv backend below.
        let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
        let cache_ttl = meta_client_options.metadata_cache_ttl;
        let cache_tti = meta_client_options.metadata_cache_tti;

        let meta_client = meta_client::create_meta_client(
            MetaClientType::Frontend,
            meta_client_options,
            Some(&plugins),
            None,
        )
        .await
        .context(error::MetaClientInitSnafu)?;

        // TODO(discord9): add helper function to ease the creation of cache registry&such
        let cached_meta_backend =
            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
                .cache_max_capacity(cache_max_capacity)
                .cache_ttl(cache_ttl)
                .cache_tti(cache_tti)
                .build();
        let cached_meta_backend = Arc::new(cached_meta_backend);

        // Builds the layered cache registry: a registry holding the cached meta backend, plus
        // the fundamental registry (built on a separate, uncached `MetaKvBackend`), completed
        // by `with_default_composite_cache_registry`.
        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
            CacheRegistryBuilder::default()
                .add_cache(cached_meta_backend.clone())
                .build(),
        );
        let fundamental_cache_registry =
            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        // Frontend-to-datanode requests must not time out:
        // some queries are expected to take a long time.
        let mut channel_config = ChannelConfig {
            timeout: None,
            tcp_nodelay: opts.datanode.client.tcp_nodelay,
            connect_timeout: Some(opts.datanode.client.connect_timeout),
            ..Default::default()
        };
        // Turn on compression in both directions when the flight compression setting
        // asks for transport compression.
        if opts.grpc.flight_compression.transport_compression() {
            channel_config.accept_compression = true;
            channel_config.send_compression = true;
        }
        let client = Arc::new(NodeClients::new(channel_config));

        let information_extension = Arc::new(DistributedInformationExtension::new(
            meta_client.clone(),
            client.clone(),
        ));

        // The process manager is created with the address resolved from the gRPC
        // bind/server addresses.
        let process_manager = Arc::new(ProcessManager::new(
            addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
            Some(meta_client.clone()),
        ));

        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension,
            cached_meta_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_process_manager(process_manager.clone());
        // Enterprise builds may contribute extra information-table factories through the plugins.
        #[cfg(feature = "enterprise")]
        let builder = if let Some(factories) = plugins.get() {
            builder.with_extra_information_table_factories(factories)
        } else {
            builder
        };
        let catalog_manager = builder.build();

        // Heartbeat handlers: mailbox-message parsing and cache invalidation against the
        // layered cache registry.
        let executor = HandlerGroupExecutor::new(vec![
            Arc::new(ParseMailboxMessageHandler),
            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
        ]);

        let heartbeat_task = HeartbeatTask::new(
            &opts,
            meta_client.clone(),
            opts.heartbeat.clone(),
            Arc::new(executor),
        );
        // The `Frontend` field is optional; this code path always provides a heartbeat task.
        let heartbeat_task = Some(heartbeat_task);

        let instance = FrontendBuilder::new(
            opts.clone(),
            cached_meta_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager,
            client,
            meta_client,
            process_manager,
        )
        .with_plugin(plugins.clone())
        .with_local_cache_invalidator(layered_cache_registry)
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let instance = Arc::new(instance);

        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
            .context(error::ServersSnafu)?;

        // `opts` and `plugins` are consumed here; nothing below needs them anymore.
        let servers = Services::new(opts, instance.clone(), plugins)
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance,
            servers,
            heartbeat_task,
            export_metrics_task,
        };

        Ok(Instance::new(frontend, guard))
    }
464}
465
466#[cfg(test)]
467mod tests {
468    use std::io::Write;
469    use std::time::Duration;
470
471    use auth::{Identity, Password, UserProviderRef};
472    use common_base::readable_size::ReadableSize;
473    use common_config::ENV_VAR_SEP;
474    use common_test_util::temp_dir::create_named_temp_file;
475    use servers::grpc::GrpcOptions;
476    use servers::http::HttpOptions;
477
478    use super::*;
479    use crate::options::GlobalOptions;
480
481    #[test]
482    fn test_try_from_start_command() {
483        let command = StartCommand {
484            http_addr: Some("127.0.0.1:1234".to_string()),
485            mysql_addr: Some("127.0.0.1:5678".to_string()),
486            postgres_addr: Some("127.0.0.1:5432".to_string()),
487            internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
488            internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
489            influxdb_enable: Some(false),
490            disable_dashboard: Some(false),
491            ..Default::default()
492        };
493
494        let opts = command.load_options(&Default::default()).unwrap().component;
495
496        assert_eq!(opts.http.addr, "127.0.0.1:1234");
497        assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
498        assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
499        assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
500
501        let internal_grpc = opts.internal_grpc.as_ref().unwrap();
502        assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
503        assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");
504
505        let default_opts = FrontendOptions::default().component;
506
507        assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
508        assert!(opts.mysql.enable);
509        assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
510        assert!(opts.postgres.enable);
511        assert_eq!(
512            opts.postgres.runtime_size,
513            default_opts.postgres.runtime_size
514        );
515        assert!(opts.opentsdb.enable);
516
517        assert!(!opts.influxdb.enable);
518    }
519
520    #[test]
521    fn test_read_from_config_file() {
522        let mut file = create_named_temp_file();
523        let toml_str = r#"
524            [http]
525            addr = "127.0.0.1:4000"
526            timeout = "0s"
527            body_limit = "2GB"
528
529            [opentsdb]
530            enable = false
531
532            [logging]
533            level = "debug"
534            dir = "./greptimedb_data/test/logs"
535        "#;
536        write!(file, "{}", toml_str).unwrap();
537
538        let command = StartCommand {
539            config_file: Some(file.path().to_str().unwrap().to_string()),
540            disable_dashboard: Some(false),
541            ..Default::default()
542        };
543
544        let fe_opts = command.load_options(&Default::default()).unwrap().component;
545
546        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
547        assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
548
549        assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
550
551        assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
552        assert_eq!(
553            "./greptimedb_data/test/logs".to_string(),
554            fe_opts.logging.dir
555        );
556        assert!(!fe_opts.opentsdb.enable);
557    }
558
559    #[tokio::test]
560    async fn test_try_from_start_command_to_anymap() {
561        let fe_opts = frontend::frontend::FrontendOptions {
562            http: HttpOptions {
563                disable_dashboard: false,
564                ..Default::default()
565            },
566            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
567            ..Default::default()
568        };
569
570        let mut plugins = Plugins::new();
571        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
572            .await
573            .unwrap();
574
575        let provider = plugins.get::<UserProviderRef>().unwrap();
576        let result = provider
577            .authenticate(
578                Identity::UserId("test", None),
579                Password::PlainText("test".to_string().into()),
580            )
581            .await;
582        let _ = result.unwrap();
583    }
584
585    #[test]
586    fn test_load_log_options_from_cli() {
587        let cmd = StartCommand {
588            disable_dashboard: Some(false),
589            ..Default::default()
590        };
591
592        let options = cmd
593            .load_options(&GlobalOptions {
594                log_dir: Some("./greptimedb_data/test/logs".to_string()),
595                log_level: Some("debug".to_string()),
596
597                #[cfg(feature = "tokio-console")]
598                tokio_console_addr: None,
599            })
600            .unwrap()
601            .component;
602
603        let logging_opt = options.logging;
604        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
605        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
606    }
607
608    #[test]
609    fn test_config_precedence_order() {
610        let mut file = create_named_temp_file();
611        let toml_str = r#"
612            [http]
613            addr = "127.0.0.1:4000"
614
615            [meta_client]
616            timeout = "3s"
617            connect_timeout = "5s"
618            tcp_nodelay = true
619
620            [mysql]
621            addr = "127.0.0.1:4002"
622        "#;
623        write!(file, "{}", toml_str).unwrap();
624
625        let env_prefix = "FRONTEND_UT";
626        temp_env::with_vars(
627            [
628                (
629                    // mysql.addr = 127.0.0.1:14002
630                    [
631                        env_prefix.to_string(),
632                        "mysql".to_uppercase(),
633                        "addr".to_uppercase(),
634                    ]
635                    .join(ENV_VAR_SEP),
636                    Some("127.0.0.1:14002"),
637                ),
638                (
639                    // mysql.runtime_size = 11
640                    [
641                        env_prefix.to_string(),
642                        "mysql".to_uppercase(),
643                        "runtime_size".to_uppercase(),
644                    ]
645                    .join(ENV_VAR_SEP),
646                    Some("11"),
647                ),
648                (
649                    // http.addr = 127.0.0.1:24000
650                    [
651                        env_prefix.to_string(),
652                        "http".to_uppercase(),
653                        "addr".to_uppercase(),
654                    ]
655                    .join(ENV_VAR_SEP),
656                    Some("127.0.0.1:24000"),
657                ),
658                (
659                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
660                    [
661                        env_prefix.to_string(),
662                        "meta_client".to_uppercase(),
663                        "metasrv_addrs".to_uppercase(),
664                    ]
665                    .join(ENV_VAR_SEP),
666                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
667                ),
668            ],
669            || {
670                let command = StartCommand {
671                    config_file: Some(file.path().to_str().unwrap().to_string()),
672                    http_addr: Some("127.0.0.1:14000".to_string()),
673                    env_prefix: env_prefix.to_string(),
674                    ..Default::default()
675                };
676
677                let fe_opts = command.load_options(&Default::default()).unwrap().component;
678
679                // Should be read from env, env > default values.
680                assert_eq!(fe_opts.mysql.runtime_size, 11);
681                assert_eq!(
682                    fe_opts.meta_client.unwrap().metasrv_addrs,
683                    vec![
684                        "127.0.0.1:3001".to_string(),
685                        "127.0.0.1:3002".to_string(),
686                        "127.0.0.1:3003".to_string()
687                    ]
688                );
689
690                // Should be read from config file, config file > env > default values.
691                assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
692
693                // Should be read from cli, cli > config file > env > default values.
694                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
695
696                // Should be default value.
697                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
698            },
699        );
700    }
701}