// cmd/frontend.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21use catalog::information_extension::DistributedInformationExtension;
22use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
23use catalog::process_manager::ProcessManager;
24use clap::Parser;
25use client::client_manager::NodeClients;
26use common_base::Plugins;
27use common_config::{Configurable, DEFAULT_DATA_HOME};
28use common_error::ext::BoxedError;
29use common_grpc::channel_manager::ChannelConfig;
30use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
31use common_meta::heartbeat::handler::HandlerGroupExecutor;
32use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
33use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
34use common_query::prelude::set_default_prefix;
35use common_stat::ResourceStatImpl;
36use common_telemetry::info;
37use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
38use common_time::timezone::set_default_timezone;
39use common_version::{short_version, verbose_version};
40use frontend::frontend::Frontend;
41use frontend::heartbeat::HeartbeatTask;
42use frontend::instance::builder::FrontendBuilder;
43use frontend::server::Services;
44use meta_client::{MetaClientOptions, MetaClientType};
45use servers::addrs;
46use servers::grpc::GrpcOptions;
47use servers::tls::{TlsMode, TlsOption};
48use snafu::{OptionExt, ResultExt};
49use tracing_appender::non_blocking::WorkerGuard;
50
51use crate::error::{self, Result};
52use crate::options::{GlobalOptions, GreptimeOptions};
53use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
54
55type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
56
/// A runnable frontend application: the built [`Frontend`] bundled with the logging
/// worker guards handed to [`Instance::new`].
pub struct Instance {
    /// The frontend that `App::start` / `App::stop` operate on.
    frontend: Frontend,
    // Keep the logging guard to prevent the worker from being dropped.
    _guard: Vec<WorkerGuard>,
}
62
/// Application name of the frontend: returned by `App::name`, and passed to logging
/// initialization, version logging and resource-limit metrics in `StartCommand::build`.
pub const APP_NAME: &str = "greptime-frontend";
64
65impl Instance {
66    pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
67        Self { frontend, _guard }
68    }
69
70    pub fn inner(&self) -> &Frontend {
71        &self.frontend
72    }
73
74    pub fn mut_inner(&mut self) -> &mut Frontend {
75        &mut self.frontend
76    }
77}
78
79#[async_trait]
80impl App for Instance {
81    fn name(&self) -> &str {
82        APP_NAME
83    }
84
85    async fn start(&mut self) -> Result<()> {
86        let plugins = self.frontend.instance.plugins().clone();
87        plugins::start_frontend_plugins(plugins)
88            .await
89            .context(error::StartFrontendSnafu)?;
90
91        self.frontend
92            .start()
93            .await
94            .context(error::StartFrontendSnafu)
95    }
96
97    async fn stop(&mut self) -> Result<()> {
98        self.frontend
99            .shutdown()
100            .await
101            .context(error::ShutdownFrontendSnafu)
102    }
103}
104
// Command-line entry point of the frontend: a thin wrapper around its subcommand.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments into
// help text, which would change the CLI output.
#[derive(Parser)]
pub struct Command {
    // The selected subcommand; `build` and `load_options` are forwarded to it.
    #[clap(subcommand)]
    pub subcmd: SubCommand,
}
110
111impl Command {
112    pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
113        self.subcmd.build(opts).await
114    }
115
116    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
117        self.subcmd.load_options(global_options)
118    }
119}
120
// Subcommands of the frontend command.
// Plain `//` comments are used on purpose: clap's derive turns `///` doc comments into
// help text, which would change the CLI output.
#[derive(Parser)]
pub enum SubCommand {
    // Loads the options for, and builds, a frontend instance (see `StartCommand`).
    Start(StartCommand),
}
125
126impl SubCommand {
127    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
128        match self {
129            SubCommand::Start(cmd) => cmd.build(opts).await,
130        }
131    }
132
133    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
134        match self {
135            SubCommand::Start(cmd) => cmd.load_options(global_options),
136        }
137    }
138}
139
140#[derive(Debug, Default, Parser)]
141pub struct StartCommand {
142    /// The address to bind the gRPC server.
143    #[clap(long, alias = "rpc-addr")]
144    rpc_bind_addr: Option<String>,
145    /// The address advertised to the metasrv, and used for connections from outside the host.
146    /// If left empty or unset, the server will automatically use the IP address of the first network interface
147    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
148    #[clap(long, alias = "rpc-hostname")]
149    rpc_server_addr: Option<String>,
150    /// The address to bind the internal gRPC server.
151    #[clap(long, alias = "internal-rpc-addr")]
152    internal_rpc_bind_addr: Option<String>,
153    /// The address advertised to the metasrv, and used for connections from outside the host.
154    /// If left empty or unset, the server will automatically use the IP address of the first network interface
155    /// on the host, with the same port number as the one specified in `internal_rpc_bind_addr`.
156    #[clap(long, alias = "internal-rpc-hostname")]
157    internal_rpc_server_addr: Option<String>,
158    #[clap(long)]
159    http_addr: Option<String>,
160    #[clap(long)]
161    http_timeout: Option<u64>,
162    #[clap(long)]
163    mysql_addr: Option<String>,
164    #[clap(long)]
165    postgres_addr: Option<String>,
166    #[clap(short, long)]
167    pub config_file: Option<String>,
168    #[clap(short, long)]
169    influxdb_enable: Option<bool>,
170    #[clap(long, value_delimiter = ',', num_args = 1..)]
171    metasrv_addrs: Option<Vec<String>>,
172    #[clap(long)]
173    tls_mode: Option<TlsMode>,
174    #[clap(long)]
175    tls_cert_path: Option<String>,
176    #[clap(long)]
177    tls_key_path: Option<String>,
178    #[clap(long)]
179    tls_watch: bool,
180    #[clap(long)]
181    user_provider: Option<String>,
182    #[clap(long)]
183    disable_dashboard: Option<bool>,
184    #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
185    pub env_prefix: String,
186}
187
188impl StartCommand {
189    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
190        let mut opts = FrontendOptions::load_layered_options(
191            self.config_file.as_deref(),
192            self.env_prefix.as_ref(),
193        )
194        .context(error::LoadLayeredConfigSnafu)?;
195
196        self.merge_with_cli_options(global_options, &mut opts)?;
197
198        Ok(opts)
199    }
200
201    // The precedence order is: cli > config file > environment variables > default values.
202    fn merge_with_cli_options(
203        &self,
204        global_options: &GlobalOptions,
205        opts: &mut FrontendOptions,
206    ) -> Result<()> {
207        let opts = &mut opts.component;
208
209        if let Some(dir) = &global_options.log_dir {
210            opts.logging.dir.clone_from(dir);
211        }
212
213        // If the logging dir is not set, use the default logs dir in the data home.
214        if opts.logging.dir.is_empty() {
215            opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
216                .join(DEFAULT_LOGGING_DIR)
217                .to_string_lossy()
218                .to_string();
219        }
220
221        if global_options.log_level.is_some() {
222            opts.logging.level.clone_from(&global_options.log_level);
223        }
224
225        opts.tracing = TracingOptions {
226            #[cfg(feature = "tokio-console")]
227            tokio_console_addr: global_options.tokio_console_addr.clone(),
228        };
229
230        let tls_opts = TlsOption::new(
231            self.tls_mode.clone(),
232            self.tls_cert_path.clone(),
233            self.tls_key_path.clone(),
234            self.tls_watch,
235        );
236
237        if let Some(addr) = &self.http_addr {
238            opts.http.addr.clone_from(addr);
239        }
240
241        if let Some(http_timeout) = self.http_timeout {
242            opts.http.timeout = Duration::from_secs(http_timeout)
243        }
244
245        if let Some(disable_dashboard) = self.disable_dashboard {
246            opts.http.disable_dashboard = disable_dashboard;
247        }
248
249        if let Some(addr) = &self.rpc_bind_addr {
250            opts.grpc.bind_addr.clone_from(addr);
251            opts.grpc.tls = tls_opts.clone();
252        }
253
254        if let Some(addr) = &self.rpc_server_addr {
255            opts.grpc.server_addr.clone_from(addr);
256        }
257
258        if let Some(addr) = &self.internal_rpc_bind_addr {
259            if let Some(internal_grpc) = &mut opts.internal_grpc {
260                internal_grpc.bind_addr = addr.clone();
261            } else {
262                let grpc_options = GrpcOptions {
263                    bind_addr: addr.clone(),
264                    ..Default::default()
265                };
266
267                opts.internal_grpc = Some(grpc_options);
268            }
269        }
270
271        if let Some(addr) = &self.internal_rpc_server_addr {
272            if let Some(internal_grpc) = &mut opts.internal_grpc {
273                internal_grpc.server_addr = addr.clone();
274            } else {
275                let grpc_options = GrpcOptions {
276                    server_addr: addr.clone(),
277                    ..Default::default()
278                };
279                opts.internal_grpc = Some(grpc_options);
280            }
281        }
282
283        if let Some(addr) = &self.mysql_addr {
284            opts.mysql.enable = true;
285            opts.mysql.addr.clone_from(addr);
286            opts.mysql.tls = tls_opts.clone();
287        }
288
289        if let Some(addr) = &self.postgres_addr {
290            opts.postgres.enable = true;
291            opts.postgres.addr.clone_from(addr);
292            opts.postgres.tls = tls_opts;
293        }
294
295        if let Some(enable) = self.influxdb_enable {
296            opts.influxdb.enable = enable;
297        }
298
299        if let Some(metasrv_addrs) = &self.metasrv_addrs {
300            opts.meta_client
301                .get_or_insert_with(MetaClientOptions::default)
302                .metasrv_addrs
303                .clone_from(metasrv_addrs);
304        }
305
306        if let Some(user_provider) = &self.user_provider {
307            opts.user_provider = Some(user_provider.clone());
308        }
309
310        Ok(())
311    }
312
    /// Builds the frontend [`Instance`] from fully merged options.
    ///
    /// In order: initializes the global runtimes and logging, sets up the plugins, applies
    /// the default timezone and column prefix, creates the meta client, the cached kv
    /// backend and the layered cache registry, then the datanode clients, catalog manager
    /// and heartbeat task, and finally the frontend instance and its servers.
    ///
    /// # Errors
    ///
    /// Fails with the error context of the step that failed: `StartFrontend` (plugin setup,
    /// frontend build, server build), `InitTimezone`, `BuildCli` (default column prefix),
    /// `MissingConfig` (no `meta_client` options), `MetaClientInit` or `BuildCacheRegistry`.
    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        // The returned guards are handed to `Instance::new` at the end of this function.
        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            opts.component.node_id.clone(),
            Some(&opts.component.slow_query),
        );

        log_versions(verbose_version(), short_version(), APP_NAME);
        maybe_activate_heap_profile(&opts.component.memory);
        create_resource_limit_metrics(APP_NAME);

        info!("Frontend start command: {:#?}", self);
        info!("Frontend options: {:#?}", opts);

        // Split the plugin options off; from here on `opts` is the frontend component options.
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        // NOTE(review): presumably fills in `grpc.server_addr` when it was left unset (see the
        // `rpc_server_addr` flag docs) — confirm in `GrpcOptions::detect_server_addr`.
        opts.grpc.detect_server_addr();
        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
            .await
            .context(error::StartFrontendSnafu)?;

        set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
        set_default_prefix(opts.default_column_prefix.as_deref())
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;

        // The `meta_client` options are mandatory here; their absence is a `MissingConfig` error.
        let meta_client_options = opts
            .meta_client
            .as_ref()
            .context(error::MissingConfigSnafu {
                msg: "'meta_client'",
            })?;

        let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
        let cache_ttl = meta_client_options.metadata_cache_ttl;
        let cache_tti = meta_client_options.metadata_cache_tti;

        let meta_client = meta_client::create_meta_client(
            MetaClientType::Frontend,
            meta_client_options,
            Some(&plugins),
            None,
        )
        .await
        .context(error::MetaClientInitSnafu)?;

        // TODO(discord9): add helper function to ease the creation of cache registry&such
        // Kv backend over the meta client, wrapped in a cache sized by the meta client options.
        let cached_meta_backend =
            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
                .cache_max_capacity(cache_max_capacity)
                .cache_ttl(cache_ttl)
                .cache_tti(cache_tti)
                .build();
        let cached_meta_backend = Arc::new(cached_meta_backend);

        // Builds the cache registry: the cached kv backend is registered as a cache itself,
        // followed by the fundamental and the default composite registries.
        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
            CacheRegistryBuilder::default()
                .add_cache(cached_meta_backend.clone())
                .build(),
        );
        // Note: the fundamental registry gets its own `MetaKvBackend` over the same meta
        // client, not the cached backend built above.
        let fundamental_cache_registry =
            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        // Requests from the frontend to datanodes have no timeout:
        // some queries are expected to take a long time.
        let mut channel_config = ChannelConfig {
            timeout: None,
            tcp_nodelay: opts.datanode.client.tcp_nodelay,
            connect_timeout: Some(opts.datanode.client.connect_timeout),
            ..Default::default()
        };
        // Compression is switched on in both directions only when the gRPC flight
        // compression setting asks for transport compression.
        if opts.grpc.flight_compression.transport_compression() {
            channel_config.accept_compression = true;
            channel_config.send_compression = true;
        }
        let client = Arc::new(NodeClients::new(channel_config));

        let information_extension = Arc::new(DistributedInformationExtension::new(
            meta_client.clone(),
            client.clone(),
        ));

        let process_manager = Arc::new(ProcessManager::new(
            addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
            Some(meta_client.clone()),
        ));

        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension,
            cached_meta_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_process_manager(process_manager.clone());
        // Enterprise builds only: register extra information table factories when the
        // plugins provide them.
        #[cfg(feature = "enterprise")]
        let builder = if let Some(factories) = plugins.get() {
            builder.with_extra_information_table_factories(factories)
        } else {
            builder
        };
        let catalog_manager = builder.build();

        // Heartbeat handlers: parse mailbox messages, and invalidate entries of the layered
        // cache registry.
        let executor = HandlerGroupExecutor::new(vec![
            Arc::new(ParseMailboxMessageHandler),
            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
        ]);

        // CPU usage collection is started before the stat is handed to the heartbeat task.
        let mut resource_stat = ResourceStatImpl::default();
        resource_stat.start_collect_cpu_usage();

        let heartbeat_task = HeartbeatTask::new(
            &opts,
            meta_client.clone(),
            opts.heartbeat.clone(),
            Arc::new(executor),
            Arc::new(resource_stat),
        );
        let heartbeat_task = Some(heartbeat_task);

        // `opts` and `plugins` are cloned here because both are moved into `Services::new` below.
        let instance = FrontendBuilder::new(
            opts.clone(),
            cached_meta_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager,
            client,
            meta_client,
            process_manager,
        )
        .with_plugin(plugins.clone())
        .with_local_cache_invalidator(layered_cache_registry)
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let instance = Arc::new(instance);

        let servers = Services::new(opts, instance.clone(), plugins)
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance,
            servers,
            heartbeat_task,
        };

        Ok(Instance::new(frontend, guard))
    }
472}
473
474#[cfg(test)]
475mod tests {
476    use std::io::Write;
477    use std::time::Duration;
478
479    use auth::{Identity, Password, UserProviderRef};
480    use common_base::readable_size::ReadableSize;
481    use common_config::ENV_VAR_SEP;
482    use common_test_util::temp_dir::create_named_temp_file;
483    use servers::grpc::GrpcOptions;
484    use servers::http::HttpOptions;
485
486    use super::*;
487    use crate::options::GlobalOptions;
488
489    #[test]
490    fn test_try_from_start_command() {
491        let command = StartCommand {
492            http_addr: Some("127.0.0.1:1234".to_string()),
493            mysql_addr: Some("127.0.0.1:5678".to_string()),
494            postgres_addr: Some("127.0.0.1:5432".to_string()),
495            internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
496            internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
497            influxdb_enable: Some(false),
498            disable_dashboard: Some(false),
499            ..Default::default()
500        };
501
502        let opts = command.load_options(&Default::default()).unwrap().component;
503
504        assert_eq!(opts.http.addr, "127.0.0.1:1234");
505        assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
506        assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
507        assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
508
509        let internal_grpc = opts.internal_grpc.as_ref().unwrap();
510        assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
511        assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");
512
513        let default_opts = FrontendOptions::default().component;
514
515        assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
516        assert!(opts.mysql.enable);
517        assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
518        assert!(opts.postgres.enable);
519        assert_eq!(
520            opts.postgres.runtime_size,
521            default_opts.postgres.runtime_size
522        );
523        assert!(opts.opentsdb.enable);
524
525        assert!(!opts.influxdb.enable);
526    }
527
528    #[test]
529    fn test_read_from_config_file() {
530        let mut file = create_named_temp_file();
531        let toml_str = r#"
532            [http]
533            addr = "127.0.0.1:4000"
534            timeout = "0s"
535            body_limit = "2GB"
536
537            [opentsdb]
538            enable = false
539
540            [logging]
541            level = "debug"
542            dir = "./greptimedb_data/test/logs"
543        "#;
544        write!(file, "{}", toml_str).unwrap();
545
546        let command = StartCommand {
547            config_file: Some(file.path().to_str().unwrap().to_string()),
548            disable_dashboard: Some(false),
549            ..Default::default()
550        };
551
552        let fe_opts = command.load_options(&Default::default()).unwrap().component;
553
554        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
555        assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
556
557        assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
558
559        assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
560        assert_eq!(
561            "./greptimedb_data/test/logs".to_string(),
562            fe_opts.logging.dir
563        );
564        assert!(!fe_opts.opentsdb.enable);
565    }
566
567    #[tokio::test]
568    async fn test_try_from_start_command_to_anymap() {
569        let fe_opts = frontend::frontend::FrontendOptions {
570            http: HttpOptions {
571                disable_dashboard: false,
572                ..Default::default()
573            },
574            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
575            ..Default::default()
576        };
577
578        let mut plugins = Plugins::new();
579        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
580            .await
581            .unwrap();
582
583        let provider = plugins.get::<UserProviderRef>().unwrap();
584        let result = provider
585            .authenticate(
586                Identity::UserId("test", None),
587                Password::PlainText("test".to_string().into()),
588            )
589            .await;
590        let _ = result.unwrap();
591    }
592
593    #[test]
594    fn test_load_log_options_from_cli() {
595        let cmd = StartCommand {
596            disable_dashboard: Some(false),
597            ..Default::default()
598        };
599
600        let options = cmd
601            .load_options(&GlobalOptions {
602                log_dir: Some("./greptimedb_data/test/logs".to_string()),
603                log_level: Some("debug".to_string()),
604
605                #[cfg(feature = "tokio-console")]
606                tokio_console_addr: None,
607            })
608            .unwrap()
609            .component;
610
611        let logging_opt = options.logging;
612        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
613        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
614    }
615
616    #[test]
617    fn test_config_precedence_order() {
618        let mut file = create_named_temp_file();
619        let toml_str = r#"
620            [http]
621            addr = "127.0.0.1:4000"
622
623            [meta_client]
624            timeout = "3s"
625            connect_timeout = "5s"
626            tcp_nodelay = true
627
628            [mysql]
629            addr = "127.0.0.1:4002"
630        "#;
631        write!(file, "{}", toml_str).unwrap();
632
633        let env_prefix = "FRONTEND_UT";
634        temp_env::with_vars(
635            [
636                (
637                    // mysql.addr = 127.0.0.1:14002
638                    [
639                        env_prefix.to_string(),
640                        "mysql".to_uppercase(),
641                        "addr".to_uppercase(),
642                    ]
643                    .join(ENV_VAR_SEP),
644                    Some("127.0.0.1:14002"),
645                ),
646                (
647                    // mysql.runtime_size = 11
648                    [
649                        env_prefix.to_string(),
650                        "mysql".to_uppercase(),
651                        "runtime_size".to_uppercase(),
652                    ]
653                    .join(ENV_VAR_SEP),
654                    Some("11"),
655                ),
656                (
657                    // http.addr = 127.0.0.1:24000
658                    [
659                        env_prefix.to_string(),
660                        "http".to_uppercase(),
661                        "addr".to_uppercase(),
662                    ]
663                    .join(ENV_VAR_SEP),
664                    Some("127.0.0.1:24000"),
665                ),
666                (
667                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
668                    [
669                        env_prefix.to_string(),
670                        "meta_client".to_uppercase(),
671                        "metasrv_addrs".to_uppercase(),
672                    ]
673                    .join(ENV_VAR_SEP),
674                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
675                ),
676            ],
677            || {
678                let command = StartCommand {
679                    config_file: Some(file.path().to_str().unwrap().to_string()),
680                    http_addr: Some("127.0.0.1:14000".to_string()),
681                    env_prefix: env_prefix.to_string(),
682                    ..Default::default()
683                };
684
685                let fe_opts = command.load_options(&Default::default()).unwrap().component;
686
687                // Should be read from env, env > default values.
688                assert_eq!(fe_opts.mysql.runtime_size, 11);
689                assert_eq!(
690                    fe_opts.meta_client.unwrap().metasrv_addrs,
691                    vec![
692                        "127.0.0.1:3001".to_string(),
693                        "127.0.0.1:3002".to_string(),
694                        "127.0.0.1:3003".to_string()
695                    ]
696                );
697
698                // Should be read from config file, config file > env > default values.
699                assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
700
701                // Should be read from cli, cli > config file > env > default values.
702                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
703
704                // Should be default value.
705                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
706            },
707        );
708    }
709}