1use std::path::Path;
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
21use catalog::information_extension::DistributedInformationExtension;
22use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
23use catalog::process_manager::ProcessManager;
24use clap::Parser;
25use client::client_manager::NodeClients;
26use common_base::Plugins;
27use common_config::{Configurable, DEFAULT_DATA_HOME};
28use common_grpc::channel_manager::ChannelConfig;
29use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
30use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
31use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
32use common_meta::heartbeat::handler::HandlerGroupExecutor;
33use common_telemetry::info;
34use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
35use common_time::timezone::set_default_timezone;
36use common_version::{short_version, version};
37use frontend::frontend::Frontend;
38use frontend::heartbeat::HeartbeatTask;
39use frontend::instance::builder::FrontendBuilder;
40use frontend::server::Services;
41use meta_client::{MetaClientOptions, MetaClientType};
42use servers::addrs;
43use servers::export_metrics::ExportMetricsTask;
44use servers::tls::{TlsMode, TlsOption};
45use snafu::{OptionExt, ResultExt};
46use tracing_appender::non_blocking::WorkerGuard;
47
48use crate::error::{self, Result};
49use crate::options::{GlobalOptions, GreptimeOptions};
50use crate::{create_resource_limit_metrics, log_versions, App};
51
/// Frontend options as loaded by this command: the frontend component options wrapped in
/// [`GreptimeOptions`], which `build` also reads `runtime` and `plugins` settings from.
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
53
/// The frontend application: a built [`Frontend`] plus the logging guards created while
/// building it.
pub struct Instance {
    /// The assembled frontend that `start`/`stop` operate on.
    frontend: Frontend,
    /// Guards returned by `init_global_logging`; held for the lifetime of the instance.
    // NOTE(review): presumably kept so the non-blocking log writers keep flushing until
    // shutdown — confirm against the `tracing_appender::non_blocking::WorkerGuard` docs.
    _guard: Vec<WorkerGuard>,
}
59
/// Application name returned by `App::name` and passed to logging, version logging and
/// resource-limit metrics setup in `StartCommand::build`.
pub const APP_NAME: &str = "greptime-frontend";
61
62impl Instance {
63 pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
64 Self { frontend, _guard }
65 }
66
67 pub fn inner(&self) -> &Frontend {
68 &self.frontend
69 }
70
71 pub fn mut_inner(&mut self) -> &mut Frontend {
72 &mut self.frontend
73 }
74}
75
76#[async_trait]
77impl App for Instance {
78 fn name(&self) -> &str {
79 APP_NAME
80 }
81
82 async fn start(&mut self) -> Result<()> {
83 let plugins = self.frontend.instance.plugins().clone();
84 plugins::start_frontend_plugins(plugins)
85 .await
86 .context(error::StartFrontendSnafu)?;
87
88 self.frontend
89 .start()
90 .await
91 .context(error::StartFrontendSnafu)
92 }
93
94 async fn stop(&mut self) -> Result<()> {
95 self.frontend
96 .shutdown()
97 .await
98 .context(error::ShutdownFrontendSnafu)
99 }
100}
101
// Top-level `frontend` command; all work is delegated to the selected sub-command.
// (Plain `//` comments are used on purpose: with clap's derive, `///` doc comments
// become user-visible help text.)
#[derive(Parser)]
pub struct Command {
    // The sub-command chosen on the command line.
    #[clap(subcommand)]
    subcmd: SubCommand,
}
107
108impl Command {
109 pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
110 self.subcmd.build(opts).await
111 }
112
113 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
114 self.subcmd.load_options(global_options)
115 }
116}
117
// Sub-commands of the frontend command. Only `Start` exists today.
// (Plain `//` comments are used on purpose: with clap's derive, `///` doc comments
// become user-visible help text.)
#[derive(Parser)]
enum SubCommand {
    // Load options and build a frontend instance.
    Start(StartCommand),
}
122
123impl SubCommand {
124 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
125 match self {
126 SubCommand::Start(cmd) => cmd.build(opts).await,
127 }
128 }
129
130 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
131 match self {
132 SubCommand::Start(cmd) => cmd.load_options(global_options),
133 }
134 }
135}
136
// Command-line flags of `frontend start`. Every optional flag that is present overrides
// the corresponding value loaded from the config file / environment (see
// `merge_with_cli_options`).
// (Plain `//` comments are used on purpose: with clap's derive, `///` doc comments
// become user-visible help text.)
#[derive(Debug, Default, Parser)]
pub struct StartCommand {
    // Overrides `grpc.bind_addr`; also accepted as `--rpc-addr`. When given, `grpc.tls`
    // is replaced by the TLS options built from the `tls_*` flags.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    // Overrides `grpc.server_addr`; also accepted as `--rpc-hostname`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`; interpreted as whole seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Enables the MySQL server, overrides `mysql.addr` and replaces `mysql.tls`.
    #[clap(long)]
    mysql_addr: Option<String>,
    // Enables the PostgreSQL server, overrides `postgres.addr` and replaces `postgres.tls`.
    #[clap(long)]
    postgres_addr: Option<String>,
    // Optional config file handed to `load_layered_options`.
    #[clap(short, long)]
    config_file: Option<String>,
    // Overrides `influxdb.enable`.
    #[clap(short, long)]
    influxdb_enable: Option<bool>,
    // Comma-separated metasrv addresses; overrides `meta_client.metasrv_addrs`
    // (default meta client options are created first if none were loaded).
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // TLS mode, certificate path and key path; combined into one `TlsOption` that is
    // applied to gRPC / MySQL / PostgreSQL only when their address flag is also given.
    #[clap(long)]
    tls_mode: Option<TlsMode>,
    #[clap(long)]
    tls_cert_path: Option<String>,
    #[clap(long)]
    tls_key_path: Option<String>,
    // Overrides `user_provider`.
    #[clap(long)]
    user_provider: Option<String>,
    // Overrides `http.disable_dashboard`.
    #[clap(long)]
    disable_dashboard: Option<bool>,
    // Prefix handed to `load_layered_options` for environment-variable lookups.
    #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
    env_prefix: String,
}
174
175impl StartCommand {
176 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
177 let mut opts = FrontendOptions::load_layered_options(
178 self.config_file.as_deref(),
179 self.env_prefix.as_ref(),
180 )
181 .context(error::LoadLayeredConfigSnafu)?;
182
183 self.merge_with_cli_options(global_options, &mut opts)?;
184
185 Ok(opts)
186 }
187
188 fn merge_with_cli_options(
190 &self,
191 global_options: &GlobalOptions,
192 opts: &mut FrontendOptions,
193 ) -> Result<()> {
194 let opts = &mut opts.component;
195
196 if let Some(dir) = &global_options.log_dir {
197 opts.logging.dir.clone_from(dir);
198 }
199
200 if opts.logging.dir.is_empty() {
202 opts.logging.dir = Path::new(DEFAULT_DATA_HOME)
203 .join(DEFAULT_LOGGING_DIR)
204 .to_string_lossy()
205 .to_string();
206 }
207
208 if global_options.log_level.is_some() {
209 opts.logging.level.clone_from(&global_options.log_level);
210 }
211
212 opts.tracing = TracingOptions {
213 #[cfg(feature = "tokio-console")]
214 tokio_console_addr: global_options.tokio_console_addr.clone(),
215 };
216
217 let tls_opts = TlsOption::new(
218 self.tls_mode.clone(),
219 self.tls_cert_path.clone(),
220 self.tls_key_path.clone(),
221 );
222
223 if let Some(addr) = &self.http_addr {
224 opts.http.addr.clone_from(addr);
225 }
226
227 if let Some(http_timeout) = self.http_timeout {
228 opts.http.timeout = Duration::from_secs(http_timeout)
229 }
230
231 if let Some(disable_dashboard) = self.disable_dashboard {
232 opts.http.disable_dashboard = disable_dashboard;
233 }
234
235 if let Some(addr) = &self.rpc_bind_addr {
236 opts.grpc.bind_addr.clone_from(addr);
237 opts.grpc.tls = tls_opts.clone();
238 }
239
240 if let Some(addr) = &self.rpc_server_addr {
241 opts.grpc.server_addr.clone_from(addr);
242 }
243
244 if let Some(addr) = &self.mysql_addr {
245 opts.mysql.enable = true;
246 opts.mysql.addr.clone_from(addr);
247 opts.mysql.tls = tls_opts.clone();
248 }
249
250 if let Some(addr) = &self.postgres_addr {
251 opts.postgres.enable = true;
252 opts.postgres.addr.clone_from(addr);
253 opts.postgres.tls = tls_opts;
254 }
255
256 if let Some(enable) = self.influxdb_enable {
257 opts.influxdb.enable = enable;
258 }
259
260 if let Some(metasrv_addrs) = &self.metasrv_addrs {
261 opts.meta_client
262 .get_or_insert_with(MetaClientOptions::default)
263 .metasrv_addrs
264 .clone_from(metasrv_addrs);
265 }
266
267 if let Some(user_provider) = &self.user_provider {
268 opts.user_provider = Some(user_provider.clone());
269 }
270
271 Ok(())
272 }
273
    /// Assembles a ready-to-start frontend [`Instance`] from fully merged options.
    ///
    /// Steps, in order: global runtimes, global logging, plugins, default timezone,
    /// meta client, metadata caches, catalog manager, heartbeat task, node client,
    /// frontend instance, metrics export task and servers.
    ///
    /// # Errors
    /// Returns an error when plugin setup, timezone initialization, meta client creation,
    /// cache registry construction, frontend/server construction or the metrics export
    /// task fails, or when the `meta_client` section is missing from the options.
    async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
        common_runtime::init_global_runtimes(&opts.runtime);

        // The returned guards are stored in the `Instance` created at the end.
        let guard = common_telemetry::init_global_logging(
            APP_NAME,
            &opts.component.logging,
            &opts.component.tracing,
            opts.component.node_id.clone(),
            opts.component.slow_query.as_ref(),
        );

        log_versions(version(), short_version(), APP_NAME);
        create_resource_limit_metrics(APP_NAME);

        info!("Frontend start command: {:#?}", self);
        info!("Frontend options: {:#?}", opts);

        // From here on `opts` refers to the frontend component options only.
        let plugin_opts = opts.plugins;
        let mut opts = opts.component;
        // NOTE(review): presumably fills in `grpc.server_addr` when it is unset — confirm
        // against `GrpcOptions::detect_server_addr`.
        opts.grpc.detect_server_addr();
        let mut plugins = Plugins::new();
        plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
            .await
            .context(error::StartFrontendSnafu)?;

        set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;

        // The frontend built here cannot work without a `meta_client` section.
        let meta_client_options = opts
            .meta_client
            .as_ref()
            .context(error::MissingConfigSnafu {
                msg: "'meta_client'",
            })?;

        let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
        let cache_ttl = meta_client_options.metadata_cache_ttl;
        let cache_tti = meta_client_options.metadata_cache_tti;

        let meta_client = meta_client::create_meta_client(
            MetaClientType::Frontend,
            meta_client_options,
            Some(&plugins),
            None,
        )
        .await
        .context(error::MetaClientInitSnafu)?;

        // Metadata KV backend backed by the meta client, wrapped in a cache configured
        // from the meta client options read above.
        let cached_meta_backend =
            CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
                .cache_max_capacity(cache_max_capacity)
                .cache_ttl(cache_ttl)
                .cache_tti(cache_tti)
                .build();
        let cached_meta_backend = Arc::new(cached_meta_backend);

        // Layered cache registry: the cached backend itself, the fundamental caches
        // (built on a separate, uncached `MetaKvBackend`) and the default composite caches.
        let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
            CacheRegistryBuilder::default()
                .add_cache(cached_meta_backend.clone())
                .build(),
        );
        let fundamental_cache_registry =
            build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
        let layered_cache_registry = Arc::new(
            with_default_composite_cache_registry(
                layered_cache_builder.add_cache_registry(fundamental_cache_registry),
            )
            .context(error::BuildCacheRegistrySnafu)?
            .build(),
        );

        let information_extension =
            Arc::new(DistributedInformationExtension::new(meta_client.clone()));

        // The process manager is keyed by this frontend's resolved gRPC address.
        let process_manager = Arc::new(ProcessManager::new(
            addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
            Some(meta_client.clone()),
        ));
        let catalog_manager = KvBackendCatalogManager::new(
            information_extension,
            cached_meta_backend.clone(),
            layered_cache_registry.clone(),
            None,
            Some(process_manager.clone()),
        );

        // Heartbeat handlers: parse mailbox messages, then invalidate entries in the
        // layered cache registry.
        let executor = HandlerGroupExecutor::new(vec![
            Arc::new(ParseMailboxMessageHandler),
            Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
        ]);

        let heartbeat_task = HeartbeatTask::new(
            &opts,
            meta_client.clone(),
            opts.heartbeat.clone(),
            Arc::new(executor),
        );
        let heartbeat_task = Some(heartbeat_task);

        // Channel used by the node clients: no request timeout is configured here.
        // NOTE(review): presumably because some queries are expected to run for a long
        // time — confirm.
        let mut channel_config = ChannelConfig {
            timeout: None,
            tcp_nodelay: opts.datanode.client.tcp_nodelay,
            connect_timeout: Some(opts.datanode.client.connect_timeout),
            ..Default::default()
        };
        // Enable compression in both directions when the flight compression setting
        // asks for transport compression.
        if opts.grpc.flight_compression.transport_compression() {
            channel_config.accept_compression = true;
            channel_config.send_compression = true;
        }
        let client = NodeClients::new(channel_config);

        let instance = FrontendBuilder::new(
            opts.clone(),
            cached_meta_backend.clone(),
            layered_cache_registry.clone(),
            catalog_manager,
            Arc::new(client),
            meta_client,
            process_manager,
        )
        .with_plugin(plugins.clone())
        .with_local_cache_invalidator(layered_cache_registry)
        .try_build()
        .await
        .context(error::StartFrontendSnafu)?;
        let instance = Arc::new(instance);

        let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
            .context(error::ServersSnafu)?;

        // `opts` and `plugins` are consumed here, so everything that needs them must be
        // built above.
        let servers = Services::new(opts, instance.clone(), plugins)
            .build()
            .context(error::StartFrontendSnafu)?;

        let frontend = Frontend {
            instance,
            servers,
            heartbeat_task,
            export_metrics_task,
        };

        Ok(Instance::new(frontend, guard))
    }
420}
421
422#[cfg(test)]
423mod tests {
424 use std::io::Write;
425 use std::time::Duration;
426
427 use auth::{Identity, Password, UserProviderRef};
428 use common_base::readable_size::ReadableSize;
429 use common_config::ENV_VAR_SEP;
430 use common_test_util::temp_dir::create_named_temp_file;
431 use servers::grpc::GrpcOptions;
432 use servers::http::HttpOptions;
433
434 use super::*;
435 use crate::options::GlobalOptions;
436
437 #[test]
438 fn test_try_from_start_command() {
439 let command = StartCommand {
440 http_addr: Some("127.0.0.1:1234".to_string()),
441 mysql_addr: Some("127.0.0.1:5678".to_string()),
442 postgres_addr: Some("127.0.0.1:5432".to_string()),
443 influxdb_enable: Some(false),
444 disable_dashboard: Some(false),
445 ..Default::default()
446 };
447
448 let opts = command.load_options(&Default::default()).unwrap().component;
449
450 assert_eq!(opts.http.addr, "127.0.0.1:1234");
451 assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
452 assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
453 assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
454
455 let default_opts = FrontendOptions::default().component;
456
457 assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
458 assert!(opts.mysql.enable);
459 assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
460 assert!(opts.postgres.enable);
461 assert_eq!(
462 opts.postgres.runtime_size,
463 default_opts.postgres.runtime_size
464 );
465 assert!(opts.opentsdb.enable);
466
467 assert!(!opts.influxdb.enable);
468 }
469
470 #[test]
471 fn test_read_from_config_file() {
472 let mut file = create_named_temp_file();
473 let toml_str = r#"
474 [http]
475 addr = "127.0.0.1:4000"
476 timeout = "0s"
477 body_limit = "2GB"
478
479 [opentsdb]
480 enable = false
481
482 [logging]
483 level = "debug"
484 dir = "./greptimedb_data/test/logs"
485 "#;
486 write!(file, "{}", toml_str).unwrap();
487
488 let command = StartCommand {
489 config_file: Some(file.path().to_str().unwrap().to_string()),
490 disable_dashboard: Some(false),
491 ..Default::default()
492 };
493
494 let fe_opts = command.load_options(&Default::default()).unwrap().component;
495
496 assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
497 assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
498
499 assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
500
501 assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
502 assert_eq!(
503 "./greptimedb_data/test/logs".to_string(),
504 fe_opts.logging.dir
505 );
506 assert!(!fe_opts.opentsdb.enable);
507 }
508
509 #[tokio::test]
510 async fn test_try_from_start_command_to_anymap() {
511 let fe_opts = frontend::frontend::FrontendOptions {
512 http: HttpOptions {
513 disable_dashboard: false,
514 ..Default::default()
515 },
516 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
517 ..Default::default()
518 };
519
520 let mut plugins = Plugins::new();
521 plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
522 .await
523 .unwrap();
524
525 let provider = plugins.get::<UserProviderRef>().unwrap();
526 let result = provider
527 .authenticate(
528 Identity::UserId("test", None),
529 Password::PlainText("test".to_string().into()),
530 )
531 .await;
532 let _ = result.unwrap();
533 }
534
535 #[test]
536 fn test_load_log_options_from_cli() {
537 let cmd = StartCommand {
538 disable_dashboard: Some(false),
539 ..Default::default()
540 };
541
542 let options = cmd
543 .load_options(&GlobalOptions {
544 log_dir: Some("./greptimedb_data/test/logs".to_string()),
545 log_level: Some("debug".to_string()),
546
547 #[cfg(feature = "tokio-console")]
548 tokio_console_addr: None,
549 })
550 .unwrap()
551 .component;
552
553 let logging_opt = options.logging;
554 assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
555 assert_eq!("debug", logging_opt.level.as_ref().unwrap());
556 }
557
558 #[test]
559 fn test_config_precedence_order() {
560 let mut file = create_named_temp_file();
561 let toml_str = r#"
562 [http]
563 addr = "127.0.0.1:4000"
564
565 [meta_client]
566 timeout = "3s"
567 connect_timeout = "5s"
568 tcp_nodelay = true
569
570 [mysql]
571 addr = "127.0.0.1:4002"
572 "#;
573 write!(file, "{}", toml_str).unwrap();
574
575 let env_prefix = "FRONTEND_UT";
576 temp_env::with_vars(
577 [
578 (
579 [
581 env_prefix.to_string(),
582 "mysql".to_uppercase(),
583 "addr".to_uppercase(),
584 ]
585 .join(ENV_VAR_SEP),
586 Some("127.0.0.1:14002"),
587 ),
588 (
589 [
591 env_prefix.to_string(),
592 "mysql".to_uppercase(),
593 "runtime_size".to_uppercase(),
594 ]
595 .join(ENV_VAR_SEP),
596 Some("11"),
597 ),
598 (
599 [
601 env_prefix.to_string(),
602 "http".to_uppercase(),
603 "addr".to_uppercase(),
604 ]
605 .join(ENV_VAR_SEP),
606 Some("127.0.0.1:24000"),
607 ),
608 (
609 [
611 env_prefix.to_string(),
612 "meta_client".to_uppercase(),
613 "metasrv_addrs".to_uppercase(),
614 ]
615 .join(ENV_VAR_SEP),
616 Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
617 ),
618 ],
619 || {
620 let command = StartCommand {
621 config_file: Some(file.path().to_str().unwrap().to_string()),
622 http_addr: Some("127.0.0.1:14000".to_string()),
623 env_prefix: env_prefix.to_string(),
624 ..Default::default()
625 };
626
627 let fe_opts = command.load_options(&Default::default()).unwrap().component;
628
629 assert_eq!(fe_opts.mysql.runtime_size, 11);
631 assert_eq!(
632 fe_opts.meta_client.unwrap().metasrv_addrs,
633 vec![
634 "127.0.0.1:3001".to_string(),
635 "127.0.0.1:3002".to_string(),
636 "127.0.0.1:3003".to_string()
637 ]
638 );
639
640 assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
642
643 assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
645
646 assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
648 },
649 );
650 }
651}