1use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::information_extension::DistributedInformationExtension;
23use catalog::kvbackend::{
24 CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
25 MetaKvBackend,
26};
27use catalog::process_manager::ProcessManager;
28use clap::Parser;
29use client::client_manager::NodeClients;
30use common_base::Plugins;
31use common_config::{Configurable, DEFAULT_DATA_HOME};
32use common_error::ext::BoxedError;
33use common_grpc::channel_manager::ChannelConfig;
34use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
35use common_meta::heartbeat::handler::HandlerGroupExecutor;
36use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
37use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
38use common_meta::heartbeat::handler::suspend::SuspendHandler;
39use common_query::prelude::set_default_prefix;
40use common_stat::ResourceStatImpl;
41use common_telemetry::info;
42use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
43use common_time::timezone::set_default_timezone;
44use common_version::{short_version, verbose_version};
45use frontend::frontend::Frontend;
46use frontend::heartbeat::HeartbeatTask;
47use frontend::instance::builder::FrontendBuilder;
48use frontend::server::Services;
49use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
50use plugins::frontend::context::{
51 CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
52};
53use servers::addrs;
54use servers::grpc::GrpcOptions;
55use servers::tls::{TlsMode, TlsOption, merge_tls_option};
56use snafu::{OptionExt, ResultExt};
57use tracing_appender::non_blocking::WorkerGuard;
58
59use crate::error::{self, OtherSnafu, Result};
60use crate::options::{GlobalOptions, GreptimeOptions};
61use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
62
63type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
64
/// A runnable frontend application: the wrapped [`Frontend`] plus the logging
/// worker guards that are stored alongside it.
pub struct Instance {
    /// The frontend that the [`App`] implementation starts and shuts down.
    frontend: Frontend,
    // Never read (hence the leading underscore), only held by this struct.
    // NOTE(review): presumably kept so the non-blocking log writers stay alive
    // for as long as the instance does — confirm against `WorkerGuard` docs.
    _guard: Vec<WorkerGuard>,
}
70
/// Application name of the frontend. Returned by `App::name` and passed to the
/// logging, version-logging and resource-metrics setup in `StartCommand::build`.
pub const APP_NAME: &str = "greptime-frontend";
72
73impl Instance {
74 pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
75 Self { frontend, _guard }
76 }
77
78 pub fn inner(&self) -> &Frontend {
79 &self.frontend
80 }
81
82 pub fn mut_inner(&mut self) -> &mut Frontend {
83 &mut self.frontend
84 }
85}
86
87#[async_trait]
88impl App for Instance {
89 fn name(&self) -> &str {
90 APP_NAME
91 }
92
93 async fn start(&mut self) -> Result<()> {
94 let plugins = self.frontend.instance.plugins().clone();
95 plugins::start_frontend_plugins(plugins)
96 .await
97 .context(error::StartFrontendSnafu)?;
98
99 self.frontend
100 .start()
101 .await
102 .context(error::StartFrontendSnafu)
103 }
104
105 async fn stop(&mut self) -> Result<()> {
106 self.frontend
107 .shutdown()
108 .await
109 .context(error::ShutdownFrontendSnafu)
110 }
111}
112
// CLI entry point of the frontend application; all work is delegated to the
// selected subcommand.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help text, which would change the program's output.
#[derive(Parser)]
pub struct Command {
    // The subcommand chosen on the command line.
    #[clap(subcommand)]
    pub subcmd: SubCommand,
}
118
119impl Command {
120 pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
121 self.subcmd.build(opts).await
122 }
123
124 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
125 self.subcmd.load_options(global_options)
126 }
127}
128
// Subcommands of the frontend CLI.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help text, which would change the program's output.
#[derive(Parser)]
pub enum SubCommand {
    // Load options and build a frontend instance from a `StartCommand`.
    Start(StartCommand),
}
133
134impl SubCommand {
135 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
136 match self {
137 SubCommand::Start(cmd) => cmd.build(opts).await,
138 }
139 }
140
141 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
142 match self {
143 SubCommand::Start(cmd) => cmd.load_options(global_options),
144 }
145 }
146}
147
// Command-line flags of the frontend `start` subcommand.
//
// Every `Option` flag left unset keeps whatever value the layered options
// (config file / environment / defaults) provided; see `merge_with_cli_options`.
//
// Plain `//` comments are used on purpose: clap's derive turns `///` doc
// comments into help text, which would change the program's output.
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    // Overrides `grpc.bind_addr`; also accepted under the alias `rpc-addr`.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    // Overrides `grpc.server_addr`; also accepted under the alias `rpc-hostname`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    // Overrides `internal_grpc.bind_addr`, creating the `internal_grpc` section
    // from defaults when it is absent.
    #[clap(long, alias = "internal-rpc-addr")]
    internal_rpc_bind_addr: Option<String>,
    // Overrides `internal_grpc.server_addr`, creating the `internal_grpc` section
    // from defaults when it is absent.
    #[clap(long, alias = "internal-rpc-hostname")]
    internal_rpc_server_addr: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`; the value is interpreted as whole seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Overrides `mysql.addr` and enables the MySQL server.
    #[clap(long)]
    mysql_addr: Option<String>,
    // Overrides `postgres.addr` and enables the PostgreSQL server.
    #[clap(long)]
    postgres_addr: Option<String>,
    // Path of the configuration file handed to the layered option loader.
    #[clap(short, long)]
    pub config_file: Option<String>,
    // Overrides `influxdb.enable`.
    #[clap(short, long)]
    influxdb_enable: Option<bool>,
    // Comma-separated metasrv addresses; overrides `meta_client.metasrv_addrs`.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // TLS mode, certificate path, key path and watch flag. They are combined
    // into one `TlsOption` that is merged into the gRPC, MySQL and PostgreSQL
    // options, each only when that protocol's address flag is also given.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    #[clap(long)]
    tls_watch: bool,
    // Overrides the `user_provider` option.
    #[clap(long)]
    user_provider: Option<String>,
    // Overrides `http.disable_dashboard`.
    #[clap(long)]
    disable_dashboard: Option<bool>,
    // Environment-variable prefix handed to the layered option loader.
    #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
    pub env_prefix: String,
}
195
196impl StartCommand {
197 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
198 let mut opts = FrontendOptions::load_layered_options(
199 self.config_file.as_deref(),
200 self.env_prefix.as_ref(),
201 )
202 .context(error::LoadLayeredConfigSnafu)?;
203
204 self.merge_with_cli_options(global_options, &mut opts)?;
205
206 Ok(opts)
207 }
208
209 fn merge_with_cli_options(
211 &self,
212 global_options: &GlobalOptions,
213 opts: &mut FrontendOptions,
214 ) -> Result<()> {
215 let opts = &mut opts.component;
216
217 if let Some(dir) = &global_options.log_dir {
218 opts.logging.dir.clone_from(dir);
219 }
220
221 if opts.logging.dir.is_empty() {
223 opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
224 .join(DEFAULT_LOGGING_DIR)
225 .to_string_lossy()
226 .to_string();
227 }
228
229 if global_options.log_level.is_some() {
230 opts.logging.level.clone_from(&global_options.log_level);
231 }
232
233 opts.tracing = TracingOptions {
234 #[cfg(feature = "tokio-console")]
235 tokio_console_addr: global_options.tokio_console_addr.clone(),
236 };
237
238 let tls_opts = TlsOption::new(
239 self.tls_mode.clone(),
240 self.tls_cert_path.clone(),
241 self.tls_key_path.clone(),
242 self.tls_watch,
243 );
244
245 if let Some(addr) = &self.http_addr {
246 opts.http.addr.clone_from(addr);
247 }
248
249 if let Some(http_timeout) = self.http_timeout {
250 opts.http.timeout = Duration::from_secs(http_timeout)
251 }
252
253 if let Some(disable_dashboard) = self.disable_dashboard {
254 opts.http.disable_dashboard = disable_dashboard;
255 }
256
257 if let Some(addr) = &self.rpc_bind_addr {
258 opts.grpc.bind_addr.clone_from(addr);
259 opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
260 }
261
262 if let Some(addr) = &self.rpc_server_addr {
263 opts.grpc.server_addr.clone_from(addr);
264 }
265
266 if let Some(addr) = &self.internal_rpc_bind_addr {
267 if let Some(internal_grpc) = &mut opts.internal_grpc {
268 internal_grpc.bind_addr = addr.clone();
269 } else {
270 let grpc_options = GrpcOptions {
271 bind_addr: addr.clone(),
272 ..Default::default()
273 };
274
275 opts.internal_grpc = Some(grpc_options);
276 }
277 }
278
279 if let Some(addr) = &self.internal_rpc_server_addr {
280 if let Some(internal_grpc) = &mut opts.internal_grpc {
281 internal_grpc.server_addr = addr.clone();
282 } else {
283 let grpc_options = GrpcOptions {
284 server_addr: addr.clone(),
285 ..Default::default()
286 };
287 opts.internal_grpc = Some(grpc_options);
288 }
289 }
290
291 if let Some(addr) = &self.mysql_addr {
292 opts.mysql.enable = true;
293 opts.mysql.addr.clone_from(addr);
294 opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
295 }
296
297 if let Some(addr) = &self.postgres_addr {
298 opts.postgres.enable = true;
299 opts.postgres.addr.clone_from(addr);
300 opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
301 }
302
303 if let Some(enable) = self.influxdb_enable {
304 opts.influxdb.enable = enable;
305 }
306
307 if let Some(metasrv_addrs) = &self.metasrv_addrs {
308 opts.meta_client
309 .get_or_insert_with(MetaClientOptions::default)
310 .metasrv_addrs
311 .clone_from(metasrv_addrs);
312 }
313
314 if let Some(user_provider) = &self.user_provider {
315 opts.user_provider = Some(user_provider.clone());
316 }
317
318 Ok(())
319 }
320
321 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
322 common_runtime::init_global_runtimes(&opts.runtime);
323
324 let guard = common_telemetry::init_global_logging(
325 APP_NAME,
326 &opts.component.logging,
327 &opts.component.tracing,
328 opts.component.node_id.clone(),
329 Some(&opts.component.slow_query),
330 );
331
332 log_versions(verbose_version(), short_version(), APP_NAME);
333 maybe_activate_heap_profile(&opts.component.memory);
334 create_resource_limit_metrics(APP_NAME);
335
336 info!("Frontend start command: {:#?}", self);
337 info!("Frontend options: {:#?}", opts);
338
339 let plugin_opts = opts.plugins;
340 let mut opts = opts.component;
341 opts.grpc.detect_server_addr();
342 let mut plugins = Plugins::new();
343 plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
344 .await
345 .context(error::StartFrontendSnafu)?;
346
347 set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
348 set_default_prefix(opts.default_column_prefix.as_deref())
349 .map_err(BoxedError::new)
350 .context(error::BuildCliSnafu)?;
351
352 let meta_client_options = opts
353 .meta_client
354 .as_ref()
355 .context(error::MissingConfigSnafu {
356 msg: "'meta_client'",
357 })?;
358
359 let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
360 let cache_ttl = meta_client_options.metadata_cache_ttl;
361 let cache_tti = meta_client_options.metadata_cache_tti;
362
363 let meta_client = meta_client::create_meta_client(
364 MetaClientType::Frontend,
365 meta_client_options,
366 Some(&plugins),
367 None,
368 )
369 .await
370 .context(error::MetaClientInitSnafu)?;
371
372 let cached_meta_backend =
374 CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
375 .cache_max_capacity(cache_max_capacity)
376 .cache_ttl(cache_ttl)
377 .cache_tti(cache_tti)
378 .build();
379 let cached_meta_backend = Arc::new(cached_meta_backend);
380
381 let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
383 CacheRegistryBuilder::default()
384 .add_cache(cached_meta_backend.clone())
385 .build(),
386 );
387 let fundamental_cache_registry =
388 build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
389 let layered_cache_registry = Arc::new(
390 with_default_composite_cache_registry(
391 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
392 )
393 .context(error::BuildCacheRegistrySnafu)?
394 .build(),
395 );
396
397 let mut channel_config = ChannelConfig {
400 timeout: None,
401 tcp_nodelay: opts.datanode.client.tcp_nodelay,
402 connect_timeout: Some(opts.datanode.client.connect_timeout),
403 ..Default::default()
404 };
405 if opts.grpc.flight_compression.transport_compression() {
406 channel_config.accept_compression = true;
407 channel_config.send_compression = true;
408 }
409 let client = Arc::new(NodeClients::new(channel_config));
410
411 let information_extension = Arc::new(DistributedInformationExtension::new(
412 meta_client.clone(),
413 client.clone(),
414 ));
415
416 let process_manager = Arc::new(ProcessManager::new(
417 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
418 Some(meta_client.clone()),
419 ));
420
421 let builder = KvBackendCatalogManagerBuilder::new(
422 information_extension,
423 cached_meta_backend.clone(),
424 layered_cache_registry.clone(),
425 )
426 .with_process_manager(process_manager.clone());
427 let builder = if let Some(configurator) =
428 plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
429 {
430 let ctx = DistributedCatalogManagerConfigureContext {
431 meta_client: meta_client.clone(),
432 };
433 let ctx = CatalogManagerConfigureContext::Distributed(ctx);
434
435 configurator
436 .configure(builder, ctx)
437 .await
438 .context(OtherSnafu)?
439 } else {
440 builder
441 };
442 let catalog_manager = builder.build();
443
444 let instance = FrontendBuilder::new(
445 opts.clone(),
446 cached_meta_backend.clone(),
447 layered_cache_registry.clone(),
448 catalog_manager,
449 client,
450 meta_client.clone(),
451 process_manager,
452 )
453 .with_plugin(plugins.clone())
454 .with_local_cache_invalidator(layered_cache_registry)
455 .try_build()
456 .await
457 .context(error::StartFrontendSnafu)?;
458
459 let heartbeat_task = Some(create_heartbeat_task(&opts, meta_client, &instance));
460
461 let instance = Arc::new(instance);
462
463 let servers = Services::new(opts, instance.clone(), plugins)
464 .build()
465 .context(error::StartFrontendSnafu)?;
466
467 let frontend = Frontend {
468 instance,
469 servers,
470 heartbeat_task,
471 };
472
473 Ok(Instance::new(frontend, guard))
474 }
475}
476
477pub fn create_heartbeat_task(
478 options: &frontend::frontend::FrontendOptions,
479 meta_client: MetaClientRef,
480 instance: &frontend::instance::Instance,
481) -> HeartbeatTask {
482 let executor = Arc::new(HandlerGroupExecutor::new(vec![
483 Arc::new(ParseMailboxMessageHandler),
484 Arc::new(SuspendHandler::new(instance.suspend_state())),
485 Arc::new(InvalidateCacheHandler::new(
486 instance.cache_invalidator().clone(),
487 )),
488 ]));
489
490 let stat = {
491 let mut stat = ResourceStatImpl::default();
492 stat.start_collect_cpu_usage();
493 Arc::new(stat)
494 };
495
496 HeartbeatTask::new(options, meta_client, executor, stat)
497}
498
#[cfg(test)]
mod tests {
    use std::io::Write;
    use std::time::Duration;

    use auth::{Identity, Password, UserProviderRef};
    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use servers::grpc::GrpcOptions;
    use servers::http::HttpOptions;

    use super::*;
    use crate::options::GlobalOptions;

    /// CLI flags alone override the matching options and leave the rest at
    /// their defaults.
    #[test]
    fn test_try_from_start_command() {
        let cmd = StartCommand {
            http_addr: Some("127.0.0.1:1234".to_string()),
            mysql_addr: Some("127.0.0.1:5678".to_string()),
            postgres_addr: Some("127.0.0.1:5432".to_string()),
            internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
            internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
            influxdb_enable: Some(false),
            disable_dashboard: Some(false),
            ..Default::default()
        };

        let merged = cmd.load_options(&Default::default()).unwrap().component;
        let defaults = FrontendOptions::default().component;

        // Values taken from the command line.
        assert_eq!(merged.http.addr, "127.0.0.1:1234");
        assert_eq!(ReadableSize::mb(64), merged.http.body_limit);
        assert_eq!(merged.mysql.addr, "127.0.0.1:5678");
        assert_eq!(merged.postgres.addr, "127.0.0.1:5432");

        let internal_grpc = merged.internal_grpc.as_ref().unwrap();
        assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
        assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");

        // Everything else keeps its default; giving an address enables the server.
        assert_eq!(merged.grpc.bind_addr, defaults.grpc.bind_addr);
        assert!(merged.mysql.enable);
        assert_eq!(merged.mysql.runtime_size, defaults.mysql.runtime_size);
        assert!(merged.postgres.enable);
        assert_eq!(
            merged.postgres.runtime_size,
            defaults.postgres.runtime_size
        );
        assert!(merged.opentsdb.enable);

        assert!(!merged.influxdb.enable);
    }

    /// Values from a TOML config file end up in the loaded options.
    #[test]
    fn test_read_from_config_file() {
        let mut config = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"
            timeout = "0s"
            body_limit = "2GB"

            [opentsdb]
            enable = false

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        config.write_all(toml_str.as_bytes()).unwrap();

        let cmd = StartCommand {
            config_file: Some(config.path().to_str().unwrap().to_string()),
            disable_dashboard: Some(false),
            ..Default::default()
        };

        let loaded = cmd.load_options(&Default::default()).unwrap().component;

        assert_eq!("127.0.0.1:4000".to_string(), loaded.http.addr);
        assert_eq!(Duration::from_secs(0), loaded.http.timeout);

        assert_eq!(ReadableSize::gb(2), loaded.http.body_limit);

        assert_eq!("debug", loaded.logging.level.as_ref().unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            loaded.logging.dir
        );
        assert!(!loaded.opentsdb.enable);
    }

    /// A `user_provider` option makes plugin setup register a user provider
    /// that accepts the configured credentials.
    #[tokio::test]
    async fn test_try_from_start_command_to_anymap() {
        let http = HttpOptions {
            disable_dashboard: false,
            ..Default::default()
        };
        let fe_opts = frontend::frontend::FrontendOptions {
            http,
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
            .await
            .unwrap();

        let provider = plugins.get::<UserProviderRef>().unwrap();
        let identity = Identity::UserId("test", None);
        let password = Password::PlainText("test".to_string().into());
        let outcome = provider.authenticate(identity, password).await;
        let _ = outcome.unwrap();
    }

    /// The global log flags override the logging options.
    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            disable_dashboard: Some(false),
            ..Default::default()
        };

        let global_options = GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        };

        let logging = cmd
            .load_options(&global_options)
            .unwrap()
            .component
            .logging;

        assert_eq!("./greptimedb_data/test/logs", logging.dir);
        assert_eq!("debug", logging.level.as_ref().unwrap());
    }

    /// Checks the precedence cli > config file > environment variables > defaults.
    #[test]
    fn test_config_precedence_order() {
        let mut config = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [mysql]
            addr = "127.0.0.1:4002"
        "#;
        config.write_all(toml_str.as_bytes()).unwrap();

        let env_prefix = "FRONTEND_UT";
        // Builds `<PREFIX><SEP><SECTION><SEP><FIELD>` with upper-cased section and field.
        let env_key = |section: &str, field: &str| {
            [
                env_prefix.to_string(),
                section.to_uppercase(),
                field.to_uppercase(),
            ]
            .join(ENV_VAR_SEP)
        };

        temp_env::with_vars(
            [
                (env_key("mysql", "addr"), Some("127.0.0.1:14002")),
                (env_key("mysql", "runtime_size"), Some("11")),
                (env_key("http", "addr"), Some("127.0.0.1:24000")),
                (
                    env_key("meta_client", "metasrv_addrs"),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let cmd = StartCommand {
                    config_file: Some(config.path().to_str().unwrap().to_string()),
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let loaded = cmd.load_options(&Default::default()).unwrap().component;

                // Only set through the environment.
                assert_eq!(loaded.mysql.runtime_size, 11);
                assert_eq!(
                    loaded.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                // Set in both the config file and the environment: the file wins.
                assert_eq!(loaded.mysql.addr, "127.0.0.1:4002");

                // Set in file, environment and on the command line: the cli wins.
                assert_eq!(loaded.http.addr, "127.0.0.1:14000");

                // Set nowhere: the default is kept.
                assert_eq!(loaded.grpc.bind_addr, GrpcOptions::default().bind_addr);
            },
        );
    }
}