cmd/
frontend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21use catalog::information_extension::DistributedInformationExtension;
22use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
23use catalog::process_manager::ProcessManager;
24use clap::Parser;
25use client::client_manager::NodeClients;
26use common_base::Plugins;
27use common_config::{Configurable, DEFAULT_DATA_HOME};
28use common_grpc::channel_manager::ChannelConfig;
29use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
30use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
31use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
32use common_meta::heartbeat::handler::HandlerGroupExecutor;
33use common_telemetry::info;
34use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
35use common_time::timezone::set_default_timezone;
36use common_version::{short_version, version};
37use frontend::frontend::Frontend;
38use frontend::heartbeat::HeartbeatTask;
39use frontend::instance::builder::FrontendBuilder;
40use frontend::server::Services;
41use meta_client::{MetaClientOptions, MetaClientType};
42use servers::addrs;
43use servers::export_metrics::ExportMetricsTask;
44use servers::tls::{TlsMode, TlsOption};
45use snafu::{OptionExt, ResultExt};
46use tracing_appender::non_blocking::WorkerGuard;
47
48use crate::error::{self, Result};
49use crate::options::{GlobalOptions, GreptimeOptions};
50use crate::{create_resource_limit_metrics, log_versions, App};
51
/// Layered options for the frontend command: the frontend-specific settings are carried in the
/// `component` field of the shared [`GreptimeOptions`] wrapper, next to the `runtime` and
/// `plugins` settings that this file also reads from it.
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
53
/// A runnable frontend application: the assembled [`Frontend`] plus the logging worker guards
/// that are stored alongside it.
pub struct Instance {
    /// The frontend that [`App::start`] and [`App::stop`] drive.
    frontend: Frontend,
    // Keep the logging guard to prevent the worker from being dropped.
    _guard: Vec<WorkerGuard>,
}
59
/// Application name: returned by `App::name` and passed to the logging, version-logging and
/// resource-limit-metrics setup when the frontend is built.
pub const APP_NAME: &str = "greptime-frontend";
61
62impl Instance {
63    pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
64        Self { frontend, _guard }
65    }
66
67    pub fn inner(&self) -> &Frontend {
68        &self.frontend
69    }
70
71    pub fn mut_inner(&mut self) -> &mut Frontend {
72        &mut self.frontend
73    }
74}
75
76#[async_trait]
77impl App for Instance {
78    fn name(&self) -> &str {
79        APP_NAME
80    }
81
82    async fn start(&mut self) -> Result<()> {
83        let plugins = self.frontend.instance.plugins().clone();
84        plugins::start_frontend_plugins(plugins)
85            .await
86            .context(error::StartFrontendSnafu)?;
87
88        self.frontend
89            .start()
90            .await
91            .context(error::StartFrontendSnafu)
92    }
93
94    async fn stop(&mut self) -> Result<()> {
95        self.frontend
96            .shutdown()
97            .await
98            .context(error::ShutdownFrontendSnafu)
99    }
100}
101
// Top-level frontend CLI command; every operation is delegated to the selected subcommand.
// NOTE(review): deliberately a plain `//` comment — clap's derive uses `///` doc comments as
// help text, so converting this would change the CLI output.
#[derive(Parser)]
pub struct Command {
    // The subcommand chosen on the command line.
    #[clap(subcommand)]
    subcmd: SubCommand,
}
107
108impl Command {
109    pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
110        self.subcmd.build(opts).await
111    }
112
113    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
114        self.subcmd.load_options(global_options)
115    }
116}
117
// Subcommands of the frontend CLI. `Start` is the only one.
// NOTE(review): plain `//` comments on purpose — clap's derive uses `///` doc comments as
// help text, so converting these would change the CLI output.
#[derive(Parser)]
enum SubCommand {
    // Load options and build a frontend instance; see `StartCommand`.
    Start(StartCommand),
}
122
123impl SubCommand {
124    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
125        match self {
126            SubCommand::Start(cmd) => cmd.build(opts).await,
127        }
128    }
129
130    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
131        match self {
132            SubCommand::Start(cmd) => cmd.load_options(global_options),
133        }
134    }
135}
136
// Flags of the `start` subcommand. Each `Option` flag overrides the corresponding loaded
// option only when it is given (see `merge_with_cli_options`).
// NOTE(review): the comments added below are plain `//` comments on purpose — clap's derive
// uses `///` doc comments as help text, so `///` here would change the `--help` output.
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    /// The address to bind the gRPC server.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    /// The address advertised to the metasrv, and used for connections from outside the host.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // HTTP timeout in whole seconds; overrides `http.timeout`.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Overrides `mysql.addr` and also sets `mysql.enable = true`.
    #[clap(long)]
    mysql_addr: Option<String>,
    // Overrides `postgres.addr` and also sets `postgres.enable = true`.
    #[clap(long)]
    postgres_addr: Option<String>,
    // Config file handed to the layered option loader.
    #[clap(short, long)]
    config_file: Option<String>,
    // Overrides `influxdb.enable`.
    #[clap(short, long)]
    influxdb_enable: Option<bool>,
    // Comma-separated metasrv addresses; replaces `meta_client.metasrv_addrs`, creating default
    // meta client options first when none were loaded.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // The three TLS flags are combined into one `TlsOption`, which is only assigned to a server
    // (gRPC, MySQL, PostgreSQL) whose address flag is also given on the command line.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    // Overrides `user_provider`.
    #[clap(long)]
    user_provider: Option<String>,
    // Overrides `http.disable_dashboard`.
    #[clap(long)]
    disable_dashboard: Option<bool>,
    // Environment-variable prefix handed to the layered option loader.
    #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
    env_prefix: String,
}
174
175impl StartCommand {
176    fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
177        let mut opts = FrontendOptions::load_layered_options(
178            self.config_file.as_deref(),
179            self.env_prefix.as_ref(),
180        )
181        .context(error::LoadLayeredConfigSnafu)?;
182
183        self.merge_with_cli_options(global_options, &mut opts)?;
184
185        Ok(opts)
186    }
187
188    // The precedence order is: cli > config file > environment variables > default values.
189    fn merge_with_cli_options(
190        &self,
191        global_options: &GlobalOptions,
192        opts: &mut FrontendOptions,
193    ) -> Result<()> {
194        let opts = &mut opts.component;
195
196        if let Some(dir) = &global_options.log_dir {
197            opts.logging.dir.clone_from(dir);
198        }
199
200        // If the logging dir is not set, use the default logs dir in the data home.
201        if opts.logging.dir.is_empty() {
202            opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
203                .join(DEFAULT_LOGGING_DIR)
204                .to_string_lossy()
205                .to_string();
206        }
207
208        if global_options.log_level.is_some() {
209            opts.logging.level.clone_from(&global_options.log_level);
210        }
211
212        opts.tracing = TracingOptions {
213            #[cfg(feature = "tokio-console")]
214            tokio_console_addr: global_options.tokio_console_addr.clone(),
215        };
216
217        let tls_opts = TlsOption::new(
218            self.tls_mode.clone(),
219            self.tls_cert_path.clone(),
220            self.tls_key_path.clone(),
221        );
222
223        if let Some(addr) = &self.http_addr {
224            opts.http.addr.clone_from(addr);
225        }
226
227        if let Some(http_timeout) = self.http_timeout {
228            opts.http.timeout = Duration::from_secs(http_timeout)
229        }
230
231        if let Some(disable_dashboard) = self.disable_dashboard {
232            opts.http.disable_dashboard = disable_dashboard;
233        }
234
235        if let Some(addr) = &self.rpc_bind_addr {
236            opts.grpc.bind_addr.clone_from(addr);
237            opts.grpc.tls = tls_opts.clone();
238        }
239
240        if let Some(addr) = &self.rpc_server_addr {
241            opts.grpc.server_addr.clone_from(addr);
242        }
243
244        if let Some(addr) = &self.mysql_addr {
245            opts.mysql.enable = true;
246            opts.mysql.addr.clone_from(addr);
247            opts.mysql.tls = tls_opts.clone();
248        }
249
250        if let Some(addr) = &self.postgres_addr {
251            opts.postgres.enable = true;
252            opts.postgres.addr.clone_from(addr);
253            opts.postgres.tls = tls_opts;
254        }
255
256        if let Some(enable) = self.influxdb_enable {
257            opts.influxdb.enable = enable;
258        }
259
260        if let Some(metasrv_addrs) = &self.metasrv_addrs {
261            opts.meta_client
262                .get_or_insert_with(MetaClientOptions::default)
263                .metasrv_addrs
264                .clone_from(metasrv_addrs);
265        }
266
267        if let Some(user_provider) = &self.user_provider {
268            opts.user_provider = Some(user_provider.clone());
269        }
270
271        Ok(())
272    }
273
274    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
275        common_runtime::init_global_runtimes(&opts.runtime);
276
277        let guard = common_telemetry::init_global_logging(
278            APP_NAME,
279            &opts.component.logging,
280            &opts.component.tracing,
281            opts.component.node_id.clone(),
282            opts.component.slow_query.as_ref(),
283        );
284
285        log_versions(version(), short_version(), APP_NAME);
286        create_resource_limit_metrics(APP_NAME);
287
288        info!("Frontend start command: {:#?}", self);
289        info!("Frontend options: {:#?}", opts);
290
291        let plugin_opts = opts.plugins;
292        let mut opts = opts.component;
293        opts.grpc.detect_server_addr();
294        let mut plugins = Plugins::new();
295        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
296            .await
297            .context(error::StartFrontendSnafu)?;
298
299        set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
300
301        let meta_client_options = opts
302            .meta_client
303            .as_ref()
304            .context(error::MissingConfigSnafu {
305                msg: "'meta_client'",
306            })?;
307
308        let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
309        let cache_ttl = meta_client_options.metadata_cache_ttl;
310        let cache_tti = meta_client_options.metadata_cache_tti;
311
312        let meta_client = meta_client::create_meta_client(
313            MetaClientType::Frontend,
314            meta_client_options,
315            Some(&plugins),
316            None,
317        )
318        .await
319        .context(error::MetaClientInitSnafu)?;
320
321        // TODO(discord9): add helper function to ease the creation of cache registry&such
322        let cached_meta_backend =
323            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
324                .cache_max_capacity(cache_max_capacity)
325                .cache_ttl(cache_ttl)
326                .cache_tti(cache_tti)
327                .build();
328        let cached_meta_backend = Arc::new(cached_meta_backend);
329
330        // Builds cache registry
331        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
332            CacheRegistryBuilder::default()
333                .add_cache(cached_meta_backend.clone())
334                .build(),
335        );
336        let fundamental_cache_registry =
337            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
338        let layered_cache_registry = Arc::new(
339            with_default_composite_cache_registry(
340                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
341            )
342            .context(error::BuildCacheRegistrySnafu)?
343            .build(),
344        );
345
346        let information_extension =
347            Arc::new(DistributedInformationExtension::new(meta_client.clone()));
348
349        let process_manager = Arc::new(ProcessManager::new(
350            addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
351            Some(meta_client.clone()),
352        ));
353        let catalog_manager = KvBackendCatalogManager::new(
354            information_extension,
355            cached_meta_backend.clone(),
356            layered_cache_registry.clone(),
357            None,
358            Some(process_manager.clone()),
359        );
360
361        let executor = HandlerGroupExecutor::new(vec![
362            Arc::new(ParseMailboxMessageHandler),
363            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
364        ]);
365
366        let heartbeat_task = HeartbeatTask::new(
367            &opts,
368            meta_client.clone(),
369            opts.heartbeat.clone(),
370            Arc::new(executor),
371        );
372        let heartbeat_task = Some(heartbeat_task);
373
374        // frontend to datanode need not timeout.
375        // Some queries are expected to take long time.
376        let mut channel_config = ChannelConfig {
377            timeout: None,
378            tcp_nodelay: opts.datanode.client.tcp_nodelay,
379            connect_timeout: Some(opts.datanode.client.connect_timeout),
380            ..Default::default()
381        };
382        if opts.grpc.flight_compression.transport_compression() {
383            channel_config.accept_compression = true;
384            channel_config.send_compression = true;
385        }
386        let client = NodeClients::new(channel_config);
387
388        let instance = FrontendBuilder::new(
389            opts.clone(),
390            cached_meta_backend.clone(),
391            layered_cache_registry.clone(),
392            catalog_manager,
393            Arc::new(client),
394            meta_client,
395            process_manager,
396        )
397        .with_plugin(plugins.clone())
398        .with_local_cache_invalidator(layered_cache_registry)
399        .try_build()
400        .await
401        .context(error::StartFrontendSnafu)?;
402        let instance = Arc::new(instance);
403
404        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
405            .context(error::ServersSnafu)?;
406
407        let servers = Services::new(opts, instance.clone(), plugins)
408            .build()
409            .context(error::StartFrontendSnafu)?;
410
411        let frontend = Frontend {
412            instance,
413            servers,
414            heartbeat_task,
415            export_metrics_task,
416        };
417
418        Ok(Instance::new(frontend, guard))
419    }
420}
421
422#[cfg(test)]
423mod tests {
424    use std::io::Write;
425    use std::time::Duration;
426
427    use auth::{Identity, Password, UserProviderRef};
428    use common_base::readable_size::ReadableSize;
429    use common_config::ENV_VAR_SEP;
430    use common_test_util::temp_dir::create_named_temp_file;
431    use servers::grpc::GrpcOptions;
432    use servers::http::HttpOptions;
433
434    use super::*;
435    use crate::options::GlobalOptions;
436
437    #[test]
438    fn test_try_from_start_command() {
439        let command = StartCommand {
440            http_addr: Some("127.0.0.1:1234".to_string()),
441            mysql_addr: Some("127.0.0.1:5678".to_string()),
442            postgres_addr: Some("127.0.0.1:5432".to_string()),
443            influxdb_enable: Some(false),
444            disable_dashboard: Some(false),
445            ..Default::default()
446        };
447
448        let opts = command.load_options(&Default::default()).unwrap().component;
449
450        assert_eq!(opts.http.addr, "127.0.0.1:1234");
451        assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
452        assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
453        assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
454
455        let default_opts = FrontendOptions::default().component;
456
457        assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
458        assert!(opts.mysql.enable);
459        assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
460        assert!(opts.postgres.enable);
461        assert_eq!(
462            opts.postgres.runtime_size,
463            default_opts.postgres.runtime_size
464        );
465        assert!(opts.opentsdb.enable);
466
467        assert!(!opts.influxdb.enable);
468    }
469
470    #[test]
471    fn test_read_from_config_file() {
472        let mut file = create_named_temp_file();
473        let toml_str = r#"
474            [http]
475            addr = "127.0.0.1:4000"
476            timeout = "0s"
477            body_limit = "2GB"
478
479            [opentsdb]
480            enable = false
481
482            [logging]
483            level = "debug"
484            dir = "./greptimedb_data/test/logs"
485        "#;
486        write!(file, "{}", toml_str).unwrap();
487
488        let command = StartCommand {
489            config_file: Some(file.path().to_str().unwrap().to_string()),
490            disable_dashboard: Some(false),
491            ..Default::default()
492        };
493
494        let fe_opts = command.load_options(&Default::default()).unwrap().component;
495
496        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
497        assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
498
499        assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
500
501        assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
502        assert_eq!(
503            "./greptimedb_data/test/logs".to_string(),
504            fe_opts.logging.dir
505        );
506        assert!(!fe_opts.opentsdb.enable);
507    }
508
509    #[tokio::test]
510    async fn test_try_from_start_command_to_anymap() {
511        let fe_opts = frontend::frontend::FrontendOptions {
512            http: HttpOptions {
513                disable_dashboard: false,
514                ..Default::default()
515            },
516            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
517            ..Default::default()
518        };
519
520        let mut plugins = Plugins::new();
521        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
522            .await
523            .unwrap();
524
525        let provider = plugins.get::<UserProviderRef>().unwrap();
526        let result = provider
527            .authenticate(
528                Identity::UserId("test", None),
529                Password::PlainText("test".to_string().into()),
530            )
531            .await;
532        let _ = result.unwrap();
533    }
534
535    #[test]
536    fn test_load_log_options_from_cli() {
537        let cmd = StartCommand {
538            disable_dashboard: Some(false),
539            ..Default::default()
540        };
541
542        let options = cmd
543            .load_options(&GlobalOptions {
544                log_dir: Some("./greptimedb_data/test/logs".to_string()),
545                log_level: Some("debug".to_string()),
546
547                #[cfg(feature = "tokio-console")]
548                tokio_console_addr: None,
549            })
550            .unwrap()
551            .component;
552
553        let logging_opt = options.logging;
554        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
555        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
556    }
557
558    #[test]
559    fn test_config_precedence_order() {
560        let mut file = create_named_temp_file();
561        let toml_str = r#"
562            [http]
563            addr = "127.0.0.1:4000"
564
565            [meta_client]
566            timeout = "3s"
567            connect_timeout = "5s"
568            tcp_nodelay = true
569
570            [mysql]
571            addr = "127.0.0.1:4002"
572        "#;
573        write!(file, "{}", toml_str).unwrap();
574
575        let env_prefix = "FRONTEND_UT";
576        temp_env::with_vars(
577            [
578                (
579                    // mysql.addr = 127.0.0.1:14002
580                    [
581                        env_prefix.to_string(),
582                        "mysql".to_uppercase(),
583                        "addr".to_uppercase(),
584                    ]
585                    .join(ENV_VAR_SEP),
586                    Some("127.0.0.1:14002"),
587                ),
588                (
589                    // mysql.runtime_size = 11
590                    [
591                        env_prefix.to_string(),
592                        "mysql".to_uppercase(),
593                        "runtime_size".to_uppercase(),
594                    ]
595                    .join(ENV_VAR_SEP),
596                    Some("11"),
597                ),
598                (
599                    // http.addr = 127.0.0.1:24000
600                    [
601                        env_prefix.to_string(),
602                        "http".to_uppercase(),
603                        "addr".to_uppercase(),
604                    ]
605                    .join(ENV_VAR_SEP),
606                    Some("127.0.0.1:24000"),
607                ),
608                (
609                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
610                    [
611                        env_prefix.to_string(),
612                        "meta_client".to_uppercase(),
613                        "metasrv_addrs".to_uppercase(),
614                    ]
615                    .join(ENV_VAR_SEP),
616                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
617                ),
618            ],
619            || {
620                let command = StartCommand {
621                    config_file: Some(file.path().to_str().unwrap().to_string()),
622                    http_addr: Some("127.0.0.1:14000".to_string()),
623                    env_prefix: env_prefix.to_string(),
624                    ..Default::default()
625                };
626
627                let fe_opts = command.load_options(&Default::default()).unwrap().component;
628
629                // Should be read from env, env > default values.
630                assert_eq!(fe_opts.mysql.runtime_size, 11);
631                assert_eq!(
632                    fe_opts.meta_client.unwrap().metasrv_addrs,
633                    vec![
634                        "127.0.0.1:3001".to_string(),
635                        "127.0.0.1:3002".to_string(),
636                        "127.0.0.1:3003".to_string()
637                    ]
638                );
639
640                // Should be read from config file, config file > env > default values.
641                assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
642
643                // Should be read from cli, cli > config file > env > default values.
644                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
645
646                // Should be default value.
647                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
648            },
649        );
650    }
651}