1use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::information_extension::DistributedInformationExtension;
23use catalog::information_schema::InformationExtensionRef;
24use catalog::kvbackend::{
25 CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
26 MetaKvBackend,
27};
28use catalog::process_manager::ProcessManager;
29use clap::Parser;
30use client::client_manager::NodeClients;
31use common_base::Plugins;
32use common_config::{Configurable, DEFAULT_DATA_HOME};
33use common_error::ext::BoxedError;
34use common_grpc::channel_manager::ChannelConfig;
35use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
36use common_meta::heartbeat::handler::HandlerGroupExecutor;
37use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
38use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
39use common_meta::heartbeat::handler::suspend::SuspendHandler;
40use common_query::prelude::set_default_prefix;
41use common_stat::ResourceStatImpl;
42use common_telemetry::info;
43use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
44use common_time::timezone::set_default_timezone;
45use common_version::{short_version, verbose_version};
46use frontend::frontend::Frontend;
47use frontend::heartbeat::HeartbeatTask;
48use frontend::instance::builder::FrontendBuilder;
49use frontend::server::Services;
50use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
51use plugins::PluginOptions;
52use plugins::frontend::context::{
53 CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
54};
55use plugins::frontend::setup_frontend_dynamic_plugins;
56use plugins::options::PluginOptionsDeserializerImpl;
57use servers::addrs;
58use servers::grpc::GrpcOptions;
59use servers::tls::{TlsMode, TlsOption, merge_tls_option};
60use snafu::{OptionExt, ResultExt};
61use tracing_appender::non_blocking::WorkerGuard;
62
63use crate::error::{self, OtherSnafu, Result};
64use crate::options::{GlobalOptions, GreptimeOptions};
65use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
66
67type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
68
69pub struct Instance {
70 frontend: Frontend,
71 _guard: Vec<WorkerGuard>,
73}
74
75pub const APP_NAME: &str = "greptime-frontend";
76
77impl Instance {
78 pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
79 Self { frontend, _guard }
80 }
81
82 pub fn inner(&self) -> &Frontend {
83 &self.frontend
84 }
85
86 pub fn mut_inner(&mut self) -> &mut Frontend {
87 &mut self.frontend
88 }
89}
90
91#[async_trait]
92impl App for Instance {
93 fn name(&self) -> &str {
94 APP_NAME
95 }
96
97 async fn start(&mut self) -> Result<()> {
98 let plugins = self.frontend.instance.plugins().clone();
99 plugins::start_frontend_plugins(plugins)
100 .await
101 .context(error::StartFrontendSnafu)?;
102
103 self.frontend
104 .start()
105 .await
106 .context(error::StartFrontendSnafu)
107 }
108
109 async fn stop(&mut self) -> Result<()> {
110 self.frontend
111 .shutdown()
112 .await
113 .context(error::ShutdownFrontendSnafu)
114 }
115}
116
117#[derive(Parser)]
118pub struct Command {
119 #[clap(subcommand)]
120 pub subcmd: SubCommand,
121}
122
123impl Command {
124 pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
125 self.subcmd.build(opts).await
126 }
127
128 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
129 self.subcmd.load_options(global_options)
130 }
131}
132
133#[derive(Parser)]
134pub enum SubCommand {
135 Start(StartCommand),
136}
137
138impl SubCommand {
139 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
140 match self {
141 SubCommand::Start(cmd) => cmd.build(opts).await,
142 }
143 }
144
145 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
146 match self {
147 SubCommand::Start(cmd) => cmd.load_options(global_options),
148 }
149 }
150}
151
152#[derive(Debug, Default, Parser)]
153pub struct StartCommand {
154 #[clap(long, alias = "rpc-addr")]
156 rpc_bind_addr: Option<String>,
157 #[clap(long, alias = "rpc-hostname")]
161 rpc_server_addr: Option<String>,
162 #[clap(long, alias = "internal-rpc-addr")]
164 internal_rpc_bind_addr: Option<String>,
165 #[clap(long, alias = "internal-rpc-hostname")]
169 internal_rpc_server_addr: Option<String>,
170 #[clap(long)]
171 http_addr: Option<String>,
172 #[clap(long)]
173 http_timeout: Option<u64>,
174 #[clap(long)]
175 mysql_addr: Option<String>,
176 #[clap(long)]
177 postgres_addr: Option<String>,
178 #[clap(short, long)]
179 pub config_file: Option<String>,
180 #[clap(short, long)]
181 influxdb_enable: Option<bool>,
182 #[clap(long, value_delimiter = ',', num_args = 1..)]
183 metasrv_addrs: Option<Vec<String>>,
184 #[clap(long)]
185 tls_mode: Option<TlsMode>,
186 #[clap(long)]
187 tls_cert_path: Option<String>,
188 #[clap(long)]
189 tls_key_path: Option<String>,
190 #[clap(long)]
191 tls_watch: bool,
192 #[clap(long)]
193 user_provider: Option<String>,
194 #[clap(long)]
195 disable_dashboard: Option<bool>,
196 #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
197 pub env_prefix: String,
198}
199
200impl StartCommand {
201 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
202 let mut opts = FrontendOptions::load_layered_options(
203 self.config_file.as_deref(),
204 self.env_prefix.as_ref(),
205 )
206 .context(error::LoadLayeredConfigSnafu)?;
207
208 self.merge_with_cli_options(global_options, &mut opts)?;
209
210 Ok(opts)
211 }
212
213 fn merge_with_cli_options(
215 &self,
216 global_options: &GlobalOptions,
217 opts: &mut FrontendOptions,
218 ) -> Result<()> {
219 let opts = &mut opts.component;
220
221 if let Some(dir) = &global_options.log_dir {
222 opts.logging.dir.clone_from(dir);
223 }
224
225 if opts.logging.dir.is_empty() {
227 opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
228 .join(DEFAULT_LOGGING_DIR)
229 .to_string_lossy()
230 .to_string();
231 }
232
233 if global_options.log_level.is_some() {
234 opts.logging.level.clone_from(&global_options.log_level);
235 }
236
237 opts.tracing = TracingOptions {
238 #[cfg(feature = "tokio-console")]
239 tokio_console_addr: global_options.tokio_console_addr.clone(),
240 };
241
242 let tls_opts = TlsOption::new(
243 self.tls_mode,
244 self.tls_cert_path.clone(),
245 self.tls_key_path.clone(),
246 self.tls_watch,
247 );
248
249 if let Some(addr) = &self.http_addr {
250 opts.http.addr.clone_from(addr);
251 }
252
253 if let Some(http_timeout) = self.http_timeout {
254 opts.http.timeout = Duration::from_secs(http_timeout)
255 }
256
257 if let Some(disable_dashboard) = self.disable_dashboard {
258 opts.http.disable_dashboard = disable_dashboard;
259 }
260
261 if let Some(addr) = &self.rpc_bind_addr {
262 opts.grpc.bind_addr.clone_from(addr);
263 opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
264 }
265
266 if let Some(addr) = &self.rpc_server_addr {
267 opts.grpc.server_addr.clone_from(addr);
268 }
269
270 if let Some(addr) = &self.internal_rpc_bind_addr {
271 if let Some(internal_grpc) = &mut opts.internal_grpc {
272 internal_grpc.bind_addr = addr.clone();
273 } else {
274 let grpc_options = GrpcOptions {
275 bind_addr: addr.clone(),
276 ..Default::default()
277 };
278
279 opts.internal_grpc = Some(grpc_options);
280 }
281 }
282
283 if let Some(addr) = &self.internal_rpc_server_addr {
284 if let Some(internal_grpc) = &mut opts.internal_grpc {
285 internal_grpc.server_addr = addr.clone();
286 } else {
287 let grpc_options = GrpcOptions {
288 server_addr: addr.clone(),
289 ..Default::default()
290 };
291 opts.internal_grpc = Some(grpc_options);
292 }
293 }
294
295 if let Some(addr) = &self.mysql_addr {
296 opts.mysql.enable = true;
297 opts.mysql.addr.clone_from(addr);
298 opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
299 }
300
301 if let Some(addr) = &self.postgres_addr {
302 opts.postgres.enable = true;
303 opts.postgres.addr.clone_from(addr);
304 opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
305 }
306
307 if let Some(enable) = self.influxdb_enable {
308 opts.influxdb.enable = enable;
309 }
310
311 if let Some(metasrv_addrs) = &self.metasrv_addrs {
312 opts.meta_client
313 .get_or_insert_with(MetaClientOptions::default)
314 .metasrv_addrs
315 .clone_from(metasrv_addrs);
316 }
317
318 if let Some(user_provider) = &self.user_provider {
319 opts.user_provider = Some(user_provider.clone());
320 }
321
322 Ok(())
323 }
324
325 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
326 common_runtime::init_global_runtimes(&opts.runtime);
327
328 let guard = common_telemetry::init_global_logging(
329 APP_NAME,
330 &opts.component.logging,
331 &opts.component.tracing,
332 opts.component.node_id.clone(),
333 Some(&opts.component.slow_query),
334 );
335
336 log_versions(verbose_version(), short_version(), APP_NAME);
337 maybe_activate_heap_profile(&opts.component.memory);
338 create_resource_limit_metrics(APP_NAME);
339
340 info!("Frontend start command: {:#?}", self);
341 info!("Frontend options: {:#?}", opts);
342
343 let plugin_opts = opts.plugins;
344 let mut opts = opts.component;
345 opts.grpc.detect_server_addr();
346 let mut plugins = Plugins::new();
347 plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
348 .await
349 .context(error::StartFrontendSnafu)?;
350
351 set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
352 set_default_prefix(opts.default_column_prefix.as_deref())
353 .map_err(BoxedError::new)
354 .context(error::BuildCliSnafu)?;
355
356 let meta_client_options = opts
357 .meta_client
358 .as_ref()
359 .context(error::MissingConfigSnafu {
360 msg: "'meta_client'",
361 })?;
362
363 let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
364 let cache_ttl = meta_client_options.metadata_cache_ttl;
365 let cache_tti = meta_client_options.metadata_cache_tti;
366
367 let meta_client = meta_client::create_meta_client(
368 MetaClientType::Frontend,
369 meta_client_options,
370 Some(&plugins),
371 None,
372 )
373 .await
374 .context(error::MetaClientInitSnafu)?;
375
376 let meta_config: Vec<PluginOptions> = meta_client
377 .pull_config(PluginOptionsDeserializerImpl)
378 .await
379 .context(error::MetaClientInitSnafu)?;
380 setup_frontend_dynamic_plugins(meta_config, &mut plugins)
381 .await
382 .context(error::StartFrontendSnafu)?;
383
384 let cached_meta_backend =
386 CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
387 .cache_max_capacity(cache_max_capacity)
388 .cache_ttl(cache_ttl)
389 .cache_tti(cache_tti)
390 .build();
391 let cached_meta_backend = Arc::new(cached_meta_backend);
392
393 let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
395 CacheRegistryBuilder::default()
396 .add_cache(cached_meta_backend.clone())
397 .build(),
398 );
399 let fundamental_cache_registry =
400 build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
401 let layered_cache_registry = Arc::new(
402 with_default_composite_cache_registry(
403 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
404 )
405 .context(error::BuildCacheRegistrySnafu)?
406 .build(),
407 );
408
409 let mut channel_config = ChannelConfig {
412 timeout: None,
413 tcp_nodelay: opts.datanode.client.tcp_nodelay,
414 connect_timeout: Some(opts.datanode.client.connect_timeout),
415 ..Default::default()
416 };
417 if opts.grpc.flight_compression.transport_compression() {
418 channel_config.accept_compression = true;
419 channel_config.send_compression = true;
420 }
421 let client = Arc::new(NodeClients::new(channel_config));
422
423 let information_extension = Arc::new(DistributedInformationExtension::new(
424 meta_client.clone(),
425 client.clone(),
426 ));
427 plugins.insert::<InformationExtensionRef>(information_extension.clone());
428
429 let process_manager = Arc::new(ProcessManager::new(
430 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
431 Some(meta_client.clone()),
432 ));
433
434 let builder = KvBackendCatalogManagerBuilder::new(
435 information_extension,
436 cached_meta_backend.clone(),
437 layered_cache_registry.clone(),
438 )
439 .with_process_manager(process_manager.clone());
440 let builder = if let Some(configurator) =
441 plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
442 {
443 let ctx = DistributedCatalogManagerConfigureContext {
444 meta_client: meta_client.clone(),
445 };
446 let ctx = CatalogManagerConfigureContext::Distributed(ctx);
447
448 configurator
449 .configure(builder, ctx)
450 .await
451 .context(OtherSnafu)?
452 } else {
453 builder
454 };
455 let catalog_manager = builder.build();
456
457 let instance = FrontendBuilder::new(
458 opts.clone(),
459 cached_meta_backend.clone(),
460 layered_cache_registry.clone(),
461 catalog_manager,
462 client,
463 meta_client.clone(),
464 process_manager,
465 )
466 .with_plugin(plugins.clone())
467 .with_local_cache_invalidator(layered_cache_registry)
468 .try_build()
469 .await
470 .context(error::StartFrontendSnafu)?;
471
472 let heartbeat_task = Some(create_heartbeat_task(&opts, meta_client, &instance));
473
474 let instance = Arc::new(instance);
475
476 let servers = Services::new(opts, instance.clone(), plugins)
477 .build()
478 .context(error::StartFrontendSnafu)?;
479
480 let frontend = Frontend {
481 instance,
482 servers,
483 heartbeat_task,
484 };
485
486 Ok(Instance::new(frontend, guard))
487 }
488}
489
490pub fn create_heartbeat_task(
491 options: &frontend::frontend::FrontendOptions,
492 meta_client: MetaClientRef,
493 instance: &frontend::instance::Instance,
494) -> HeartbeatTask {
495 let executor = Arc::new(HandlerGroupExecutor::new(vec![
496 Arc::new(ParseMailboxMessageHandler),
497 Arc::new(SuspendHandler::new(instance.suspend_state())),
498 Arc::new(InvalidateCacheHandler::new(
499 instance.cache_invalidator().clone(),
500 )),
501 ]));
502
503 let stat = {
504 let mut stat = ResourceStatImpl::default();
505 stat.start_collect_cpu_usage();
506 Arc::new(stat)
507 };
508
509 HeartbeatTask::new(options, meta_client, executor, stat)
510}
511
512#[cfg(test)]
513mod tests {
514 use std::io::Write;
515 use std::time::Duration;
516
517 use auth::{Identity, Password, UserProviderRef};
518 use common_base::readable_size::ReadableSize;
519 use common_config::ENV_VAR_SEP;
520 use common_test_util::temp_dir::create_named_temp_file;
521 use servers::grpc::GrpcOptions;
522 use servers::http::HttpOptions;
523
524 use super::*;
525 use crate::options::GlobalOptions;
526
527 #[test]
528 fn test_try_from_start_command() {
529 let command = StartCommand {
530 http_addr: Some("127.0.0.1:1234".to_string()),
531 mysql_addr: Some("127.0.0.1:5678".to_string()),
532 postgres_addr: Some("127.0.0.1:5432".to_string()),
533 internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
534 internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
535 influxdb_enable: Some(false),
536 disable_dashboard: Some(false),
537 ..Default::default()
538 };
539
540 let opts = command.load_options(&Default::default()).unwrap().component;
541
542 assert_eq!(opts.http.addr, "127.0.0.1:1234");
543 assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
544 assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
545 assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
546
547 let internal_grpc = opts.internal_grpc.as_ref().unwrap();
548 assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
549 assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");
550
551 let default_opts = FrontendOptions::default().component;
552
553 assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
554 assert!(opts.mysql.enable);
555 assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
556 assert!(opts.postgres.enable);
557 assert_eq!(
558 opts.postgres.runtime_size,
559 default_opts.postgres.runtime_size
560 );
561 assert!(opts.opentsdb.enable);
562
563 assert!(!opts.influxdb.enable);
564 }
565
566 #[test]
567 fn test_read_from_config_file() {
568 let mut file = create_named_temp_file();
569 let toml_str = r#"
570 [http]
571 addr = "127.0.0.1:4000"
572 timeout = "0s"
573 body_limit = "2GB"
574
575 [opentsdb]
576 enable = false
577
578 [logging]
579 level = "debug"
580 dir = "./greptimedb_data/test/logs"
581 "#;
582 write!(file, "{}", toml_str).unwrap();
583
584 let command = StartCommand {
585 config_file: Some(file.path().to_str().unwrap().to_string()),
586 disable_dashboard: Some(false),
587 ..Default::default()
588 };
589
590 let fe_opts = command.load_options(&Default::default()).unwrap().component;
591
592 assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
593 assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
594
595 assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
596
597 assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
598 assert_eq!(
599 "./greptimedb_data/test/logs".to_string(),
600 fe_opts.logging.dir
601 );
602 assert!(!fe_opts.opentsdb.enable);
603 }
604
605 #[tokio::test]
606 async fn test_try_from_start_command_to_anymap() {
607 let fe_opts = frontend::frontend::FrontendOptions {
608 http: HttpOptions {
609 disable_dashboard: false,
610 ..Default::default()
611 },
612 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
613 ..Default::default()
614 };
615
616 let mut plugins = Plugins::new();
617 plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
618 .await
619 .unwrap();
620
621 let provider = plugins.get::<UserProviderRef>().unwrap();
622 let result = provider
623 .authenticate(
624 Identity::UserId("test", None),
625 Password::PlainText("test".to_string().into()),
626 )
627 .await;
628 let _ = result.unwrap();
629 }
630
631 #[test]
632 fn test_load_log_options_from_cli() {
633 let cmd = StartCommand {
634 disable_dashboard: Some(false),
635 ..Default::default()
636 };
637
638 let options = cmd
639 .load_options(&GlobalOptions {
640 log_dir: Some("./greptimedb_data/test/logs".to_string()),
641 log_level: Some("debug".to_string()),
642
643 #[cfg(feature = "tokio-console")]
644 tokio_console_addr: None,
645 })
646 .unwrap()
647 .component;
648
649 let logging_opt = options.logging;
650 assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
651 assert_eq!("debug", logging_opt.level.as_ref().unwrap());
652 }
653
654 #[test]
655 fn test_config_precedence_order() {
656 let mut file = create_named_temp_file();
657 let toml_str = r#"
658 [http]
659 addr = "127.0.0.1:4000"
660
661 [meta_client]
662 timeout = "3s"
663 connect_timeout = "5s"
664 tcp_nodelay = true
665
666 [mysql]
667 addr = "127.0.0.1:4002"
668 "#;
669 write!(file, "{}", toml_str).unwrap();
670
671 let env_prefix = "FRONTEND_UT";
672 temp_env::with_vars(
673 [
674 (
675 [
677 env_prefix.to_string(),
678 "mysql".to_uppercase(),
679 "addr".to_uppercase(),
680 ]
681 .join(ENV_VAR_SEP),
682 Some("127.0.0.1:14002"),
683 ),
684 (
685 [
687 env_prefix.to_string(),
688 "mysql".to_uppercase(),
689 "runtime_size".to_uppercase(),
690 ]
691 .join(ENV_VAR_SEP),
692 Some("11"),
693 ),
694 (
695 [
697 env_prefix.to_string(),
698 "http".to_uppercase(),
699 "addr".to_uppercase(),
700 ]
701 .join(ENV_VAR_SEP),
702 Some("127.0.0.1:24000"),
703 ),
704 (
705 [
707 env_prefix.to_string(),
708 "meta_client".to_uppercase(),
709 "metasrv_addrs".to_uppercase(),
710 ]
711 .join(ENV_VAR_SEP),
712 Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
713 ),
714 ],
715 || {
716 let command = StartCommand {
717 config_file: Some(file.path().to_str().unwrap().to_string()),
718 http_addr: Some("127.0.0.1:14000".to_string()),
719 env_prefix: env_prefix.to_string(),
720 ..Default::default()
721 };
722
723 let fe_opts = command.load_options(&Default::default()).unwrap().component;
724
725 assert_eq!(fe_opts.mysql.runtime_size, 11);
727 assert_eq!(
728 fe_opts.meta_client.unwrap().metasrv_addrs,
729 vec![
730 "127.0.0.1:3001".to_string(),
731 "127.0.0.1:3002".to_string(),
732 "127.0.0.1:3003".to_string()
733 ]
734 );
735
736 assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
738
739 assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
741
742 assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
744 },
745 );
746 }
747}