cmd/
frontend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
20use catalog::information_extension::DistributedInformationExtension;
21use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
22use clap::Parser;
23use client::client_manager::NodeClients;
24use common_base::Plugins;
25use common_config::Configurable;
26use common_grpc::channel_manager::ChannelConfig;
27use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
28use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
29use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
30use common_meta::heartbeat::handler::HandlerGroupExecutor;
31use common_telemetry::info;
32use common_telemetry::logging::TracingOptions;
33use common_time::timezone::set_default_timezone;
34use common_version::{short_version, version};
35use frontend::frontend::Frontend;
36use frontend::heartbeat::HeartbeatTask;
37use frontend::instance::builder::FrontendBuilder;
38use frontend::server::Services;
39use meta_client::{MetaClientOptions, MetaClientType};
40use servers::export_metrics::ExportMetricsTask;
41use servers::tls::{TlsMode, TlsOption};
42use snafu::{OptionExt, ResultExt};
43use tracing_appender::non_blocking::WorkerGuard;
44
45use crate::error::{self, Result};
46use crate::options::{GlobalOptions, GreptimeOptions};
47use crate::{log_versions, App};
48
49type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
50
51pub struct Instance {
52    frontend: Frontend,
53    // Keep the logging guard to prevent the worker from being dropped.
54    _guard: Vec<WorkerGuard>,
55}
56
57pub const APP_NAME: &str = "greptime-frontend";
58
59impl Instance {
60    pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
61        Self { frontend, _guard }
62    }
63
64    pub fn inner(&self) -> &Frontend {
65        &self.frontend
66    }
67
68    pub fn mut_inner(&mut self) -> &mut Frontend {
69        &mut self.frontend
70    }
71}
72
73#[async_trait]
74impl App for Instance {
75    fn name(&self) -> &str {
76        APP_NAME
77    }
78
79    async fn start(&mut self) -> Result<()> {
80        let plugins = self.frontend.instance.plugins().clone();
81        plugins::start_frontend_plugins(plugins)
82            .await
83            .context(error::StartFrontendSnafu)?;
84
85        self.frontend
86            .start()
87            .await
88            .context(error::StartFrontendSnafu)
89    }
90
91    async fn stop(&mut self) -> Result<()> {
92        self.frontend
93            .shutdown()
94            .await
95            .context(error::ShutdownFrontendSnafu)
96    }
97}
98
99#[derive(Parser)]
100pub struct Command {
101    #[clap(subcommand)]
102    subcmd: SubCommand,
103}
104
105impl Command {
106    pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
107        self.subcmd.build(opts).await
108    }
109
110    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
111        self.subcmd.load_options(global_options)
112    }
113}
114
115#[derive(Parser)]
116enum SubCommand {
117    Start(StartCommand),
118}
119
120impl SubCommand {
121    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
122        match self {
123            SubCommand::Start(cmd) => cmd.build(opts).await,
124        }
125    }
126
127    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
128        match self {
129            SubCommand::Start(cmd) => cmd.load_options(global_options),
130        }
131    }
132}
133
134#[derive(Debug, Default, Parser)]
135pub struct StartCommand {
136    /// The address to bind the gRPC server.
137    #[clap(long, alias = "rpc-addr")]
138    rpc_bind_addr: Option<String>,
139    /// The address advertised to the metasrv, and used for connections from outside the host.
140    /// If left empty or unset, the server will automatically use the IP address of the first network interface
141    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
142    #[clap(long, alias = "rpc-hostname")]
143    rpc_server_addr: Option<String>,
144    #[clap(long)]
145    http_addr: Option<String>,
146    #[clap(long)]
147    http_timeout: Option<u64>,
148    #[clap(long)]
149    mysql_addr: Option<String>,
150    #[clap(long)]
151    postgres_addr: Option<String>,
152    #[clap(short, long)]
153    config_file: Option<String>,
154    #[clap(short, long)]
155    influxdb_enable: Option<bool>,
156    #[clap(long, value_delimiter = ',', num_args = 1..)]
157    metasrv_addrs: Option<Vec<String>>,
158    #[clap(long)]
159    tls_mode: Option<TlsMode>,
160    #[clap(long)]
161    tls_cert_path: Option<String>,
162    #[clap(long)]
163    tls_key_path: Option<String>,
164    #[clap(long)]
165    user_provider: Option<String>,
166    #[clap(long)]
167    disable_dashboard: Option<bool>,
168    #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
169    env_prefix: String,
170}
171
172impl StartCommand {
173    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
174        let mut opts = FrontendOptions::load_layered_options(
175            self.config_file.as_deref(),
176            self.env_prefix.as_ref(),
177        )
178        .context(error::LoadLayeredConfigSnafu)?;
179
180        self.merge_with_cli_options(global_options, &mut opts)?;
181
182        Ok(opts)
183    }
184
185    // The precedence order is: cli > config file > environment variables > default values.
186    fn merge_with_cli_options(
187        &self,
188        global_options: &GlobalOptions,
189        opts: &mut FrontendOptions,
190    ) -> Result<()> {
191        let opts = &mut opts.component;
192
193        if let Some(dir) = &global_options.log_dir {
194            opts.logging.dir.clone_from(dir);
195        }
196
197        if global_options.log_level.is_some() {
198            opts.logging.level.clone_from(&global_options.log_level);
199        }
200
201        opts.tracing = TracingOptions {
202            #[cfg(feature = "tokio-console")]
203            tokio_console_addr: global_options.tokio_console_addr.clone(),
204        };
205
206        let tls_opts = TlsOption::new(
207            self.tls_mode.clone(),
208            self.tls_cert_path.clone(),
209            self.tls_key_path.clone(),
210        );
211
212        if let Some(addr) = &self.http_addr {
213            opts.http.addr.clone_from(addr);
214        }
215
216        if let Some(http_timeout) = self.http_timeout {
217            opts.http.timeout = Duration::from_secs(http_timeout)
218        }
219
220        if let Some(disable_dashboard) = self.disable_dashboard {
221            opts.http.disable_dashboard = disable_dashboard;
222        }
223
224        if let Some(addr) = &self.rpc_bind_addr {
225            opts.grpc.bind_addr.clone_from(addr);
226            opts.grpc.tls = tls_opts.clone();
227        }
228
229        if let Some(addr) = &self.rpc_server_addr {
230            opts.grpc.server_addr.clone_from(addr);
231        }
232
233        if let Some(addr) = &self.mysql_addr {
234            opts.mysql.enable = true;
235            opts.mysql.addr.clone_from(addr);
236            opts.mysql.tls = tls_opts.clone();
237        }
238
239        if let Some(addr) = &self.postgres_addr {
240            opts.postgres.enable = true;
241            opts.postgres.addr.clone_from(addr);
242            opts.postgres.tls = tls_opts;
243        }
244
245        if let Some(enable) = self.influxdb_enable {
246            opts.influxdb.enable = enable;
247        }
248
249        if let Some(metasrv_addrs) = &self.metasrv_addrs {
250            opts.meta_client
251                .get_or_insert_with(MetaClientOptions::default)
252                .metasrv_addrs
253                .clone_from(metasrv_addrs);
254        }
255
256        if let Some(user_provider) = &self.user_provider {
257            opts.user_provider = Some(user_provider.clone());
258        }
259
260        Ok(())
261    }
262
263    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
264        common_runtime::init_global_runtimes(&opts.runtime);
265
266        let guard = common_telemetry::init_global_logging(
267            APP_NAME,
268            &opts.component.logging,
269            &opts.component.tracing,
270            opts.component.node_id.clone(),
271            opts.component.slow_query.as_ref(),
272        );
273        log_versions(version(), short_version(), APP_NAME);
274
275        info!("Frontend start command: {:#?}", self);
276        info!("Frontend options: {:#?}", opts);
277
278        let plugin_opts = opts.plugins;
279        let mut opts = opts.component;
280        opts.grpc.detect_server_addr();
281        let mut plugins = Plugins::new();
282        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
283            .await
284            .context(error::StartFrontendSnafu)?;
285
286        set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
287
288        let meta_client_options = opts
289            .meta_client
290            .as_ref()
291            .context(error::MissingConfigSnafu {
292                msg: "'meta_client'",
293            })?;
294
295        let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
296        let cache_ttl = meta_client_options.metadata_cache_ttl;
297        let cache_tti = meta_client_options.metadata_cache_tti;
298
299        let meta_client = meta_client::create_meta_client(
300            MetaClientType::Frontend,
301            meta_client_options,
302            Some(&plugins),
303        )
304        .await
305        .context(error::MetaClientInitSnafu)?;
306
307        // TODO(discord9): add helper function to ease the creation of cache registry&such
308        let cached_meta_backend =
309            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
310                .cache_max_capacity(cache_max_capacity)
311                .cache_ttl(cache_ttl)
312                .cache_tti(cache_tti)
313                .build();
314        let cached_meta_backend = Arc::new(cached_meta_backend);
315
316        // Builds cache registry
317        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
318            CacheRegistryBuilder::default()
319                .add_cache(cached_meta_backend.clone())
320                .build(),
321        );
322        let fundamental_cache_registry =
323            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
324        let layered_cache_registry = Arc::new(
325            with_default_composite_cache_registry(
326                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
327            )
328            .context(error::BuildCacheRegistrySnafu)?
329            .build(),
330        );
331
332        let information_extension =
333            Arc::new(DistributedInformationExtension::new(meta_client.clone()));
334        let catalog_manager = KvBackendCatalogManager::new(
335            information_extension,
336            cached_meta_backend.clone(),
337            layered_cache_registry.clone(),
338            None,
339        );
340
341        let executor = HandlerGroupExecutor::new(vec![
342            Arc::new(ParseMailboxMessageHandler),
343            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
344        ]);
345
346        let heartbeat_task = HeartbeatTask::new(
347            &opts,
348            meta_client.clone(),
349            opts.heartbeat.clone(),
350            Arc::new(executor),
351        );
352        let heartbeat_task = Some(heartbeat_task);
353
354        // frontend to datanode need not timeout.
355        // Some queries are expected to take long time.
356        let channel_config = ChannelConfig {
357            timeout: None,
358            tcp_nodelay: opts.datanode.client.tcp_nodelay,
359            connect_timeout: Some(opts.datanode.client.connect_timeout),
360            ..Default::default()
361        };
362        let client = NodeClients::new(channel_config);
363
364        let instance = FrontendBuilder::new(
365            opts.clone(),
366            cached_meta_backend.clone(),
367            layered_cache_registry.clone(),
368            catalog_manager,
369            Arc::new(client),
370            meta_client,
371        )
372        .with_plugin(plugins.clone())
373        .with_local_cache_invalidator(layered_cache_registry)
374        .try_build()
375        .await
376        .context(error::StartFrontendSnafu)?;
377        let instance = Arc::new(instance);
378
379        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
380            .context(error::ServersSnafu)?;
381
382        let servers = Services::new(opts, instance.clone(), plugins)
383            .build()
384            .context(error::StartFrontendSnafu)?;
385
386        let frontend = Frontend {
387            instance,
388            servers,
389            heartbeat_task,
390            export_metrics_task,
391        };
392
393        Ok(Instance::new(frontend, guard))
394    }
395}
396
397#[cfg(test)]
398mod tests {
399    use std::io::Write;
400    use std::time::Duration;
401
402    use auth::{Identity, Password, UserProviderRef};
403    use common_base::readable_size::ReadableSize;
404    use common_config::ENV_VAR_SEP;
405    use common_test_util::temp_dir::create_named_temp_file;
406    use servers::grpc::GrpcOptions;
407    use servers::http::HttpOptions;
408
409    use super::*;
410    use crate::options::GlobalOptions;
411
412    #[test]
413    fn test_try_from_start_command() {
414        let command = StartCommand {
415            http_addr: Some("127.0.0.1:1234".to_string()),
416            mysql_addr: Some("127.0.0.1:5678".to_string()),
417            postgres_addr: Some("127.0.0.1:5432".to_string()),
418            influxdb_enable: Some(false),
419            disable_dashboard: Some(false),
420            ..Default::default()
421        };
422
423        let opts = command.load_options(&Default::default()).unwrap().component;
424
425        assert_eq!(opts.http.addr, "127.0.0.1:1234");
426        assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
427        assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
428        assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
429
430        let default_opts = FrontendOptions::default().component;
431
432        assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
433        assert!(opts.mysql.enable);
434        assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
435        assert!(opts.postgres.enable);
436        assert_eq!(
437            opts.postgres.runtime_size,
438            default_opts.postgres.runtime_size
439        );
440        assert!(opts.opentsdb.enable);
441
442        assert!(!opts.influxdb.enable);
443    }
444
445    #[test]
446    fn test_read_from_config_file() {
447        let mut file = create_named_temp_file();
448        let toml_str = r#"
449            [http]
450            addr = "127.0.0.1:4000"
451            timeout = "0s"
452            body_limit = "2GB"
453
454            [opentsdb]
455            enable = false
456
457            [logging]
458            level = "debug"
459            dir = "./greptimedb_data/test/logs"
460        "#;
461        write!(file, "{}", toml_str).unwrap();
462
463        let command = StartCommand {
464            config_file: Some(file.path().to_str().unwrap().to_string()),
465            disable_dashboard: Some(false),
466            ..Default::default()
467        };
468
469        let fe_opts = command.load_options(&Default::default()).unwrap().component;
470
471        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
472        assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
473
474        assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
475
476        assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
477        assert_eq!(
478            "./greptimedb_data/test/logs".to_string(),
479            fe_opts.logging.dir
480        );
481        assert!(!fe_opts.opentsdb.enable);
482    }
483
484    #[tokio::test]
485    async fn test_try_from_start_command_to_anymap() {
486        let fe_opts = frontend::frontend::FrontendOptions {
487            http: HttpOptions {
488                disable_dashboard: false,
489                ..Default::default()
490            },
491            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
492            ..Default::default()
493        };
494
495        let mut plugins = Plugins::new();
496        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
497            .await
498            .unwrap();
499
500        let provider = plugins.get::<UserProviderRef>().unwrap();
501        let result = provider
502            .authenticate(
503                Identity::UserId("test", None),
504                Password::PlainText("test".to_string().into()),
505            )
506            .await;
507        let _ = result.unwrap();
508    }
509
510    #[test]
511    fn test_load_log_options_from_cli() {
512        let cmd = StartCommand {
513            disable_dashboard: Some(false),
514            ..Default::default()
515        };
516
517        let options = cmd
518            .load_options(&GlobalOptions {
519                log_dir: Some("./greptimedb_data/test/logs".to_string()),
520                log_level: Some("debug".to_string()),
521
522                #[cfg(feature = "tokio-console")]
523                tokio_console_addr: None,
524            })
525            .unwrap()
526            .component;
527
528        let logging_opt = options.logging;
529        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
530        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
531    }
532
533    #[test]
534    fn test_config_precedence_order() {
535        let mut file = create_named_temp_file();
536        let toml_str = r#"
537            [http]
538            addr = "127.0.0.1:4000"
539
540            [meta_client]
541            timeout = "3s"
542            connect_timeout = "5s"
543            tcp_nodelay = true
544
545            [mysql]
546            addr = "127.0.0.1:4002"
547        "#;
548        write!(file, "{}", toml_str).unwrap();
549
550        let env_prefix = "FRONTEND_UT";
551        temp_env::with_vars(
552            [
553                (
554                    // mysql.addr = 127.0.0.1:14002
555                    [
556                        env_prefix.to_string(),
557                        "mysql".to_uppercase(),
558                        "addr".to_uppercase(),
559                    ]
560                    .join(ENV_VAR_SEP),
561                    Some("127.0.0.1:14002"),
562                ),
563                (
564                    // mysql.runtime_size = 11
565                    [
566                        env_prefix.to_string(),
567                        "mysql".to_uppercase(),
568                        "runtime_size".to_uppercase(),
569                    ]
570                    .join(ENV_VAR_SEP),
571                    Some("11"),
572                ),
573                (
574                    // http.addr = 127.0.0.1:24000
575                    [
576                        env_prefix.to_string(),
577                        "http".to_uppercase(),
578                        "addr".to_uppercase(),
579                    ]
580                    .join(ENV_VAR_SEP),
581                    Some("127.0.0.1:24000"),
582                ),
583                (
584                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
585                    [
586                        env_prefix.to_string(),
587                        "meta_client".to_uppercase(),
588                        "metasrv_addrs".to_uppercase(),
589                    ]
590                    .join(ENV_VAR_SEP),
591                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
592                ),
593            ],
594            || {
595                let command = StartCommand {
596                    config_file: Some(file.path().to_str().unwrap().to_string()),
597                    http_addr: Some("127.0.0.1:14000".to_string()),
598                    env_prefix: env_prefix.to_string(),
599                    ..Default::default()
600                };
601
602                let fe_opts = command.load_options(&Default::default()).unwrap().component;
603
604                // Should be read from env, env > default values.
605                assert_eq!(fe_opts.mysql.runtime_size, 11);
606                assert_eq!(
607                    fe_opts.meta_client.unwrap().metasrv_addrs,
608                    vec![
609                        "127.0.0.1:3001".to_string(),
610                        "127.0.0.1:3002".to_string(),
611                        "127.0.0.1:3003".to_string()
612                    ]
613                );
614
615                // Should be read from config file, config file > env > default values.
616                assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
617
618                // Should be read from cli, cli > config file > env > default values.
619                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
620
621                // Should be default value.
622                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
623            },
624        );
625    }
626}