1use std::sync::Arc;
16use std::time::Duration;
17
18use async_trait::async_trait;
19use cache::{build_fundamental_cache_registry, with_default_composite_cache_registry};
20use catalog::information_extension::DistributedInformationExtension;
21use catalog::kvbackend::{CachedKvBackendBuilder, KvBackendCatalogManager, MetaKvBackend};
22use clap::Parser;
23use client::client_manager::NodeClients;
24use common_base::Plugins;
25use common_config::Configurable;
26use common_grpc::channel_manager::ChannelConfig;
27use common_meta::cache::{CacheRegistryBuilder, LayeredCacheRegistryBuilder};
28use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
29use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
30use common_meta::heartbeat::handler::HandlerGroupExecutor;
31use common_telemetry::info;
32use common_telemetry::logging::TracingOptions;
33use common_time::timezone::set_default_timezone;
34use common_version::{short_version, version};
35use frontend::frontend::Frontend;
36use frontend::heartbeat::HeartbeatTask;
37use frontend::instance::builder::FrontendBuilder;
38use frontend::server::Services;
39use meta_client::{MetaClientOptions, MetaClientType};
40use servers::export_metrics::ExportMetricsTask;
41use servers::tls::{TlsMode, TlsOption};
42use snafu::{OptionExt, ResultExt};
43use tracing_appender::non_blocking::WorkerGuard;
44
45use crate::error::{self, Result};
46use crate::options::{GlobalOptions, GreptimeOptions};
47use crate::{log_versions, App};
48
/// Frontend options wrapped in the shared [`GreptimeOptions`] envelope.
///
/// Within this file the envelope's `runtime` and `plugins` parts are read
/// directly, and the frontend-specific options are reached through `component`.
type FrontendOptions = GreptimeOptions<frontend::frontend::FrontendOptions>;
50
51pub struct Instance {
52 frontend: Frontend,
53 _guard: Vec<WorkerGuard>,
55}
56
57pub const APP_NAME: &str = "greptime-frontend";
58
59impl Instance {
60 pub fn new(frontend: Frontend, _guard: Vec<WorkerGuard>) -> Self {
61 Self { frontend, _guard }
62 }
63
64 pub fn inner(&self) -> &Frontend {
65 &self.frontend
66 }
67
68 pub fn mut_inner(&mut self) -> &mut Frontend {
69 &mut self.frontend
70 }
71}
72
73#[async_trait]
74impl App for Instance {
75 fn name(&self) -> &str {
76 APP_NAME
77 }
78
79 async fn start(&mut self) -> Result<()> {
80 let plugins = self.frontend.instance.plugins().clone();
81 plugins::start_frontend_plugins(plugins)
82 .await
83 .context(error::StartFrontendSnafu)?;
84
85 self.frontend
86 .start()
87 .await
88 .context(error::StartFrontendSnafu)
89 }
90
91 async fn stop(&mut self) -> Result<()> {
92 self.frontend
93 .shutdown()
94 .await
95 .context(error::ShutdownFrontendSnafu)
96 }
97}
98
99#[derive(Parser)]
100pub struct Command {
101 #[clap(subcommand)]
102 subcmd: SubCommand,
103}
104
105impl Command {
106 pub async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
107 self.subcmd.build(opts).await
108 }
109
110 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
111 self.subcmd.load_options(global_options)
112 }
113}
114
115#[derive(Parser)]
116enum SubCommand {
117 Start(StartCommand),
118}
119
120impl SubCommand {
121 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
122 match self {
123 SubCommand::Start(cmd) => cmd.build(opts).await,
124 }
125 }
126
127 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
128 match self {
129 SubCommand::Start(cmd) => cmd.load_options(global_options),
130 }
131 }
132}
133
134#[derive(Debug, Default, Parser)]
135pub struct StartCommand {
136 #[clap(long, alias = "rpc-addr")]
138 rpc_bind_addr: Option<String>,
139 #[clap(long, alias = "rpc-hostname")]
143 rpc_server_addr: Option<String>,
144 #[clap(long)]
145 http_addr: Option<String>,
146 #[clap(long)]
147 http_timeout: Option<u64>,
148 #[clap(long)]
149 mysql_addr: Option<String>,
150 #[clap(long)]
151 postgres_addr: Option<String>,
152 #[clap(short, long)]
153 config_file: Option<String>,
154 #[clap(short, long)]
155 influxdb_enable: Option<bool>,
156 #[clap(long, value_delimiter = ',', num_args = 1..)]
157 metasrv_addrs: Option<Vec<String>>,
158 #[clap(long)]
159 tls_mode: Option<TlsMode>,
160 #[clap(long)]
161 tls_cert_path: Option<String>,
162 #[clap(long)]
163 tls_key_path: Option<String>,
164 #[clap(long)]
165 user_provider: Option<String>,
166 #[clap(long)]
167 disable_dashboard: Option<bool>,
168 #[clap(long, default_value = "GREPTIMEDB_FRONTEND")]
169 env_prefix: String,
170}
171
172impl StartCommand {
173 fn load_options(&self, global_options: &GlobalOptions) -> Result<FrontendOptions> {
174 let mut opts = FrontendOptions::load_layered_options(
175 self.config_file.as_deref(),
176 self.env_prefix.as_ref(),
177 )
178 .context(error::LoadLayeredConfigSnafu)?;
179
180 self.merge_with_cli_options(global_options, &mut opts)?;
181
182 Ok(opts)
183 }
184
185 fn merge_with_cli_options(
187 &self,
188 global_options: &GlobalOptions,
189 opts: &mut FrontendOptions,
190 ) -> Result<()> {
191 let opts = &mut opts.component;
192
193 if let Some(dir) = &global_options.log_dir {
194 opts.logging.dir.clone_from(dir);
195 }
196
197 if global_options.log_level.is_some() {
198 opts.logging.level.clone_from(&global_options.log_level);
199 }
200
201 opts.tracing = TracingOptions {
202 #[cfg(feature = "tokio-console")]
203 tokio_console_addr: global_options.tokio_console_addr.clone(),
204 };
205
206 let tls_opts = TlsOption::new(
207 self.tls_mode.clone(),
208 self.tls_cert_path.clone(),
209 self.tls_key_path.clone(),
210 );
211
212 if let Some(addr) = &self.http_addr {
213 opts.http.addr.clone_from(addr);
214 }
215
216 if let Some(http_timeout) = self.http_timeout {
217 opts.http.timeout = Duration::from_secs(http_timeout)
218 }
219
220 if let Some(disable_dashboard) = self.disable_dashboard {
221 opts.http.disable_dashboard = disable_dashboard;
222 }
223
224 if let Some(addr) = &self.rpc_bind_addr {
225 opts.grpc.bind_addr.clone_from(addr);
226 opts.grpc.tls = tls_opts.clone();
227 }
228
229 if let Some(addr) = &self.rpc_server_addr {
230 opts.grpc.server_addr.clone_from(addr);
231 }
232
233 if let Some(addr) = &self.mysql_addr {
234 opts.mysql.enable = true;
235 opts.mysql.addr.clone_from(addr);
236 opts.mysql.tls = tls_opts.clone();
237 }
238
239 if let Some(addr) = &self.postgres_addr {
240 opts.postgres.enable = true;
241 opts.postgres.addr.clone_from(addr);
242 opts.postgres.tls = tls_opts;
243 }
244
245 if let Some(enable) = self.influxdb_enable {
246 opts.influxdb.enable = enable;
247 }
248
249 if let Some(metasrv_addrs) = &self.metasrv_addrs {
250 opts.meta_client
251 .get_or_insert_with(MetaClientOptions::default)
252 .metasrv_addrs
253 .clone_from(metasrv_addrs);
254 }
255
256 if let Some(user_provider) = &self.user_provider {
257 opts.user_provider = Some(user_provider.clone());
258 }
259
260 Ok(())
261 }
262
263 async fn build(&self, opts: FrontendOptions) -> Result<Instance> {
264 common_runtime::init_global_runtimes(&opts.runtime);
265
266 let guard = common_telemetry::init_global_logging(
267 APP_NAME,
268 &opts.component.logging,
269 &opts.component.tracing,
270 opts.component.node_id.clone(),
271 opts.component.slow_query.as_ref(),
272 );
273 log_versions(version(), short_version(), APP_NAME);
274
275 info!("Frontend start command: {:#?}", self);
276 info!("Frontend options: {:#?}", opts);
277
278 let plugin_opts = opts.plugins;
279 let mut opts = opts.component;
280 opts.grpc.detect_server_addr();
281 let mut plugins = Plugins::new();
282 plugins::setup_frontend_plugins(&mut plugins, &plugin_opts, &opts)
283 .await
284 .context(error::StartFrontendSnafu)?;
285
286 set_default_timezone(opts.default_timezone.as_deref()).context(error::InitTimezoneSnafu)?;
287
288 let meta_client_options = opts
289 .meta_client
290 .as_ref()
291 .context(error::MissingConfigSnafu {
292 msg: "'meta_client'",
293 })?;
294
295 let cache_max_capacity = meta_client_options.metadata_cache_max_capacity;
296 let cache_ttl = meta_client_options.metadata_cache_ttl;
297 let cache_tti = meta_client_options.metadata_cache_tti;
298
299 let meta_client = meta_client::create_meta_client(
300 MetaClientType::Frontend,
301 meta_client_options,
302 Some(&plugins),
303 )
304 .await
305 .context(error::MetaClientInitSnafu)?;
306
307 let cached_meta_backend =
309 CachedKvBackendBuilder::new(Arc::new(MetaKvBackend::new(meta_client.clone())))
310 .cache_max_capacity(cache_max_capacity)
311 .cache_ttl(cache_ttl)
312 .cache_tti(cache_tti)
313 .build();
314 let cached_meta_backend = Arc::new(cached_meta_backend);
315
316 let layered_cache_builder = LayeredCacheRegistryBuilder::default().add_cache_registry(
318 CacheRegistryBuilder::default()
319 .add_cache(cached_meta_backend.clone())
320 .build(),
321 );
322 let fundamental_cache_registry =
323 build_fundamental_cache_registry(Arc::new(MetaKvBackend::new(meta_client.clone())));
324 let layered_cache_registry = Arc::new(
325 with_default_composite_cache_registry(
326 layered_cache_builder.add_cache_registry(fundamental_cache_registry),
327 )
328 .context(error::BuildCacheRegistrySnafu)?
329 .build(),
330 );
331
332 let information_extension =
333 Arc::new(DistributedInformationExtension::new(meta_client.clone()));
334 let catalog_manager = KvBackendCatalogManager::new(
335 information_extension,
336 cached_meta_backend.clone(),
337 layered_cache_registry.clone(),
338 None,
339 );
340
341 let executor = HandlerGroupExecutor::new(vec![
342 Arc::new(ParseMailboxMessageHandler),
343 Arc::new(InvalidateCacheHandler::new(layered_cache_registry.clone())),
344 ]);
345
346 let heartbeat_task = HeartbeatTask::new(
347 &opts,
348 meta_client.clone(),
349 opts.heartbeat.clone(),
350 Arc::new(executor),
351 );
352 let heartbeat_task = Some(heartbeat_task);
353
354 let channel_config = ChannelConfig {
357 timeout: None,
358 tcp_nodelay: opts.datanode.client.tcp_nodelay,
359 connect_timeout: Some(opts.datanode.client.connect_timeout),
360 ..Default::default()
361 };
362 let client = NodeClients::new(channel_config);
363
364 let instance = FrontendBuilder::new(
365 opts.clone(),
366 cached_meta_backend.clone(),
367 layered_cache_registry.clone(),
368 catalog_manager,
369 Arc::new(client),
370 meta_client,
371 )
372 .with_plugin(plugins.clone())
373 .with_local_cache_invalidator(layered_cache_registry)
374 .try_build()
375 .await
376 .context(error::StartFrontendSnafu)?;
377 let instance = Arc::new(instance);
378
379 let export_metrics_task = ExportMetricsTask::try_new(&opts.export_metrics, Some(&plugins))
380 .context(error::ServersSnafu)?;
381
382 let servers = Services::new(opts, instance.clone(), plugins)
383 .build()
384 .context(error::StartFrontendSnafu)?;
385
386 let frontend = Frontend {
387 instance,
388 servers,
389 heartbeat_task,
390 export_metrics_task,
391 };
392
393 Ok(Instance::new(frontend, guard))
394 }
395}
396
397#[cfg(test)]
398mod tests {
399 use std::io::Write;
400 use std::time::Duration;
401
402 use auth::{Identity, Password, UserProviderRef};
403 use common_base::readable_size::ReadableSize;
404 use common_config::ENV_VAR_SEP;
405 use common_test_util::temp_dir::create_named_temp_file;
406 use servers::grpc::GrpcOptions;
407 use servers::http::HttpOptions;
408
409 use super::*;
410 use crate::options::GlobalOptions;
411
412 #[test]
413 fn test_try_from_start_command() {
414 let command = StartCommand {
415 http_addr: Some("127.0.0.1:1234".to_string()),
416 mysql_addr: Some("127.0.0.1:5678".to_string()),
417 postgres_addr: Some("127.0.0.1:5432".to_string()),
418 influxdb_enable: Some(false),
419 disable_dashboard: Some(false),
420 ..Default::default()
421 };
422
423 let opts = command.load_options(&Default::default()).unwrap().component;
424
425 assert_eq!(opts.http.addr, "127.0.0.1:1234");
426 assert_eq!(ReadableSize::mb(64), opts.http.body_limit);
427 assert_eq!(opts.mysql.addr, "127.0.0.1:5678");
428 assert_eq!(opts.postgres.addr, "127.0.0.1:5432");
429
430 let default_opts = FrontendOptions::default().component;
431
432 assert_eq!(opts.grpc.bind_addr, default_opts.grpc.bind_addr);
433 assert!(opts.mysql.enable);
434 assert_eq!(opts.mysql.runtime_size, default_opts.mysql.runtime_size);
435 assert!(opts.postgres.enable);
436 assert_eq!(
437 opts.postgres.runtime_size,
438 default_opts.postgres.runtime_size
439 );
440 assert!(opts.opentsdb.enable);
441
442 assert!(!opts.influxdb.enable);
443 }
444
445 #[test]
446 fn test_read_from_config_file() {
447 let mut file = create_named_temp_file();
448 let toml_str = r#"
449 [http]
450 addr = "127.0.0.1:4000"
451 timeout = "0s"
452 body_limit = "2GB"
453
454 [opentsdb]
455 enable = false
456
457 [logging]
458 level = "debug"
459 dir = "./greptimedb_data/test/logs"
460 "#;
461 write!(file, "{}", toml_str).unwrap();
462
463 let command = StartCommand {
464 config_file: Some(file.path().to_str().unwrap().to_string()),
465 disable_dashboard: Some(false),
466 ..Default::default()
467 };
468
469 let fe_opts = command.load_options(&Default::default()).unwrap().component;
470
471 assert_eq!("127.0.0.1:4000".to_string(), fe_opts.http.addr);
472 assert_eq!(Duration::from_secs(0), fe_opts.http.timeout);
473
474 assert_eq!(ReadableSize::gb(2), fe_opts.http.body_limit);
475
476 assert_eq!("debug", fe_opts.logging.level.as_ref().unwrap());
477 assert_eq!(
478 "./greptimedb_data/test/logs".to_string(),
479 fe_opts.logging.dir
480 );
481 assert!(!fe_opts.opentsdb.enable);
482 }
483
484 #[tokio::test]
485 async fn test_try_from_start_command_to_anymap() {
486 let fe_opts = frontend::frontend::FrontendOptions {
487 http: HttpOptions {
488 disable_dashboard: false,
489 ..Default::default()
490 },
491 user_provider: Some("static_user_provider:cmd:test=test".to_string()),
492 ..Default::default()
493 };
494
495 let mut plugins = Plugins::new();
496 plugins::setup_frontend_plugins(&mut plugins, &[], &fe_opts)
497 .await
498 .unwrap();
499
500 let provider = plugins.get::<UserProviderRef>().unwrap();
501 let result = provider
502 .authenticate(
503 Identity::UserId("test", None),
504 Password::PlainText("test".to_string().into()),
505 )
506 .await;
507 let _ = result.unwrap();
508 }
509
510 #[test]
511 fn test_load_log_options_from_cli() {
512 let cmd = StartCommand {
513 disable_dashboard: Some(false),
514 ..Default::default()
515 };
516
517 let options = cmd
518 .load_options(&GlobalOptions {
519 log_dir: Some("./greptimedb_data/test/logs".to_string()),
520 log_level: Some("debug".to_string()),
521
522 #[cfg(feature = "tokio-console")]
523 tokio_console_addr: None,
524 })
525 .unwrap()
526 .component;
527
528 let logging_opt = options.logging;
529 assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
530 assert_eq!("debug", logging_opt.level.as_ref().unwrap());
531 }
532
533 #[test]
534 fn test_config_precedence_order() {
535 let mut file = create_named_temp_file();
536 let toml_str = r#"
537 [http]
538 addr = "127.0.0.1:4000"
539
540 [meta_client]
541 timeout = "3s"
542 connect_timeout = "5s"
543 tcp_nodelay = true
544
545 [mysql]
546 addr = "127.0.0.1:4002"
547 "#;
548 write!(file, "{}", toml_str).unwrap();
549
550 let env_prefix = "FRONTEND_UT";
551 temp_env::with_vars(
552 [
553 (
554 [
556 env_prefix.to_string(),
557 "mysql".to_uppercase(),
558 "addr".to_uppercase(),
559 ]
560 .join(ENV_VAR_SEP),
561 Some("127.0.0.1:14002"),
562 ),
563 (
564 [
566 env_prefix.to_string(),
567 "mysql".to_uppercase(),
568 "runtime_size".to_uppercase(),
569 ]
570 .join(ENV_VAR_SEP),
571 Some("11"),
572 ),
573 (
574 [
576 env_prefix.to_string(),
577 "http".to_uppercase(),
578 "addr".to_uppercase(),
579 ]
580 .join(ENV_VAR_SEP),
581 Some("127.0.0.1:24000"),
582 ),
583 (
584 [
586 env_prefix.to_string(),
587 "meta_client".to_uppercase(),
588 "metasrv_addrs".to_uppercase(),
589 ]
590 .join(ENV_VAR_SEP),
591 Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
592 ),
593 ],
594 || {
595 let command = StartCommand {
596 config_file: Some(file.path().to_str().unwrap().to_string()),
597 http_addr: Some("127.0.0.1:14000".to_string()),
598 env_prefix: env_prefix.to_string(),
599 ..Default::default()
600 };
601
602 let fe_opts = command.load_options(&Default::default()).unwrap().component;
603
604 assert_eq!(fe_opts.mysql.runtime_size, 11);
606 assert_eq!(
607 fe_opts.meta_client.unwrap().metasrv_addrs,
608 vec![
609 "127.0.0.1:3001".to_string(),
610 "127.0.0.1:3002".to_string(),
611 "127.0.0.1:3003".to_string()
612 ]
613 );
614
615 assert_eq!(fe_opts.mysql.addr, "127.0.0.1:4002");
617
618 assert_eq!(fe_opts.http.addr, "127.0.0.1:14000");
620
621 assert_eq!(fe_opts.grpc.bind_addr, GrpcOptions::default().bind_addr);
623 },
624 );
625 }
626}