1use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21use catalog::information_extension::DistributedInformationExtension;
22use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
23use catalog::process_manager::ProcessManager;
24use clap::Parser;
25use client::client_manager::NodeClients;
26use common_base::Plugins;
27use common_config::{Configurable, DEFAULT_DATA_HOME};
28use common_error::ext::BoxedError;
29use common_grpc::channel_manager::ChannelConfig;
30use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
31use common_meta::heartbeat::handler::HandlerGroupExecutor;
32use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
33use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
34use common_query::prelude::set_default_prefix;
35use common_stat::ResourceStatImpl;
36use common_telemetry::info;
37use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
38use common_time::timezone::set_default_timezone;
39use common_version::{short_version, verbose_version};
40use frontend::frontend::Frontend;
41use frontend::heartbeat::HeartbeatTask;
42use frontend::instance::builder::FrontendBuilder;
43use frontend::server::Services;
44use meta_client::{MetaClientOptions, MetaClientType};
45use servers::addrs;
46use servers::grpc::GrpcOptions;
47use servers::tls::{TlsMode, TlsOption};
48use snafu::{OptionExt, ResultExt};
49use tracing_appender::non_blocking::WorkerGuard;
50
51use crate::error::{self, Result};
52use crate::options::{GlobalOptions, GreptimeOptions};
53use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
54
/// Full option set handled by this command: the frontend component options wrapped in
/// [`GreptimeOptions`], which also carries the `runtime` and `plugins` sections read by
/// `StartCommand::build`.
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
56
/// The frontend application: a built [`Frontend`] together with the logging worker guards
/// created while building it.
pub struct Instance {
    /// The frontend that [`App::start`] / [`App::stop`] drive.
    frontend: Frontend,
    /// Guards returned by `init_global_logging` in `StartCommand::build`. Never read; they
    /// are only held so that they are dropped together with the instance.
    // NOTE(review): presumably dropping a `WorkerGuard` stops/flushes the non-blocking log
    // writer, which is why it must live as long as the instance — confirm against
    // `tracing_appender`.
    _guard: Vec<WorkerGuard>,
}
62
/// Application name of the frontend; returned by [`App::name`] and passed to the logging,
/// version-logging and resource-metrics setup in `StartCommand::build`.
pub const APP_NAME: &str = "greptime-frontend";
64
65impl Instance {
66 pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
67 Self { frontend, _guard }
68 }
69
70 pub fn inner(&self) -> &Frontend {
71 &self.frontend
72 }
73
74 pub fn mut_inner(&mut self) -> &mut Frontend {
75 &mut self.frontend
76 }
77}
78
79#[async_trait]
80impl App for Instance {
81 fn name(&self) -> &str {
82 APP_NAME
83 }
84
85 async fn start(&mut self) -> Result<()> {
86 let plugins = self.frontend.instance.plugins().clone();
87 plugins::start_frontend_plugins(plugins)
88 .await
89 .context(error::StartFrontendSnafu)?;
90
91 self.frontend
92 .start()
93 .await
94 .context(error::StartFrontendSnafu)
95 }
96
97 async fn stop(&mut self) -> Result<()> {
98 self.frontend
99 .shutdown()
100 .await
101 .context(error::ShutdownFrontendSnafu)
102 }
103}
104
/// Frontend command; all work is delegated to the selected [`SubCommand`].
#[derive(Parser)]
pub struct Command {
    /// The frontend subcommand to run.
    #[clap(subcommand)]
    pub subcmd: SubCommand,
}
110
111impl Command {
112 pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
113 self.subcmd.build(opts).await
114 }
115
116 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
117 self.subcmd.load_options(global_options)
118 }
119}
120
/// Subcommands of the frontend command.
#[derive(Parser)]
pub enum SubCommand {
    /// Start a frontend with the given options.
    Start(StartCommand),
}
125
126impl SubCommand {
127 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
128 match self {
129 SubCommand::Start(cmd) => cmd.build(opts).await,
130 }
131 }
132
133 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
134 match self {
135 SubCommand::Start(cmd) => cmd.load_options(global_options),
136 }
137 }
138}
139
/// Command-line arguments for starting a frontend.
///
/// Every `Option` flag that is given overrides the corresponding value loaded from the
/// config file / environment (see `merge_with_cli_options`).
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    /// Overrides `grpc.bind_addr`.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    /// Overrides `grpc.server_addr`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    /// Overrides `internal_grpc.bind_addr`; the `internal_grpc` section is created with
    /// default values if it is not configured.
    #[clap(long, alias = "internal-rpc-addr")]
    internal_rpc_bind_addr: Option<String>,
    /// Overrides `internal_grpc.server_addr`; the `internal_grpc` section is created with
    /// default values if it is not configured.
    #[clap(long, alias = "internal-rpc-hostname")]
    internal_rpc_server_addr: Option<String>,
    /// Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    /// Overrides `http.timeout`, in whole seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    /// Overrides `mysql.addr` and enables the MySQL server.
    #[clap(long)]
    mysql_addr: Option<String>,
    /// Overrides `postgres.addr` and enables the PostgreSQL server.
    #[clap(long)]
    postgres_addr: Option<String>,
    /// Path of the configuration file to load options from.
    #[clap(short, long)]
    pub config_file: Option<String>,
    /// Overrides `influxdb.enable`.
    #[clap(short, long)]
    influxdb_enable: Option<bool>,
    /// Metasrv addresses (comma separated); overrides `meta_client.metasrv_addrs`.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    /// TLS mode for the gRPC, MySQL and PostgreSQL servers. Only applied to a server whose
    /// address flag is also given on the command line.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    /// TLS certificate path; applied together with `--tls-mode`.
    #[clap(long)]
    tls_cert_path: Option<String>,
    /// TLS key path; applied together with `--tls-mode`.
    #[clap(long)]
    tls_key_path: Option<String>,
    /// TLS `watch` setting; applied together with `--tls-mode`.
    #[clap(long)]
    tls_watch: bool,
    /// Overrides `user_provider`.
    #[clap(long)]
    user_provider: Option<String>,
    /// Overrides `http.disable_dashboard`.
    #[clap(long)]
    disable_dashboard: Option<bool>,
    /// Prefix of the environment variables that options are loaded from.
    #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
    pub env_prefix: String,
}
187
188impl StartCommand {
189 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
190 let mut opts = FrontendOptions::load_layered_options(
191 self.config_file.as_deref(),
192 self.env_prefix.as_ref(),
193 )
194 .context(error::LoadLayeredConfigSnafu)?;
195
196 self.merge_with_cli_options(global_options, &mut opts)?;
197
198 Ok(opts)
199 }
200
201 fn merge_with_cli_options(
203 &self,
204 global_options: &GlobalOptions,
205 opts: &mut FrontendOptions,
206 ) -> Result<()> {
207 let opts = &mut opts.component;
208
209 if let Some(dir) = &global_options.log_dir {
210 opts.logging.dir.clone_from(dir);
211 }
212
213 if opts.logging.dir.is_empty() {
215 opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
216 .join(DEFAULT_LOGGING_DIR)
217 .to_string_lossy()
218 .to_string();
219 }
220
221 if global_options.log_level.is_some() {
222 opts.logging.level.clone_from(&global_options.log_level);
223 }
224
225 opts.tracing = TracingOptions {
226 #[cfg(feature = "tokio-console")]
227 tokio_console_addr: global_options.tokio_console_addr.clone(),
228 };
229
230 let tls_opts = TlsOption::new(
231 self.tls_mode.clone(),
232 self.tls_cert_path.clone(),
233 self.tls_key_path.clone(),
234 self.tls_watch,
235 );
236
237 if let Some(addr) = &self.http_addr {
238 opts.http.addr.clone_from(addr);
239 }
240
241 if let Some(http_timeout) = self.http_timeout {
242 opts.http.timeout = Duration::from_secs(http_timeout)
243 }
244
245 if let Some(disable_dashboard) = self.disable_dashboard {
246 opts.http.disable_dashboard = disable_dashboard;
247 }
248
249 if let Some(addr) = &self.rpc_bind_addr {
250 opts.grpc.bind_addr.clone_from(addr);
251 opts.grpc.tls = tls_opts.clone();
252 }
253
254 if let Some(addr) = &self.rpc_server_addr {
255 opts.grpc.server_addr.clone_from(addr);
256 }
257
258 if let Some(addr) = &self.internal_rpc_bind_addr {
259 if let Some(internal_grpc) = &mut opts.internal_grpc {
260 internal_grpc.bind_addr = addr.clone();
261 } else {
262 let grpc_options = GrpcOptions {
263 bind_addr: addr.clone(),
264 ..Default::default()
265 };
266
267 opts.internal_grpc = Some(grpc_options);
268 }
269 }
270
271 if let Some(addr) = &self.internal_rpc_server_addr {
272 if let Some(internal_grpc) = &mut opts.internal_grpc {
273 internal_grpc.server_addr = addr.clone();
274 } else {
275 let grpc_options = GrpcOptions {
276 server_addr: addr.clone(),
277 ..Default::default()
278 };
279 opts.internal_grpc = Some(grpc_options);
280 }
281 }
282
283 if let Some(addr) = &self.mysql_addr {
284 opts.mysql.enable = true;
285 opts.mysql.addr.clone_from(addr);
286 opts.mysql.tls = tls_opts.clone();
287 }
288
289 if let Some(addr) = &self.postgres_addr {
290 opts.postgres.enable = true;
291 opts.postgres.addr.clone_from(addr);
292 opts.postgres.tls = tls_opts;
293 }
294
295 if let Some(enable) = self.influxdb_enable {
296 opts.influxdb.enable = enable;
297 }
298
299 if let Some(metasrv_addrs) = &self.metasrv_addrs {
300 opts.meta_client
301 .get_or_insert_with(MetaClientOptions::default)
302 .metasrv_addrs
303 .clone_from(metasrv_addrs);
304 }
305
306 if let Some(user_provider) = &self.user_provider {
307 opts.user_provider = Some(user_provider.clone());
308 }
309
310 Ok(())
311 }
312
    /// Builds a ready-to-start frontend [`Instance`] from fully resolved options.
    ///
    /// In order, this initializes the global runtimes and logging, sets up the frontend
    /// plugins, applies the default timezone and column prefix, creates the meta client and
    /// the caches on top of it, and finally wires the catalog manager, heartbeat task,
    /// frontend instance and servers together.
    ///
    /// # Errors
    ///
    /// - `StartFrontend` if plugin setup, building the frontend instance or building the
    ///   servers fails.
    /// - `InitTimezone` / `BuildCli` if the default timezone / column prefix is rejected.
    /// - `MissingConfig` if the options have no `meta_client` section.
    /// - `MetaClientInit` if the meta client cannot be created.
    /// - `BuildCacheRegistry` if the composite cache registry cannot be built.
    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        // The returned guards are handed to the `Instance` at the end so that they live as
        // long as the application.
        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            opts.component.node_id.clone(),
            Some(&opts.component.slow_query),
        );

        log_versions(verbose_version(), short_version(), APP_NAME);
        maybe_activate_heap_profile(&opts.component.memory);
        create_resource_limit_metrics(APP_NAME);

        // NOTE(review): these dump the whole command and option set (including
        // `user_provider`) at info level — confirm nothing sensitive can end up in logs.
        info!("Frontend start command: {:#?}", self);
        info!("Frontend options: {:#?}", opts);

        // From here on `opts` is the frontend component options only.
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        opts.grpc.detect_server_addr();
        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
            .await
            .context(error::StartFrontendSnafu)?;

        set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
        set_default_prefix(opts.default_column_prefix.as_deref())
            .map_err(BoxedError::new)
            .context(error::BuildCliSnafu)?;

        // The `meta_client` section is mandatory here.
        let meta_client_options = opts
            .meta_client
            .as_ref()
            .context(error::MissingConfigSnafu {
                msg: "'meta_client'",
            })?;

        let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
        let cache_ttl = meta_client_options.metadata_cache_ttl;
        let cache_tti = meta_client_options.metadata_cache_tti;

        let meta_client = meta_client::create_meta_client(
            MetaClientType::Frontend,
            meta_client_options,
            Some(&plugins),
            None,
        )
        .await
        .context(error::MetaClientInitSnafu)?;

        // Caching KV backend on top of the meta client, sized by the `meta_client` options.
        let cached_meta_backend =
            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
                .cache_max_capacity(cache_max_capacity)
                .cache_ttl(cache_ttl)
                .cache_tti(cache_tti)
                .build();
        let cached_meta_backend = Arc::new(cached_meta_backend);

        // Layered cache registry: the cached backend itself, the fundamental caches (built
        // on a separate, uncached `MetaKvBackend`), then the default composite caches.
        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
            CacheRegistryBuilder::default()
                .add_cache(cached_meta_backend.clone())
                .build(),
        );
        let fundamental_cache_registry =
            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        // Channel settings for the node clients: no overall timeout; TCP_NODELAY and the
        // connect timeout come from `datanode.client`.
        let mut channel_config = ChannelConfig {
            timeout: None,
            tcp_nodelay: opts.datanode.client.tcp_nodelay,
            connect_timeout: Some(opts.datanode.client.connect_timeout),
            ..Default::default()
        };
        // Compression is enabled in both directions when the gRPC flight compression
        // setting asks for transport compression.
        if opts.grpc.flight_compression.transport_compression() {
            channel_config.accept_compression = true;
            channel_config.send_compression = true;
        }
        let client = Arc::new(NodeClients::new(channel_config));

        let information_extension = Arc::new(DistributedInformationExtension::new(
            meta_client.clone(),
            client.clone(),
        ));

        let process_manager = Arc::new(ProcessManager::new(
            addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
            Some(meta_client.clone()),
        ));

        let builder = KvBackendCatalogManagerBuilder::new(
            information_extension,
            cached_meta_backend.clone(),
            layered_cache_registry.clone(),
        )
        .with_process_manager(process_manager.clone());
        // Enterprise builds may register extra information table factories via plugins.
        #[cfg(feature = "enterprise")]
        let builder = if let Some(factories) = plugins.get() {
            builder.with_extra_information_table_factories(factories)
        } else {
            builder
        };
        let catalog_manager = builder.build();

        // Heartbeat handlers: parse mailbox messages, then invalidate entries of the
        // layered cache registry.
        let executor = HandlerGroupExecutor::new(vec![
            Arc::new(ParseMailboxMessageHandler),
            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
        ]);

        let mut resource_stat = ResourceStatImpl::default();
        resource_stat.start_collect_cpu_usage();

        let heartbeat_task = HeartbeatTask::new(
            &opts,
            meta_client.clone(),
            opts.heartbeat.clone(),
            Arc::new(executor),
            Arc::new(resource_stat),
        );
        let heartbeat_task = Some(heartbeat_task);

        let instance = FrontendBuilder::new(
            opts.clone(),
            cached_meta_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager,
            client,
            meta_client,
            process_manager,
        )
        .with_plugin(plugins.clone())
        .with_local_cache_invalidator(layered_cache_registry)
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let instance = Arc::new(instance);

        // `opts` and `plugins` are moved into the services here.
        let servers = Services::new(opts, instance.clone(), plugins)
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance,
            servers,
            heartbeat_task,
        };

        Ok(Instance::new(frontend, guard))
    }
472}
473
474#[cfg(test)]
475mod tests {
476 use std::io::Write;
477 use std::time::Duration;
478
479 use auth::{Identity, Password, UserProviderRef};
480 use common_base::readable_size::ReadableSize;
481 use common_config::ENV_VAR_SEP;
482 use common_test_util::temp_dir::create_named_temp_file;
483 use servers::grpc::GrpcOptions;
484 use servers::http::HttpOptions;
485
486 use super::*;
487 use crate::options::GlobalOptions;
488
489 #[test]
490 fn test_try_from_start_command() {
491 let command = StartCommand {
492 http_addr: Some("127.0.0.1:1234".to_string()),
493 mysql_addr: Some("127.0.0.1:5678".to_string()),
494 postgres_addr: Some("127.0.0.1:5432".to_string()),
495 internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
496 internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
497 influxdb_enable: Some(false),
498 disable_dashboard: Some(false),
499 ..Default::default()
500 };
501
502 let opts = command.load_options(&Default::default()).unwrap().component;
503
504 assert_eq!(opts.http.addr, "127.0.0.1:1234");
505 assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
506 assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
507 assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
508
509 let internal_grpc = opts.internal_grpc.as_ref().unwrap();
510 assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
511 assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");
512
513 let default_opts = FrontendOptions::default().component;
514
515 assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
516 assert!(opts.mysql.enable);
517 assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
518 assert!(opts.postgres.enable);
519 assert_eq!(
520 opts.postgres.runtime_size,
521 default_opts.postgres.runtime_size
522 );
523 assert!(opts.opentsdb.enable);
524
525 assert!(!opts.influxdb.enable);
526 }
527
528 #[test]
529 fn test_read_from_config_file() {
530 let mut file = create_named_temp_file();
531 let toml_str = r#"
532 [http]
533 addr = "127.0.0.1:4000"
534 timeout = "0s"
535 body_limit = "2GB"
536
537 [opentsdb]
538 enable = false
539
540 [logging]
541 level = "debug"
542 dir = "./greptimedb_data/test/logs"
543 "#;
544 write!(file, "{}", toml_str).unwrap();
545
546 let command = StartCommand {
547 config_file: Some(file.path().to_str().unwrap().to_string()),
548 disable_dashboard: Some(false),
549 ..Default::default()
550 };
551
552 let fe_opts = command.load_options(&Default::default()).unwrap().component;
553
554 assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
555 assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
556
557 assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
558
559 assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
560 assert_eq!(
561 "./greptimedb_data/test/logs".to_string(),
562 fe_opts.logging.dir
563 );
564 assert!(!fe_opts.opentsdb.enable);
565 }
566
567 #[tokio::test]
568 async fn test_try_from_start_command_to_anymap() {
569 let fe_opts = frontend::frontend::FrontendOptions {
570 http: HttpOptions {
571 disable_dashboard: false,
572 ..Default::default()
573 },
574 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
575 ..Default::default()
576 };
577
578 let mut plugins = Plugins::new();
579 plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
580 .await
581 .unwrap();
582
583 let provider = plugins.get::<UserProviderRef>().unwrap();
584 let result = provider
585 .authenticate(
586 Identity::UserId("test", None),
587 Password::PlainText("test".to_string().into()),
588 )
589 .await;
590 let _ = result.unwrap();
591 }
592
593 #[test]
594 fn test_load_log_options_from_cli() {
595 let cmd = StartCommand {
596 disable_dashboard: Some(false),
597 ..Default::default()
598 };
599
600 let options = cmd
601 .load_options(&GlobalOptions {
602 log_dir: Some("./greptimedb_data/test/logs".to_string()),
603 log_level: Some("debug".to_string()),
604
605 #[cfg(feature = "tokio-console")]
606 tokio_console_addr: None,
607 })
608 .unwrap()
609 .component;
610
611 let logging_opt = options.logging;
612 assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
613 assert_eq!("debug", logging_opt.level.as_ref().unwrap());
614 }
615
616 #[test]
617 fn test_config_precedence_order() {
618 let mut file = create_named_temp_file();
619 let toml_str = r#"
620 [http]
621 addr = "127.0.0.1:4000"
622
623 [meta_client]
624 timeout = "3s"
625 connect_timeout = "5s"
626 tcp_nodelay = true
627
628 [mysql]
629 addr = "127.0.0.1:4002"
630 "#;
631 write!(file, "{}", toml_str).unwrap();
632
633 let env_prefix = "FRONTEND_UT";
634 temp_env::with_vars(
635 [
636 (
637 [
639 env_prefix.to_string(),
640 "mysql".to_uppercase(),
641 "addr".to_uppercase(),
642 ]
643 .join(ENV_VAR_SEP),
644 Some("127.0.0.1:14002"),
645 ),
646 (
647 [
649 env_prefix.to_string(),
650 "mysql".to_uppercase(),
651 "runtime_size".to_uppercase(),
652 ]
653 .join(ENV_VAR_SEP),
654 Some("11"),
655 ),
656 (
657 [
659 env_prefix.to_string(),
660 "http".to_uppercase(),
661 "addr".to_uppercase(),
662 ]
663 .join(ENV_VAR_SEP),
664 Some("127.0.0.1:24000"),
665 ),
666 (
667 [
669 env_prefix.to_string(),
670 "meta_client".to_uppercase(),
671 "metasrv_addrs".to_uppercase(),
672 ]
673 .join(ENV_VAR_SEP),
674 Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
675 ),
676 ],
677 || {
678 let command = StartCommand {
679 config_file: Some(file.path().to_str().unwrap().to_string()),
680 http_addr: Some("127.0.0.1:14000".to_string()),
681 env_prefix: env_prefix.to_string(),
682 ..Default::default()
683 };
684
685 let fe_opts = command.load_options(&Default::default()).unwrap().component;
686
687 assert_eq!(fe_opts.mysql.runtime_size, 11);
689 assert_eq!(
690 fe_opts.meta_client.unwrap().metasrv_addrs,
691 vec![
692 "127.0.0.1:3001".to_string(),
693 "127.0.0.1:3002".to_string(),
694 "127.0.0.1:3003".to_string()
695 ]
696 );
697
698 assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
700
701 assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
703
704 assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
706 },
707 );
708 }
709}