1use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21use catalog::information_extension::DistributedInformationExtension;
22use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
23use catalog::process_manager::ProcessManager;
24use clap::Parser;
25use client::client_manager::NodeClients;
26use common_base::Plugins;
27use common_config::{Configurable, DEFAULT_DATA_HOME};
28use common_grpc::channel_manager::ChannelConfig;
29use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
30use common_meta::heartbeat::handler::HandlerGroupExecutor;
31use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
32use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
33use common_stat::ResourceStatImpl;
34use common_telemetry::info;
35use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
36use common_time::timezone::set_default_timezone;
37use common_version::{short_version, verbose_version};
38use frontend::frontend::Frontend;
39use frontend::heartbeat::HeartbeatTask;
40use frontend::instance::builder::FrontendBuilder;
41use frontend::server::Services;
42use meta_client::{MetaClientOptions, MetaClientType};
43use servers::addrs;
44use servers::export_metrics::ExportMetricsTask;
45use servers::grpc::GrpcOptions;
46use servers::tls::{TlsMode, TlsOption};
47use snafu::{OptionExt, ResultExt};
48use tracing_appender::non_blocking::WorkerGuard;
49
50use crate::error::{self, Result};
51use crate::options::{GlobalOptions, GreptimeOptions};
52use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
53
54type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
55
56pub struct Instance {
57 frontend: Frontend,
58 _guard: Vec<WorkerGuard>,
60}
61
62pub const APP_NAME: &str = "greptime-frontend";
63
64impl Instance {
65 pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
66 Self { frontend, _guard }
67 }
68
69 pub fn inner(&self) -> &Frontend {
70 &self.frontend
71 }
72
73 pub fn mut_inner(&mut self) -> &mut Frontend {
74 &mut self.frontend
75 }
76}
77
78#[async_trait]
79impl App for Instance {
80 fn name(&self) -> &str {
81 APP_NAME
82 }
83
84 async fn start(&mut self) -> Result<()> {
85 let plugins = self.frontend.instance.plugins().clone();
86 plugins::start_frontend_plugins(plugins)
87 .await
88 .context(error::StartFrontendSnafu)?;
89
90 self.frontend
91 .start()
92 .await
93 .context(error::StartFrontendSnafu)
94 }
95
96 async fn stop(&mut self) -> Result<()> {
97 self.frontend
98 .shutdown()
99 .await
100 .context(error::ShutdownFrontendSnafu)
101 }
102}
103
104#[derive(Parser)]
105pub struct Command {
106 #[clap(subcommand)]
107 pub subcmd: SubCommand,
108}
109
110impl Command {
111 pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
112 self.subcmd.build(opts).await
113 }
114
115 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
116 self.subcmd.load_options(global_options)
117 }
118}
119
120#[derive(Parser)]
121pub enum SubCommand {
122 Start(StartCommand),
123}
124
125impl SubCommand {
126 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
127 match self {
128 SubCommand::Start(cmd) => cmd.build(opts).await,
129 }
130 }
131
132 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
133 match self {
134 SubCommand::Start(cmd) => cmd.load_options(global_options),
135 }
136 }
137}
138
139#[derive(Debug, Default, Parser)]
140pub struct StartCommand {
141 #[clap(long, alias = "rpc-addr")]
143 rpc_bind_addr: Option<String>,
144 #[clap(long, alias = "rpc-hostname")]
148 rpc_server_addr: Option<String>,
149 #[clap(long, alias = "internal-rpc-addr")]
151 internal_rpc_bind_addr: Option<String>,
152 #[clap(long, alias = "internal-rpc-hostname")]
156 internal_rpc_server_addr: Option<String>,
157 #[clap(long)]
158 http_addr: Option<String>,
159 #[clap(long)]
160 http_timeout: Option<u64>,
161 #[clap(long)]
162 mysql_addr: Option<String>,
163 #[clap(long)]
164 postgres_addr: Option<String>,
165 #[clap(short, long)]
166 pub config_file: Option<String>,
167 #[clap(short, long)]
168 influxdb_enable: Option<bool>,
169 #[clap(long, value_delimiter = ',', num_args = 1..)]
170 metasrv_addrs: Option<Vec<String>>,
171 #[clap(long)]
172 tls_mode: Option<TlsMode>,
173 #[clap(long)]
174 tls_cert_path: Option<String>,
175 #[clap(long)]
176 tls_key_path: Option<String>,
177 #[clap(long)]
178 user_provider: Option<String>,
179 #[clap(long)]
180 disable_dashboard: Option<bool>,
181 #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
182 pub env_prefix: String,
183}
184
185impl StartCommand {
186 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
187 let mut opts = FrontendOptions::load_layered_options(
188 self.config_file.as_deref(),
189 self.env_prefix.as_ref(),
190 )
191 .context(error::LoadLayeredConfigSnafu)?;
192
193 self.merge_with_cli_options(global_options, &mut opts)?;
194
195 Ok(opts)
196 }
197
198 fn merge_with_cli_options(
200 &self,
201 global_options: &GlobalOptions,
202 opts: &mut FrontendOptions,
203 ) -> Result<()> {
204 let opts = &mut opts.component;
205
206 if let Some(dir) = &global_options.log_dir {
207 opts.logging.dir.clone_from(dir);
208 }
209
210 if opts.logging.dir.is_empty() {
212 opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
213 .join(DEFAULT_LOGGING_DIR)
214 .to_string_lossy()
215 .to_string();
216 }
217
218 if global_options.log_level.is_some() {
219 opts.logging.level.clone_from(&global_options.log_level);
220 }
221
222 opts.tracing = TracingOptions {
223 #[cfg(feature = "tokio-console")]
224 tokio_console_addr: global_options.tokio_console_addr.clone(),
225 };
226
227 let tls_opts = TlsOption::new(
228 self.tls_mode.clone(),
229 self.tls_cert_path.clone(),
230 self.tls_key_path.clone(),
231 );
232
233 if let Some(addr) = &self.http_addr {
234 opts.http.addr.clone_from(addr);
235 }
236
237 if let Some(http_timeout) = self.http_timeout {
238 opts.http.timeout = Duration::from_secs(http_timeout)
239 }
240
241 if let Some(disable_dashboard) = self.disable_dashboard {
242 opts.http.disable_dashboard = disable_dashboard;
243 }
244
245 if let Some(addr) = &self.rpc_bind_addr {
246 opts.grpc.bind_addr.clone_from(addr);
247 opts.grpc.tls = tls_opts.clone();
248 }
249
250 if let Some(addr) = &self.rpc_server_addr {
251 opts.grpc.server_addr.clone_from(addr);
252 }
253
254 if let Some(addr) = &self.internal_rpc_bind_addr {
255 if let Some(internal_grpc) = &mut opts.internal_grpc {
256 internal_grpc.bind_addr = addr.clone();
257 } else {
258 let grpc_options = GrpcOptions {
259 bind_addr: addr.clone(),
260 ..Default::default()
261 };
262
263 opts.internal_grpc = Some(grpc_options);
264 }
265 }
266
267 if let Some(addr) = &self.internal_rpc_server_addr {
268 if let Some(internal_grpc) = &mut opts.internal_grpc {
269 internal_grpc.server_addr = addr.clone();
270 } else {
271 let grpc_options = GrpcOptions {
272 server_addr: addr.clone(),
273 ..Default::default()
274 };
275 opts.internal_grpc = Some(grpc_options);
276 }
277 }
278
279 if let Some(addr) = &self.mysql_addr {
280 opts.mysql.enable = true;
281 opts.mysql.addr.clone_from(addr);
282 opts.mysql.tls = tls_opts.clone();
283 }
284
285 if let Some(addr) = &self.postgres_addr {
286 opts.postgres.enable = true;
287 opts.postgres.addr.clone_from(addr);
288 opts.postgres.tls = tls_opts;
289 }
290
291 if let Some(enable) = self.influxdb_enable {
292 opts.influxdb.enable = enable;
293 }
294
295 if let Some(metasrv_addrs) = &self.metasrv_addrs {
296 opts.meta_client
297 .get_or_insert_with(MetaClientOptions::default)
298 .metasrv_addrs
299 .clone_from(metasrv_addrs);
300 }
301
302 if let Some(user_provider) = &self.user_provider {
303 opts.user_provider = Some(user_provider.clone());
304 }
305
306 Ok(())
307 }
308
309 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
310 common_runtime::init_global_runtimes(&opts.runtime);
311
312 let guard = common_telemetry::init_global_logging(
313 APP_NAME,
314 &opts.component.logging,
315 &opts.component.tracing,
316 opts.component.node_id.clone(),
317 Some(&opts.component.slow_query),
318 );
319
320 log_versions(verbose_version(), short_version(), APP_NAME);
321 maybe_activate_heap_profile(&opts.component.memory);
322 create_resource_limit_metrics(APP_NAME);
323
324 info!("Frontend start command: {:#?}", self);
325 info!("Frontend options: {:#?}", opts);
326
327 let plugin_opts = opts.plugins;
328 let mut opts = opts.component;
329 opts.grpc.detect_server_addr();
330 let mut plugins = Plugins::new();
331 plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
332 .await
333 .context(error::StartFrontendSnafu)?;
334
335 set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
336
337 let meta_client_options = opts
338 .meta_client
339 .as_ref()
340 .context(error::MissingConfigSnafu {
341 msg: "'meta_client'",
342 })?;
343
344 let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
345 let cache_ttl = meta_client_options.metadata_cache_ttl;
346 let cache_tti = meta_client_options.metadata_cache_tti;
347
348 let meta_client = meta_client::create_meta_client(
349 MetaClientType::Frontend,
350 meta_client_options,
351 Some(&plugins),
352 None,
353 )
354 .await
355 .context(error::MetaClientInitSnafu)?;
356
357 let cached_meta_backend =
359 CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
360 .cache_max_capacity(cache_max_capacity)
361 .cache_ttl(cache_ttl)
362 .cache_tti(cache_tti)
363 .build();
364 let cached_meta_backend = Arc::new(cached_meta_backend);
365
366 let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
368 CacheRegistryBuilder::default()
369 .add_cache(cached_meta_backend.clone())
370 .build(),
371 );
372 let fundamental_cache_registry =
373 build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
374 let layered_cache_registry = Arc::new(
375 with_default_composite_cache_registry(
376 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
377 )
378 .context(error::BuildCacheRegistrySnafu)?
379 .build(),
380 );
381
382 let mut channel_config = ChannelConfig {
385 timeout: None,
386 tcp_nodelay: opts.datanode.client.tcp_nodelay,
387 connect_timeout: Some(opts.datanode.client.connect_timeout),
388 ..Default::default()
389 };
390 if opts.grpc.flight_compression.transport_compression() {
391 channel_config.accept_compression = true;
392 channel_config.send_compression = true;
393 }
394 let client = Arc::new(NodeClients::new(channel_config));
395
396 let information_extension = Arc::new(DistributedInformationExtension::new(
397 meta_client.clone(),
398 client.clone(),
399 ));
400
401 let process_manager = Arc::new(ProcessManager::new(
402 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
403 Some(meta_client.clone()),
404 ));
405
406 let builder = KvBackendCatalogManagerBuilder::new(
407 information_extension,
408 cached_meta_backend.clone(),
409 layered_cache_registry.clone(),
410 )
411 .with_process_manager(process_manager.clone());
412 #[cfg(feature = "enterprise")]
413 let builder = if let Some(factories) = plugins.get() {
414 builder.with_extra_information_table_factories(factories)
415 } else {
416 builder
417 };
418 let catalog_manager = builder.build();
419
420 let executor = HandlerGroupExecutor::new(vec![
421 Arc::new(ParseMailboxMessageHandler),
422 Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
423 ]);
424
425 let mut resource_stat = ResourceStatImpl::default();
426 resource_stat.start_collect_cpu_usage();
427
428 let heartbeat_task = HeartbeatTask::new(
429 &opts,
430 meta_client.clone(),
431 opts.heartbeat.clone(),
432 Arc::new(executor),
433 Arc::new(resource_stat),
434 );
435 let heartbeat_task = Some(heartbeat_task);
436
437 let instance = FrontendBuilder::new(
438 opts.clone(),
439 cached_meta_backend.clone(),
440 layered_cache_registry.clone(),
441 catalog_manager,
442 client,
443 meta_client,
444 process_manager,
445 )
446 .with_plugin(plugins.clone())
447 .with_local_cache_invalidator(layered_cache_registry)
448 .try_build()
449 .await
450 .context(error::StartFrontendSnafu)?;
451 let instance = Arc::new(instance);
452
453 let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
454 .context(error::ServersSnafu)?;
455
456 let servers = Services::new(opts, instance.clone(), plugins)
457 .build()
458 .context(error::StartFrontendSnafu)?;
459
460 let frontend = Frontend {
461 instance,
462 servers,
463 heartbeat_task,
464 export_metrics_task,
465 };
466
467 Ok(Instance::new(frontend, guard))
468 }
469}
470
471#[cfg(test)]
472mod tests {
473 use std::io::Write;
474 use std::time::Duration;
475
476 use auth::{Identity, Password, UserProviderRef};
477 use common_base::readable_size::ReadableSize;
478 use common_config::ENV_VAR_SEP;
479 use common_test_util::temp_dir::create_named_temp_file;
480 use servers::grpc::GrpcOptions;
481 use servers::http::HttpOptions;
482
483 use super::*;
484 use crate::options::GlobalOptions;
485
486 #[test]
487 fn test_try_from_start_command() {
488 let command = StartCommand {
489 http_addr: Some("127.0.0.1:1234".to_string()),
490 mysql_addr: Some("127.0.0.1:5678".to_string()),
491 postgres_addr: Some("127.0.0.1:5432".to_string()),
492 internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
493 internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
494 influxdb_enable: Some(false),
495 disable_dashboard: Some(false),
496 ..Default::default()
497 };
498
499 let opts = command.load_options(&Default::default()).unwrap().component;
500
501 assert_eq!(opts.http.addr, "127.0.0.1:1234");
502 assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
503 assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
504 assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
505
506 let internal_grpc = opts.internal_grpc.as_ref().unwrap();
507 assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
508 assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");
509
510 let default_opts = FrontendOptions::default().component;
511
512 assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
513 assert!(opts.mysql.enable);
514 assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
515 assert!(opts.postgres.enable);
516 assert_eq!(
517 opts.postgres.runtime_size,
518 default_opts.postgres.runtime_size
519 );
520 assert!(opts.opentsdb.enable);
521
522 assert!(!opts.influxdb.enable);
523 }
524
525 #[test]
526 fn test_read_from_config_file() {
527 let mut file = create_named_temp_file();
528 let toml_str = r#"
529 [http]
530 addr = "127.0.0.1:4000"
531 timeout = "0s"
532 body_limit = "2GB"
533
534 [opentsdb]
535 enable = false
536
537 [logging]
538 level = "debug"
539 dir = "./greptimedb_data/test/logs"
540 "#;
541 write!(file, "{}", toml_str).unwrap();
542
543 let command = StartCommand {
544 config_file: Some(file.path().to_str().unwrap().to_string()),
545 disable_dashboard: Some(false),
546 ..Default::default()
547 };
548
549 let fe_opts = command.load_options(&Default::default()).unwrap().component;
550
551 assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
552 assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
553
554 assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
555
556 assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
557 assert_eq!(
558 "./greptimedb_data/test/logs".to_string(),
559 fe_opts.logging.dir
560 );
561 assert!(!fe_opts.opentsdb.enable);
562 }
563
564 #[tokio::test]
565 async fn test_try_from_start_command_to_anymap() {
566 let fe_opts = frontend::frontend::FrontendOptions {
567 http: HttpOptions {
568 disable_dashboard: false,
569 ..Default::default()
570 },
571 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
572 ..Default::default()
573 };
574
575 let mut plugins = Plugins::new();
576 plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
577 .await
578 .unwrap();
579
580 let provider = plugins.get::<UserProviderRef>().unwrap();
581 let result = provider
582 .authenticate(
583 Identity::UserId("test", None),
584 Password::PlainText("test".to_string().into()),
585 )
586 .await;
587 let _ = result.unwrap();
588 }
589
590 #[test]
591 fn test_load_log_options_from_cli() {
592 let cmd = StartCommand {
593 disable_dashboard: Some(false),
594 ..Default::default()
595 };
596
597 let options = cmd
598 .load_options(&GlobalOptions {
599 log_dir: Some("./greptimedb_data/test/logs".to_string()),
600 log_level: Some("debug".to_string()),
601
602 #[cfg(feature = "tokio-console")]
603 tokio_console_addr: None,
604 })
605 .unwrap()
606 .component;
607
608 let logging_opt = options.logging;
609 assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
610 assert_eq!("debug", logging_opt.level.as_ref().unwrap());
611 }
612
613 #[test]
614 fn test_config_precedence_order() {
615 let mut file = create_named_temp_file();
616 let toml_str = r#"
617 [http]
618 addr = "127.0.0.1:4000"
619
620 [meta_client]
621 timeout = "3s"
622 connect_timeout = "5s"
623 tcp_nodelay = true
624
625 [mysql]
626 addr = "127.0.0.1:4002"
627 "#;
628 write!(file, "{}", toml_str).unwrap();
629
630 let env_prefix = "FRONTEND_UT";
631 temp_env::with_vars(
632 [
633 (
634 [
636 env_prefix.to_string(),
637 "mysql".to_uppercase(),
638 "addr".to_uppercase(),
639 ]
640 .join(ENV_VAR_SEP),
641 Some("127.0.0.1:14002"),
642 ),
643 (
644 [
646 env_prefix.to_string(),
647 "mysql".to_uppercase(),
648 "runtime_size".to_uppercase(),
649 ]
650 .join(ENV_VAR_SEP),
651 Some("11"),
652 ),
653 (
654 [
656 env_prefix.to_string(),
657 "http".to_uppercase(),
658 "addr".to_uppercase(),
659 ]
660 .join(ENV_VAR_SEP),
661 Some("127.0.0.1:24000"),
662 ),
663 (
664 [
666 env_prefix.to_string(),
667 "meta_client".to_uppercase(),
668 "metasrv_addrs".to_uppercase(),
669 ]
670 .join(ENV_VAR_SEP),
671 Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
672 ),
673 ],
674 || {
675 let command = StartCommand {
676 config_file: Some(file.path().to_str().unwrap().to_string()),
677 http_addr: Some("127.0.0.1:14000".to_string()),
678 env_prefix: env_prefix.to_string(),
679 ..Default::default()
680 };
681
682 let fe_opts = command.load_options(&Default::default()).unwrap().component;
683
684 assert_eq!(fe_opts.mysql.runtime_size, 11);
686 assert_eq!(
687 fe_opts.meta_client.unwrap().metasrv_addrs,
688 vec![
689 "127.0.0.1:3001".to_string(),
690 "127.0.0.1:3002".to_string(),
691 "127.0.0.1:3003".to_string()
692 ]
693 );
694
695 assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
697
698 assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
700
701 assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
703 },
704 );
705 }
706}