1use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21use catalog::information_extension::DistributedInformationExtension;
22use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManagerBuilder, MetaKvBackend};
23use catalog::process_manager::ProcessManager;
24use clap::Parser;
25use client::client_manager::NodeClients;
26use common_base::Plugins;
27use common_config::{Configurable, DEFAULT_DATA_HOME};
28use common_grpc::channel_manager::ChannelConfig;
29use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
30use common_meta::heartbeat::handler::HandlerGroupExecutor;
31use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
32use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
33use common_telemetry::info;
34use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
35use common_time::timezone::set_default_timezone;
36use common_version::{short_version, verbose_version};
37use frontend::frontend::Frontend;
38use frontend::heartbeat::HeartbeatTask;
39use frontend::instance::builder::FrontendBuilder;
40use frontend::server::Services;
41use meta_client::{MetaClientOptions, MetaClientType};
42use servers::addrs;
43use servers::export_metrics::ExportMetricsTask;
44use servers::grpc::GrpcOptions;
45use servers::tls::{TlsMode, TlsOption};
46use snafu::{OptionExt, ResultExt};
47use tracing_appender::non_blocking::WorkerGuard;
48
49use crate::error::{self, Result};
50use crate::options::{GlobalOptions, GreptimeOptions};
51use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
52
/// Top-level options of the frontend binary: the generic [`GreptimeOptions`] wrapper
/// instantiated with the frontend component options.
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
54
/// A built frontend together with the logging worker guards created while building it.
pub struct Instance {
    /// The frontend that is started and shut down through the [`App`] implementation.
    frontend: Frontend,
    /// Never read; stored only so the guards live as long as this instance.
    /// NOTE(review): presumably dropping a `WorkerGuard` stops/flushes the background log
    /// writer — confirm against the `tracing_appender` documentation.
    _guard: Vec<WorkerGuard>,
}
60
/// Application name reported by [`App::name`] and passed to logging, version logging and
/// resource-limit metrics during `build`.
pub const APP_NAME: &str = "greptime-frontend";
62
63impl Instance {
64 pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
65 Self { frontend, _guard }
66 }
67
68 pub fn inner(&self) -> &Frontend {
69 &self.frontend
70 }
71
72 pub fn mut_inner(&mut self) -> &mut Frontend {
73 &mut self.frontend
74 }
75}
76
77#[async_trait]
78impl App for Instance {
79 fn name(&self) -> &str {
80 APP_NAME
81 }
82
83 async fn start(&mut self) -> Result<()> {
84 let plugins = self.frontend.instance.plugins().clone();
85 plugins::start_frontend_plugins(plugins)
86 .await
87 .context(error::StartFrontendSnafu)?;
88
89 self.frontend
90 .start()
91 .await
92 .context(error::StartFrontendSnafu)
93 }
94
95 async fn stop(&mut self) -> Result<()> {
96 self.frontend
97 .shutdown()
98 .await
99 .context(error::ShutdownFrontendSnafu)
100 }
101}
102
// Plain `//` comments on purpose: clap's derive turns `///` doc comments into help text,
// which would change the program's output.
//
// Entry point of the frontend command line; all work is delegated to the chosen subcommand.
#[derive(Parser)]
pub struct Command {
    // The subcommand selected on the command line.
    #[clap(subcommand)]
    pub subcmd: SubCommand,
}
108
109impl Command {
110 pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
111 self.subcmd.build(opts).await
112 }
113
114 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
115 self.subcmd.load_options(global_options)
116 }
117}
118
// Plain `//` comments on purpose: clap's derive turns `///` doc comments into help text,
// which would change the program's output.
//
// Subcommands of the frontend command line. `Start` is the only one defined here.
#[derive(Parser)]
pub enum SubCommand {
    // Carries the flags parsed into `StartCommand`.
    Start(StartCommand),
}
123
124impl SubCommand {
125 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
126 match self {
127 SubCommand::Start(cmd) => cmd.build(opts).await,
128 }
129 }
130
131 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
132 match self {
133 SubCommand::Start(cmd) => cmd.load_options(global_options),
134 }
135 }
136}
137
// Plain `//` comments on purpose: clap's derive turns `///` doc comments into help text,
// which would change the program's output.
//
// Flags of the frontend `start` subcommand. A flag that is left unset does not touch the
// corresponding option loaded from the config file / environment; the overlay itself is done
// in `StartCommand::merge_with_cli_options`.
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    // Overrides `grpc.bind_addr`. Also accepted under the alias `rpc-addr`.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    // Overrides `grpc.server_addr`. Also accepted under the alias `rpc-hostname`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    // Overrides `internal_grpc.bind_addr`, creating the `internal_grpc` section if absent.
    #[clap(long, alias = "internal-rpc-addr")]
    internal_rpc_bind_addr: Option<String>,
    // Overrides `internal_grpc.server_addr`, creating the `internal_grpc` section if absent.
    #[clap(long, alias = "internal-rpc-hostname")]
    internal_rpc_server_addr: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`; the number is interpreted as whole seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Overrides `mysql.addr` and enables the MySQL server.
    #[clap(long)]
    mysql_addr: Option<String>,
    // Overrides `postgres.addr` and enables the PostgreSQL server.
    #[clap(long)]
    postgres_addr: Option<String>,
    // Path of the config file handed to the layered options loader.
    #[clap(short, long)]
    pub config_file: Option<String>,
    // Overrides `influxdb.enable`.
    #[clap(short, long)]
    influxdb_enable: Option<bool>,
    // Overrides `meta_client.metasrv_addrs`; takes one or more comma-separated values.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // The three `tls_*` flags are combined into one `TlsOption`, which is applied to the
    // gRPC / MySQL / PostgreSQL servers only together with the matching address flag.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    // Overrides `user_provider`.
    #[clap(long)]
    user_provider: Option<String>,
    // Overrides `http.disable_dashboard`.
    #[clap(long)]
    disable_dashboard: Option<bool>,
    // Prefix handed to the layered options loader for environment-variable lookups.
    #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
    pub env_prefix: String,
}
183
184impl StartCommand {
185 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
186 let mut opts = FrontendOptions::load_layered_options(
187 self.config_file.as_deref(),
188 self.env_prefix.as_ref(),
189 )
190 .context(error::LoadLayeredConfigSnafu)?;
191
192 self.merge_with_cli_options(global_options, &mut opts)?;
193
194 Ok(opts)
195 }
196
197 fn merge_with_cli_options(
199 &self,
200 global_options: &GlobalOptions,
201 opts: &mut FrontendOptions,
202 ) -> Result<()> {
203 let opts = &mut opts.component;
204
205 if let Some(dir) = &global_options.log_dir {
206 opts.logging.dir.clone_from(dir);
207 }
208
209 if opts.logging.dir.is_empty() {
211 opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
212 .join(DEFAULT_LOGGING_DIR)
213 .to_string_lossy()
214 .to_string();
215 }
216
217 if global_options.log_level.is_some() {
218 opts.logging.level.clone_from(&global_options.log_level);
219 }
220
221 opts.tracing = TracingOptions {
222 #[cfg(feature = "tokio-console")]
223 tokio_console_addr: global_options.tokio_console_addr.clone(),
224 };
225
226 let tls_opts = TlsOption::new(
227 self.tls_mode.clone(),
228 self.tls_cert_path.clone(),
229 self.tls_key_path.clone(),
230 );
231
232 if let Some(addr) = &self.http_addr {
233 opts.http.addr.clone_from(addr);
234 }
235
236 if let Some(http_timeout) = self.http_timeout {
237 opts.http.timeout = Duration::from_secs(http_timeout)
238 }
239
240 if let Some(disable_dashboard) = self.disable_dashboard {
241 opts.http.disable_dashboard = disable_dashboard;
242 }
243
244 if let Some(addr) = &self.rpc_bind_addr {
245 opts.grpc.bind_addr.clone_from(addr);
246 opts.grpc.tls = tls_opts.clone();
247 }
248
249 if let Some(addr) = &self.rpc_server_addr {
250 opts.grpc.server_addr.clone_from(addr);
251 }
252
253 if let Some(addr) = &self.internal_rpc_bind_addr {
254 if let Some(internal_grpc) = &mut opts.internal_grpc {
255 internal_grpc.bind_addr = addr.to_string();
256 } else {
257 let grpc_options = GrpcOptions {
258 bind_addr: addr.to_string(),
259 ..Default::default()
260 };
261
262 opts.internal_grpc = Some(grpc_options);
263 }
264 }
265
266 if let Some(addr) = &self.internal_rpc_server_addr {
267 if let Some(internal_grpc) = &mut opts.internal_grpc {
268 internal_grpc.server_addr = addr.to_string();
269 } else {
270 let grpc_options = GrpcOptions {
271 server_addr: addr.to_string(),
272 ..Default::default()
273 };
274 opts.internal_grpc = Some(grpc_options);
275 }
276 }
277
278 if let Some(addr) = &self.mysql_addr {
279 opts.mysql.enable = true;
280 opts.mysql.addr.clone_from(addr);
281 opts.mysql.tls = tls_opts.clone();
282 }
283
284 if let Some(addr) = &self.postgres_addr {
285 opts.postgres.enable = true;
286 opts.postgres.addr.clone_from(addr);
287 opts.postgres.tls = tls_opts;
288 }
289
290 if let Some(enable) = self.influxdb_enable {
291 opts.influxdb.enable = enable;
292 }
293
294 if let Some(metasrv_addrs) = &self.metasrv_addrs {
295 opts.meta_client
296 .get_or_insert_with(MetaClientOptions::default)
297 .metasrv_addrs
298 .clone_from(metasrv_addrs);
299 }
300
301 if let Some(user_provider) = &self.user_provider {
302 opts.user_provider = Some(user_provider.clone());
303 }
304
305 Ok(())
306 }
307
308 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
309 common_runtime::init_global_runtimes(&opts.runtime);
310
311 let guard = common_telemetry::init_global_logging(
312 APP_NAME,
313 &opts.component.logging,
314 &opts.component.tracing,
315 opts.component.node_id.clone(),
316 Some(&opts.component.slow_query),
317 );
318
319 log_versions(verbose_version(), short_version(), APP_NAME);
320 maybe_activate_heap_profile(&opts.component.memory);
321 create_resource_limit_metrics(APP_NAME);
322
323 info!("Frontend start command: {:#?}", self);
324 info!("Frontend options: {:#?}", opts);
325
326 let plugin_opts = opts.plugins;
327 let mut opts = opts.component;
328 opts.grpc.detect_server_addr();
329 let mut plugins = Plugins::new();
330 plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
331 .await
332 .context(error::StartFrontendSnafu)?;
333
334 set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
335
336 let meta_client_options = opts
337 .meta_client
338 .as_ref()
339 .context(error::MissingConfigSnafu {
340 msg: "'meta_client'",
341 })?;
342
343 let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
344 let cache_ttl = meta_client_options.metadata_cache_ttl;
345 let cache_tti = meta_client_options.metadata_cache_tti;
346
347 let meta_client = meta_client::create_meta_client(
348 MetaClientType::Frontend,
349 meta_client_options,
350 Some(&plugins),
351 None,
352 )
353 .await
354 .context(error::MetaClientInitSnafu)?;
355
356 let cached_meta_backend =
358 CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
359 .cache_max_capacity(cache_max_capacity)
360 .cache_ttl(cache_ttl)
361 .cache_tti(cache_tti)
362 .build();
363 let cached_meta_backend = Arc::new(cached_meta_backend);
364
365 let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
367 CacheRegistryBuilder::default()
368 .add_cache(cached_meta_backend.clone())
369 .build(),
370 );
371 let fundamental_cache_registry =
372 build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
373 let layered_cache_registry = Arc::new(
374 with_default_composite_cache_registry(
375 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
376 )
377 .context(error::BuildCacheRegistrySnafu)?
378 .build(),
379 );
380
381 let mut channel_config = ChannelConfig {
384 timeout: None,
385 tcp_nodelay: opts.datanode.client.tcp_nodelay,
386 connect_timeout: Some(opts.datanode.client.connect_timeout),
387 ..Default::default()
388 };
389 if opts.grpc.flight_compression.transport_compression() {
390 channel_config.accept_compression = true;
391 channel_config.send_compression = true;
392 }
393 let client = Arc::new(NodeClients::new(channel_config));
394
395 let information_extension = Arc::new(DistributedInformationExtension::new(
396 meta_client.clone(),
397 client.clone(),
398 ));
399
400 let process_manager = Arc::new(ProcessManager::new(
401 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
402 Some(meta_client.clone()),
403 ));
404
405 let builder = KvBackendCatalogManagerBuilder::new(
406 information_extension,
407 cached_meta_backend.clone(),
408 layered_cache_registry.clone(),
409 )
410 .with_process_manager(process_manager.clone());
411 #[cfg(feature = "enterprise")]
412 let builder = if let Some(factories) = plugins.get() {
413 builder.with_extra_information_table_factories(factories)
414 } else {
415 builder
416 };
417 let catalog_manager = builder.build();
418
419 let executor = HandlerGroupExecutor::new(vec![
420 Arc::new(ParseMailboxMessageHandler),
421 Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
422 ]);
423
424 let heartbeat_task = HeartbeatTask::new(
425 &opts,
426 meta_client.clone(),
427 opts.heartbeat.clone(),
428 Arc::new(executor),
429 );
430 let heartbeat_task = Some(heartbeat_task);
431
432 let instance = FrontendBuilder::new(
433 opts.clone(),
434 cached_meta_backend.clone(),
435 layered_cache_registry.clone(),
436 catalog_manager,
437 client,
438 meta_client,
439 process_manager,
440 )
441 .with_plugin(plugins.clone())
442 .with_local_cache_invalidator(layered_cache_registry)
443 .try_build()
444 .await
445 .context(error::StartFrontendSnafu)?;
446 let instance = Arc::new(instance);
447
448 let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
449 .context(error::ServersSnafu)?;
450
451 let servers = Services::new(opts, instance.clone(), plugins)
452 .build()
453 .context(error::StartFrontendSnafu)?;
454
455 let frontend = Frontend {
456 instance,
457 servers,
458 heartbeat_task,
459 export_metrics_task,
460 };
461
462 Ok(Instance::new(frontend, guard))
463 }
464}
465
// Unit tests for option loading and CLI merging of the frontend `start` command.
#[cfg(test)]
mod tests {
    use std::io::Write;
    use std::time::Duration;

    use auth::{Identity, Password, UserProviderRef};
    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use servers::grpc::GrpcOptions;
    use servers::http::HttpOptions;

    use super::*;
    use crate::options::GlobalOptions;

    // CLI flags alone (no config file) must override the matching options and leave the
    // rest at their defaults.
    #[test]
    fn test_try_from_start_command() {
        let command = StartCommand {
            http_addr: Some("127.0.0.1:1234".to_string()),
            mysql_addr: Some("127.0.0.1:5678".to_string()),
            postgres_addr: Some("127.0.0.1:5432".to_string()),
            internal_rpc_bind_addr: Some("127.0.0.1:4010".to_string()),
            internal_rpc_server_addr: Some("10.0.0.24:4010".to_string()),
            influxdb_enable: Some(false),
            disable_dashboard: Some(false),
            ..Default::default()
        };

        let opts = command.load_options(&Default::default()).unwrap().component;

        assert_eq!(opts.http.addr, "127.0.0.1:1234");
        assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
        assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
        assert_eq!(opts.postgres.addr, "127.0.0.1:5432");

        // Both internal gRPC flags must end up in the same (newly created) section.
        let internal_grpc = opts.internal_grpc.as_ref().unwrap();
        assert_eq!(internal_grpc.bind_addr, "127.0.0.1:4010");
        assert_eq!(internal_grpc.server_addr, "10.0.0.24:4010");

        let default_opts = FrontendOptions::default().component;

        // Options without a CLI flag keep their default values.
        assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
        assert!(opts.mysql.enable);
        assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
        assert!(opts.postgres.enable);
        assert_eq!(
            opts.postgres.runtime_size,
            default_opts.postgres.runtime_size
        );
        assert!(opts.opentsdb.enable);

        assert!(!opts.influxdb.enable);
    }

    // Values written in a TOML config file must be picked up by `load_options`.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"
            timeout = "0s"
            body_limit = "2GB"

            [opentsdb]
            enable = false

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let command = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            disable_dashboard: Some(false),
            ..Default::default()
        };

        let fe_opts = command.load_options(&Default::default()).unwrap().component;

        assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
        assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);

        assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);

        assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            fe_opts.logging.dir
        );
        assert!(!fe_opts.opentsdb.enable);
    }

    // A `user_provider` option must result in a `UserProviderRef` plugin that accepts the
    // configured credentials.
    #[tokio::test]
    async fn test_try_from_start_command_to_anymap() {
        let fe_opts = frontend::frontend::FrontendOptions {
            http: HttpOptions {
                disable_dashboard: false,
                ..Default::default()
            },
            user_provider: Some("static_user_provider:cmd:test=test".to_string()),
            ..Default::default()
        };

        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
            .await
            .unwrap();

        let provider = plugins.get::<UserProviderRef>().unwrap();
        let result = provider
            .authenticate(
                Identity::UserId("test", None),
                Password::PlainText("test".to_string().into()),
            )
            .await;
        let _ = result.unwrap();
    }

    // The global `log_dir` / `log_level` options must land in the logging options.
    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            disable_dashboard: Some(false),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    // Checks the precedence of the option sources:
    // CLI flag > config file > environment variable > default value.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            [http]
            addr = "127.0.0.1:4000"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [mysql]
            addr = "127.0.0.1:4002"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "FRONTEND_UT";
        temp_env::with_vars(
            [
                (
                    // mysql.addr = 127.0.0.1:14002
                    [
                        env_prefix.to_string(),
                        "mysql".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:14002"),
                ),
                (
                    // mysql.runtime_size = 11
                    [
                        env_prefix.to_string(),
                        "mysql".to_uppercase(),
                        "runtime_size".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("11"),
                ),
                (
                    // http.addr = 127.0.0.1:24000
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
                (
                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
                    [
                        env_prefix.to_string(),
                        "meta_client".to_uppercase(),
                        "metasrv_addrs".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let fe_opts = command.load_options(&Default::default()).unwrap().component;

                // Only set through the environment: env > default.
                assert_eq!(fe_opts.mysql.runtime_size, 11);
                assert_eq!(
                    fe_opts.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                // Set in both the config file and the environment: config file > env.
                assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");

                // Set on the CLI, in the config file and in the environment: CLI wins.
                assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");

                // Not set anywhere: the default value is kept.
                assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
            },
        );
    }
}