1use std::fmt::Debug;
16use std::path::Path;
17use std::sync::Arc;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
22use catalog::information_extension::DistributedInformationExtension;
23use catalog::information_schema::InformationExtensionRef;
24use catalog::kvbackend::{
25 CachedKvBackendBuilder, CatalogManagerConfiguratorRef, KvBackendCatalogManagerBuilder,
26 MetaKvBackend,
27};
28use catalog::process_manager::ProcessManager;
29use clap::Parser;
30use client::client_manager::NodeClients;
31use common_base::Plugins;
32use common_config::{Configurable, DEFAULT_DATA_HOME};
33use common_error::ext::BoxedError;
34use common_grpc::channel_manager::ChannelConfig;
35use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
36use common_meta::heartbeat::handler::HandlerGroupExecutor;
37use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
38use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
39use common_meta::heartbeat::handler::suspend::SuspendHandler;
40use common_query::prelude::set_default_prefix;
41use common_stat::ResourceStatImpl;
42use common_telemetry::info;
43use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
44use common_time::timezone::set_default_timezone;
45use common_version::{short_version, verbose_version};
46use frontend::frontend::Frontend;
47use frontend::heartbeat::HeartbeatTask;
48use frontend::instance::builder::FrontendBuilder;
49use frontend::server::Services;
50use meta_client::{MetaClientOptions, MetaClientRef, MetaClientType};
51use plugins::frontend::context::{
52 CatalogManagerConfigureContext, DistributedCatalogManagerConfigureContext,
53};
54use servers::addrs;
55use servers::grpc::GrpcOptions;
56use servers::tls::{TlsMode, TlsOption, merge_tls_option};
57use snafu::{OptionExt, ResultExt};
58use tracing_appender::non_blocking::WorkerGuard;
59
60use crate::error::{self, OtherSnafu, Result};
61use crate::options::{GlobalOptions, GreptimeOptions};
62use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
63
64type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
65
66pub struct Instance {
67 frontend: Frontend,
68 _guard: Vec<WorkerGuard>,
70}
71
72pub const APP_NAME: &str = "greptime-frontend";
73
74impl Instance {
75 pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
76 Self { frontend, _guard }
77 }
78
79 pub fn inner(&self) -> &Frontend {
80 &self.frontend
81 }
82
83 pub fn mut_inner(&mut self) -> &mut Frontend {
84 &mut self.frontend
85 }
86}
87
88#[async_trait]
89impl App for Instance {
90 fn name(&self) -> &str {
91 APP_NAME
92 }
93
94 async fn start(&mut self) -> Result<()> {
95 let plugins = self.frontend.instance.plugins().clone();
96 plugins::start_frontend_plugins(plugins)
97 .await
98 .context(error::StartFrontendSnafu)?;
99
100 self.frontend
101 .start()
102 .await
103 .context(error::StartFrontendSnafu)
104 }
105
106 async fn stop(&mut self) -> Result<()> {
107 self.frontend
108 .shutdown()
109 .await
110 .context(error::ShutdownFrontendSnafu)
111 }
112}
113
114#[derive(Parser)]
115pub struct Command {
116 #[clap(subcommand)]
117 pub subcmd: SubCommand,
118}
119
120impl Command {
121 pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
122 self.subcmd.build(opts).await
123 }
124
125 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
126 self.subcmd.load_options(global_options)
127 }
128}
129
130#[derive(Parser)]
131pub enum SubCommand {
132 Start(StartCommand),
133}
134
135impl SubCommand {
136 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
137 match self {
138 SubCommand::Start(cmd) => cmd.build(opts).await,
139 }
140 }
141
142 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
143 match self {
144 SubCommand::Start(cmd) => cmd.load_options(global_options),
145 }
146 }
147}
148
149#[derive(Debug, Default, Parser)]
150pub struct StartCommand {
151 #[clap(long, alias = "rpc-addr")]
153 rpc_bind_addr: Option<String>,
154 #[clap(long, alias = "rpc-hostname")]
158 rpc_server_addr: Option<String>,
159 #[clap(long, alias = "internal-rpc-addr")]
161 internal_rpc_bind_addr: Option<String>,
162 #[clap(long, alias = "internal-rpc-hostname")]
166 internal_rpc_server_addr: Option<String>,
167 #[clap(long)]
168 http_addr: Option<String>,
169 #[clap(long)]
170 http_timeout: Option<u64>,
171 #[clap(long)]
172 mysql_addr: Option<String>,
173 #[clap(long)]
174 postgres_addr: Option<String>,
175 #[clap(short, long)]
176 pub config_file: Option<String>,
177 #[clap(short, long)]
178 influxdb_enable: Option<bool>,
179 #[clap(long, value_delimiter = ',', num_args = 1..)]
180 metasrv_addrs: Option<Vec<String>>,
181 #[clap(long)]
182 tls_mode: Option<TlsMode>,
183 #[clap(long)]
184 tls_cert_path: Option<String>,
185 #[clap(long)]
186 tls_key_path: Option<String>,
187 #[clap(long)]
188 tls_watch: bool,
189 #[clap(long)]
190 user_provider: Option<String>,
191 #[clap(long)]
192 disable_dashboard: Option<bool>,
193 #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
194 pub env_prefix: String,
195}
196
197impl StartCommand {
198 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
199 let mut opts = FrontendOptions::load_layered_options(
200 self.config_file.as_deref(),
201 self.env_prefix.as_ref(),
202 )
203 .context(error::LoadLayeredConfigSnafu)?;
204
205 self.merge_with_cli_options(global_options, &mut opts)?;
206
207 Ok(opts)
208 }
209
210 fn merge_with_cli_options(
212 &self,
213 global_options: &GlobalOptions,
214 opts: &mut FrontendOptions,
215 ) -> Result<()> {
216 let opts = &mut opts.component;
217
218 if let Some(dir) = &global_options.log_dir {
219 opts.logging.dir.clone_from(dir);
220 }
221
222 if opts.logging.dir.is_empty() {
224 opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
225 .join(DEFAULT_LOGGING_DIR)
226 .to_string_lossy()
227 .to_string();
228 }
229
230 if global_options.log_level.is_some() {
231 opts.logging.level.clone_from(&global_options.log_level);
232 }
233
234 opts.tracing = TracingOptions {
235 #[cfg(feature = "tokio-console")]
236 tokio_console_addr: global_options.tokio_console_addr.clone(),
237 };
238
239 let tls_opts = TlsOption::new(
240 self.tls_mode,
241 self.tls_cert_path.clone(),
242 self.tls_key_path.clone(),
243 self.tls_watch,
244 );
245
246 if let Some(addr) = &self.http_addr {
247 opts.http.addr.clone_from(addr);
248 }
249
250 if let Some(http_timeout) = self.http_timeout {
251 opts.http.timeout = Duration::from_secs(http_timeout)
252 }
253
254 if let Some(disable_dashboard) = self.disable_dashboard {
255 opts.http.disable_dashboard = disable_dashboard;
256 }
257
258 if let Some(addr) = &self.rpc_bind_addr {
259 opts.grpc.bind_addr.clone_from(addr);
260 opts.grpc.tls = merge_tls_option(&opts.grpc.tls, tls_opts.clone());
261 }
262
263 if let Some(addr) = &self.rpc_server_addr {
264 opts.grpc.server_addr.clone_from(addr);
265 }
266
267 if let Some(addr) = &self.internal_rpc_bind_addr {
268 if let Some(internal_grpc) = &mut opts.internal_grpc {
269 internal_grpc.bind_addr = addr.clone();
270 } else {
271 let grpc_options = GrpcOptions {
272 bind_addr: addr.clone(),
273 ..Default::default()
274 };
275
276 opts.internal_grpc = Some(grpc_options);
277 }
278 }
279
280 if let Some(addr) = &self.internal_rpc_server_addr {
281 if let Some(internal_grpc) = &mut opts.internal_grpc {
282 internal_grpc.server_addr = addr.clone();
283 } else {
284 let grpc_options = GrpcOptions {
285 server_addr: addr.clone(),
286 ..Default::default()
287 };
288 opts.internal_grpc = Some(grpc_options);
289 }
290 }
291
292 if let Some(addr) = &self.mysql_addr {
293 opts.mysql.enable = true;
294 opts.mysql.addr.clone_from(addr);
295 opts.mysql.tls = merge_tls_option(&opts.mysql.tls, tls_opts.clone());
296 }
297
298 if let Some(addr) = &self.postgres_addr {
299 opts.postgres.enable = true;
300 opts.postgres.addr.clone_from(addr);
301 opts.postgres.tls = merge_tls_option(&opts.postgres.tls, tls_opts.clone());
302 }
303
304 if let Some(enable) = self.influxdb_enable {
305 opts.influxdb.enable = enable;
306 }
307
308 if let Some(metasrv_addrs) = &self.metasrv_addrs {
309 opts.meta_client
310 .get_or_insert_with(MetaClientOptions::default)
311 .metasrv_addrs
312 .clone_from(metasrv_addrs);
313 }
314
315 if let Some(user_provider) = &self.user_provider {
316 opts.user_provider = Some(user_provider.clone());
317 }
318
319 Ok(())
320 }
321
322 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
323 common_runtime::init_global_runtimes(&opts.runtime);
324
325 let guard = common_telemetry::init_global_logging(
326 APP_NAME,
327 &opts.component.logging,
328 &opts.component.tracing,
329 opts.component.node_id.clone(),
330 Some(&opts.component.slow_query),
331 );
332
333 log_versions(verbose_version(), short_version(), APP_NAME);
334 maybe_activate_heap_profile(&opts.component.memory);
335 create_resource_limit_metrics(APP_NAME);
336
337 info!("Frontend start command: {:#?}", self);
338 info!("Frontend options: {:#?}", opts);
339
340 let plugin_opts = opts.plugins;
341 let mut opts = opts.component;
342 opts.grpc.detect_server_addr();
343 let mut plugins = Plugins::new();
344 plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
345 .await
346 .context(error::StartFrontendSnafu)?;
347
348 set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
349 set_default_prefix(opts.default_column_prefix.as_deref())
350 .map_err(BoxedError::new)
351 .context(error::BuildCliSnafu)?;
352
353 let meta_client_options = opts
354 .meta_client
355 .as_ref()
356 .context(error::MissingConfigSnafu {
357 msg: "'meta_client'",
358 })?;
359
360 let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
361 let cache_ttl = meta_client_options.metadata_cache_ttl;
362 let cache_tti = meta_client_options.metadata_cache_tti;
363
364 let meta_client = meta_client::create_meta_client(
365 MetaClientType::Frontend,
366 meta_client_options,
367 Some(&plugins),
368 None,
369 )
370 .await
371 .context(error::MetaClientInitSnafu)?;
372
373 let cached_meta_backend =
375 CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
376 .cache_max_capacity(cache_max_capacity)
377 .cache_ttl(cache_ttl)
378 .cache_tti(cache_tti)
379 .build();
380 let cached_meta_backend = Arc::new(cached_meta_backend);
381
382 let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
384 CacheRegistryBuilder::default()
385 .add_cache(cached_meta_backend.clone())
386 .build(),
387 );
388 let fundamental_cache_registry =
389 build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
390 let layered_cache_registry = Arc::new(
391 with_default_composite_cache_registry(
392 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
393 )
394 .context(error::BuildCacheRegistrySnafu)?
395 .build(),
396 );
397
398 let mut channel_config = ChannelConfig {
401 timeout: None,
402 tcp_nodelay: opts.datanode.client.tcp_nodelay,
403 connect_timeout: Some(opts.datanode.client.connect_timeout),
404 ..Default::default()
405 };
406 if opts.grpc.flight_compression.transport_compression() {
407 channel_config.accept_compression = true;
408 channel_config.send_compression = true;
409 }
410 let client = Arc::new(NodeClients::new(channel_config));
411
412 let information_extension = Arc::new(DistributedInformationExtension::new(
413 meta_client.clone(),
414 client.clone(),
415 ));
416 plugins.insert::<InformationExtensionRef>(information_extension.clone());
417
418 let process_manager = Arc::new(ProcessManager::new(
419 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
420 Some(meta_client.clone()),
421 ));
422
423 let builder = KvBackendCatalogManagerBuilder::new(
424 information_extension,
425 cached_meta_backend.clone(),
426 layered_cache_registry.clone(),
427 )
428 .with_process_manager(process_manager.clone());
429 let builder = if let Some(configurator) =
430 plugins.get::<CatalogManagerConfiguratorRef<CatalogManagerConfigureContext>>()
431 {
432 let ctx = DistributedCatalogManagerConfigureContext {
433 meta_client: meta_client.clone(),
434 };
435 let ctx = CatalogManagerConfigureContext::Distributed(ctx);
436
437 configurator
438 .configure(builder, ctx)
439 .await
440 .context(OtherSnafu)?
441 } else {
442 builder
443 };
444 let catalog_manager = builder.build();
445
446 let instance = FrontendBuilder::new(
447 opts.clone(),
448 cached_meta_backend.clone(),
449 layered_cache_registry.clone(),
450 catalog_manager,
451 client,
452 meta_client.clone(),
453 process_manager,
454 )
455 .with_plugin(plugins.clone())
456 .with_local_cache_invalidator(layered_cache_registry)
457 .try_build()
458 .await
459 .context(error::StartFrontendSnafu)?;
460
461 let heartbeat_task = Some(create_heartbeat_task(&opts, meta_client, &instance));
462
463 let instance = Arc::new(instance);
464
465 let servers = Services::new(opts, instance.clone(), plugins)
466 .build()
467 .context(error::StartFrontendSnafu)?;
468
469 let frontend = Frontend {
470 instance,
471 servers,
472 heartbeat_task,
473 };
474
475 Ok(Instance::new(frontend, guard))
476 }
477}
478
479pub fn create_heartbeat_task(
480 options: &frontend::frontend::FrontendOptions,
481 meta_client: MetaClientRef,
482 instance: &frontend::instance::Instance,
483) -> HeartbeatTask {
484 let executor = Arc::new(HandlerGroupExecutor::new(vec![
485 Arc::new(ParseMailboxMessageHandler),
486 Arc::new(SuspendHandler::new(instance.suspend_state())),
487 Arc::new(InvalidateCacheHandler::new(
488 instance.cache_invalidator().clone(),
489 )),
490 ]));
491
492 let stat = {
493 let mut stat = ResourceStatImpl::default();
494 stat.start_collect_cpu_usage();
495 Arc::new(stat)
496 };
497
498 HeartbeatTask::new(options, meta_client, executor, stat)
499}
500
501#[cfg(test)]
502mod tests {
503 use std::io::Write;
504 use std::time::Duration;
505
506 use auth::{Identity, Password, UserProviderRef};
507 use common_base::readable_size::ReadableSize;
508 use common_config::ENV_VAR_SEP;
509 use common_test_util::temp_dir::create_named_temp_file;
510 use servers::grpc::GrpcOptions;
511 use servers::http::HttpOptions;
512
513 use super::*;
514 use crate::options::GlobalOptions;
515
516 #[test]
517 fn test_try_from_start_command() {
518 let command = StartCommand {
519 http_addr: Some("127.0.0.1:1234".to_string()),
520 mysql_addr: Some("127.0.0.1:5678".to_string()),
521 postgres_addr: Some("127.0.0.1:5432".to_string()),
522 internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
523 internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
524 influxdb_enable: Some(false),
525 disable_dashboard: Some(false),
526 ..Default::default()
527 };
528
529 let opts = command.load_options(&Default::default()).unwrap().component;
530
531 assert_eq!(opts.http.addr, "127.0.0.1:1234");
532 assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
533 assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
534 assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
535
536 let internal_grpc = opts.internal_grpc.as_ref().unwrap();
537 assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
538 assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");
539
540 let default_opts = FrontendOptions::default().component;
541
542 assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
543 assert!(opts.mysql.enable);
544 assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
545 assert!(opts.postgres.enable);
546 assert_eq!(
547 opts.postgres.runtime_size,
548 default_opts.postgres.runtime_size
549 );
550 assert!(opts.opentsdb.enable);
551
552 assert!(!opts.influxdb.enable);
553 }
554
555 #[test]
556 fn test_read_from_config_file() {
557 let mut file = create_named_temp_file();
558 let toml_str = r#"
559 [http]
560 addr = "127.0.0.1:4000"
561 timeout = "0s"
562 body_limit = "2GB"
563
564 [opentsdb]
565 enable = false
566
567 [logging]
568 level = "debug"
569 dir = "./greptimedb_data/test/logs"
570 "#;
571 write!(file, "{}", toml_str).unwrap();
572
573 let command = StartCommand {
574 config_file: Some(file.path().to_str().unwrap().to_string()),
575 disable_dashboard: Some(false),
576 ..Default::default()
577 };
578
579 let fe_opts = command.load_options(&Default::default()).unwrap().component;
580
581 assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
582 assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
583
584 assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
585
586 assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
587 assert_eq!(
588 "./greptimedb_data/test/logs".to_string(),
589 fe_opts.logging.dir
590 );
591 assert!(!fe_opts.opentsdb.enable);
592 }
593
594 #[tokio::test]
595 async fn test_try_from_start_command_to_anymap() {
596 let fe_opts = frontend::frontend::FrontendOptions {
597 http: HttpOptions {
598 disable_dashboard: false,
599 ..Default::default()
600 },
601 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
602 ..Default::default()
603 };
604
605 let mut plugins = Plugins::new();
606 plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
607 .await
608 .unwrap();
609
610 let provider = plugins.get::<UserProviderRef>().unwrap();
611 let result = provider
612 .authenticate(
613 Identity::UserId("test", None),
614 Password::PlainText("test".to_string().into()),
615 )
616 .await;
617 let _ = result.unwrap();
618 }
619
620 #[test]
621 fn test_load_log_options_from_cli() {
622 let cmd = StartCommand {
623 disable_dashboard: Some(false),
624 ..Default::default()
625 };
626
627 let options = cmd
628 .load_options(&GlobalOptions {
629 log_dir: Some("./greptimedb_data/test/logs".to_string()),
630 log_level: Some("debug".to_string()),
631
632 #[cfg(feature = "tokio-console")]
633 tokio_console_addr: None,
634 })
635 .unwrap()
636 .component;
637
638 let logging_opt = options.logging;
639 assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
640 assert_eq!("debug", logging_opt.level.as_ref().unwrap());
641 }
642
643 #[test]
644 fn test_config_precedence_order() {
645 let mut file = create_named_temp_file();
646 let toml_str = r#"
647 [http]
648 addr = "127.0.0.1:4000"
649
650 [meta_client]
651 timeout = "3s"
652 connect_timeout = "5s"
653 tcp_nodelay = true
654
655 [mysql]
656 addr = "127.0.0.1:4002"
657 "#;
658 write!(file, "{}", toml_str).unwrap();
659
660 let env_prefix = "FRONTEND_UT";
661 temp_env::with_vars(
662 [
663 (
664 [
666 env_prefix.to_string(),
667 "mysql".to_uppercase(),
668 "addr".to_uppercase(),
669 ]
670 .join(ENV_VAR_SEP),
671 Some("127.0.0.1:14002"),
672 ),
673 (
674 [
676 env_prefix.to_string(),
677 "mysql".to_uppercase(),
678 "runtime_size".to_uppercase(),
679 ]
680 .join(ENV_VAR_SEP),
681 Some("11"),
682 ),
683 (
684 [
686 env_prefix.to_string(),
687 "http".to_uppercase(),
688 "addr".to_uppercase(),
689 ]
690 .join(ENV_VAR_SEP),
691 Some("127.0.0.1:24000"),
692 ),
693 (
694 [
696 env_prefix.to_string(),
697 "meta_client".to_uppercase(),
698 "metasrv_addrs".to_uppercase(),
699 ]
700 .join(ENV_VAR_SEP),
701 Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
702 ),
703 ],
704 || {
705 let command = StartCommand {
706 config_file: Some(file.path().to_str().unwrap().to_string()),
707 http_addr: Some("127.0.0.1:14000".to_string()),
708 env_prefix: env_prefix.to_string(),
709 ..Default::default()
710 };
711
712 let fe_opts = command.load_options(&Default::default()).unwrap().component;
713
714 assert_eq!(fe_opts.mysql.runtime_size, 11);
716 assert_eq!(
717 fe_opts.meta_client.unwrap().metasrv_addrs,
718 vec![
719 "127.0.0.1:3001".to_string(),
720 "127.0.0.1:3002".to_string(),
721 "127.0.0.1:3003".to_string()
722 ]
723 );
724
725 assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
727
728 assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
730
731 assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
733 },
734 );
735 }
736}