1use std::fmt::{self, Debug};
16use std::path::Path;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::Parser;
21use common_base::Plugins;
22use common_config::Configurable;
23use common_meta::distributed_time_constants::init_distributed_time_constants;
24use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
25use common_telemetry::{info, warn};
26use common_version::{short_version, verbose_version};
27use meta_srv::bootstrap::{MetasrvInstance, metasrv_builder};
28use meta_srv::metasrv::BackendImpl;
29use snafu::ResultExt;
30use tracing_appender::non_blocking::WorkerGuard;
31
32use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
33use crate::options::{GlobalOptions, GreptimeOptions};
34use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
35
36type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
37
38pub const APP_NAME: &str = "greptime-metasrv";
39
40pub struct Instance {
41 instance: MetasrvInstance,
42
43 _guard: Vec<WorkerGuard>,
45}
46
47impl Instance {
48 pub fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
49 Self {
50 instance,
51 _guard: guard,
52 }
53 }
54
55 pub fn get_inner(&self) -> &MetasrvInstance {
56 &self.instance
57 }
58
59 pub fn mut_inner(&mut self) -> &mut MetasrvInstance {
60 &mut self.instance
61 }
62}
63
64#[async_trait]
65impl App for Instance {
66 fn name(&self) -> &str {
67 APP_NAME
68 }
69
70 async fn start(&mut self) -> Result<()> {
71 plugins::start_metasrv_plugins(self.instance.plugins())
72 .await
73 .context(StartMetaServerSnafu)?;
74
75 self.instance.start().await.context(StartMetaServerSnafu)
76 }
77
78 async fn stop(&mut self) -> Result<()> {
79 self.instance
80 .shutdown()
81 .await
82 .context(error::ShutdownMetaServerSnafu)
83 }
84}
85
86#[derive(Parser)]
87pub struct Command {
88 #[clap(subcommand)]
89 subcmd: SubCommand,
90}
91
92impl Command {
93 pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
94 self.subcmd.build(opts).await
95 }
96
97 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
98 self.subcmd.load_options(global_options)
99 }
100
101 pub fn config_file(&self) -> &Option<String> {
102 self.subcmd.config_file()
103 }
104
105 pub fn env_prefix(&self) -> &String {
106 self.subcmd.env_prefix()
107 }
108}
109
110#[derive(Parser)]
111enum SubCommand {
112 Start(StartCommand),
113}
114
115impl SubCommand {
116 async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
117 match self {
118 SubCommand::Start(cmd) => cmd.build(opts).await,
119 }
120 }
121
122 fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
123 match self {
124 SubCommand::Start(cmd) => cmd.load_options(global_options),
125 }
126 }
127
128 fn config_file(&self) -> &Option<String> {
129 match self {
130 SubCommand::Start(cmd) => &cmd.config_file,
131 }
132 }
133
134 fn env_prefix(&self) -> &String {
135 match self {
136 SubCommand::Start(cmd) => &cmd.env_prefix,
137 }
138 }
139}
140
141#[derive(Default, Parser)]
142pub struct StartCommand {
143 #[clap(long = "grpc-bind-addr", alias = "rpc-bind-addr", alias = "bind-addr")]
145 grpc_bind_addr: Option<String>,
146 #[clap(
150 long = "grpc-server-addr",
151 alias = "rpc-server-addr",
152 alias = "server-addr"
153 )]
154 grpc_server_addr: Option<String>,
155 #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
156 store_addrs: Option<Vec<String>>,
157 #[clap(short, long)]
158 config_file: Option<String>,
159 #[clap(short, long)]
160 selector: Option<String>,
161 #[clap(long)]
162 enable_region_failover: Option<bool>,
163 #[clap(long)]
164 http_addr: Option<String>,
165 #[clap(long)]
166 http_timeout: Option<u64>,
167 #[clap(long, default_value = "GREPTIMEDB_METASRV")]
168 env_prefix: String,
169 #[clap(long)]
171 data_home: Option<String>,
172 #[clap(long, default_value = "")]
174 store_key_prefix: String,
175 #[clap(long)]
177 max_txn_ops: Option<usize>,
178 #[clap(long, value_enum)]
180 backend: Option<BackendImpl>,
181}
182
183impl Debug for StartCommand {
184 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185 f.debug_struct("StartCommand")
186 .field("grpc_bind_addr", &self.grpc_bind_addr)
187 .field("grpc_server_addr", &self.grpc_server_addr)
188 .field("store_addrs", &self.sanitize_store_addrs())
189 .field("config_file", &self.config_file)
190 .field("selector", &self.selector)
191 .field("enable_region_failover", &self.enable_region_failover)
192 .field("http_addr", &self.http_addr)
193 .field("http_timeout", &self.http_timeout)
194 .field("env_prefix", &self.env_prefix)
195 .field("data_home", &self.data_home)
196 .field("store_key_prefix", &self.store_key_prefix)
197 .field("max_txn_ops", &self.max_txn_ops)
198 .field("backend", &self.backend)
199 .finish()
200 }
201}
202
203impl StartCommand {
204 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
205 let mut opts = MetasrvOptions::load_layered_options(
206 self.config_file.as_deref(),
207 self.env_prefix.as_ref(),
208 )
209 .context(LoadLayeredConfigSnafu)?;
210
211 self.merge_with_cli_options(global_options, &mut opts)?;
212
213 Ok(opts)
214 }
215
216 fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
217 self.store_addrs.as_ref().map(|addrs| {
218 addrs
219 .iter()
220 .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
221 .collect()
222 })
223 }
224
225 fn merge_with_cli_options(
227 &self,
228 global_options: &GlobalOptions,
229 opts: &mut MetasrvOptions,
230 ) -> Result<()> {
231 let opts = &mut opts.component;
232
233 if let Some(dir) = &global_options.log_dir {
234 opts.logging.dir.clone_from(dir);
235 }
236
237 if global_options.log_level.is_some() {
238 opts.logging.level.clone_from(&global_options.log_level);
239 }
240
241 opts.tracing = TracingOptions {
242 #[cfg(feature = "tokio-console")]
243 tokio_console_addr: global_options.tokio_console_addr.clone(),
244 };
245
246 #[allow(deprecated)]
247 if let Some(addr) = &self.grpc_bind_addr {
248 opts.bind_addr.clone_from(addr);
249 opts.grpc.bind_addr.clone_from(addr);
250 } else if !opts.bind_addr.is_empty() {
251 warn!(
252 "Use the deprecated attribute `MetasrvOptions.bind_addr`, please use `grpc.bind_addr` instead."
253 );
254 opts.grpc.bind_addr.clone_from(&opts.bind_addr);
255 }
256
257 #[allow(deprecated)]
258 if let Some(addr) = &self.grpc_server_addr {
259 opts.server_addr.clone_from(addr);
260 opts.grpc.server_addr.clone_from(addr);
261 } else if !opts.server_addr.is_empty() {
262 warn!(
263 "Use the deprecated attribute `MetasrvOptions.server_addr`, please use `grpc.server_addr` instead."
264 );
265 opts.grpc.server_addr.clone_from(&opts.server_addr);
266 }
267
268 if let Some(addrs) = &self.store_addrs {
269 opts.store_addrs.clone_from(addrs);
270 }
271
272 if let Some(selector_type) = &self.selector {
273 opts.selector = selector_type[..]
274 .try_into()
275 .context(error::UnsupportedSelectorTypeSnafu { selector_type })?;
276 }
277
278 if let Some(enable_region_failover) = self.enable_region_failover {
279 opts.enable_region_failover = enable_region_failover;
280 }
281
282 if let Some(http_addr) = &self.http_addr {
283 opts.http.addr.clone_from(http_addr);
284 }
285
286 if let Some(http_timeout) = self.http_timeout {
287 opts.http.timeout = Duration::from_secs(http_timeout);
288 }
289
290 if let Some(data_home) = &self.data_home {
291 opts.data_home.clone_from(data_home);
292 }
293
294 if opts.logging.dir.is_empty() {
296 opts.logging.dir = Path::new(&opts.data_home)
297 .join(DEFAULT_LOGGING_DIR)
298 .to_string_lossy()
299 .to_string();
300 }
301
302 if !self.store_key_prefix.is_empty() {
303 opts.store_key_prefix.clone_from(&self.store_key_prefix)
304 }
305
306 if let Some(max_txn_ops) = self.max_txn_ops {
307 opts.max_txn_ops = max_txn_ops;
308 }
309
310 if let Some(backend) = &self.backend {
311 opts.backend.clone_from(backend);
312 }
313
314 opts.http.disable_dashboard = true;
316
317 Ok(())
318 }
319
320 pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
321 common_runtime::init_global_runtimes(&opts.runtime);
322
323 let guard = common_telemetry::init_global_logging(
324 APP_NAME,
325 &opts.component.logging,
326 &opts.component.tracing,
327 None,
328 None,
329 );
330
331 log_versions(verbose_version(), short_version(), APP_NAME);
332 maybe_activate_heap_profile(&opts.component.memory);
333 create_resource_limit_metrics(APP_NAME);
334 init_distributed_time_constants(opts.component.heartbeat_interval);
335
336 info!("Metasrv start command: {:#?}", self);
337
338 let plugin_opts = opts.plugins;
339 let mut opts = opts.component;
340 opts.grpc.detect_server_addr();
341
342 info!("Metasrv options: {:#?}", opts);
343
344 let mut plugins = Plugins::new();
345 plugins::setup_metasrv_plugins(&mut plugins, &plugin_opts, &opts)
346 .await
347 .context(StartMetaServerSnafu)?;
348
349 let builder = metasrv_builder(&opts, plugins, None)
350 .await
351 .context(error::BuildMetaServerSnafu)?;
352 let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
353
354 let instance = MetasrvInstance::new(metasrv)
355 .await
356 .context(error::BuildMetaServerSnafu)?;
357
358 Ok(Instance::new(instance, guard))
359 }
360}
361
362#[cfg(test)]
363mod tests {
364 use std::io::Write;
365
366 use clap::{CommandFactory, Parser};
367 use common_base::readable_size::ReadableSize;
368 use common_config::ENV_VAR_SEP;
369 use common_test_util::temp_dir::create_named_temp_file;
370 use meta_srv::selector::SelectorType;
371
372 use super::*;
373
374 #[test]
375 fn test_read_from_cmd() {
376 let cmd = StartCommand {
377 grpc_bind_addr: Some("127.0.0.1:3002".to_string()),
378 grpc_server_addr: Some("127.0.0.1:3002".to_string()),
379 store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
380 selector: Some("LoadBased".to_string()),
381 ..Default::default()
382 };
383
384 let options = cmd.load_options(&Default::default()).unwrap().component;
385 assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
386 assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
387 assert_eq!(SelectorType::LoadBased, options.selector);
388 }
389
390 #[test]
391 fn test_read_from_config_file() {
392 let mut file = create_named_temp_file();
393 let toml_str = r#"
394 bind_addr = "127.0.0.1:3002"
395 server_addr = "127.0.0.1:3002"
396 store_addr = "127.0.0.1:2379"
397 selector = "LeaseBased"
398
399 [logging]
400 level = "debug"
401 dir = "./greptimedb_data/test/logs"
402
403 [failure_detector]
404 threshold = 8.0
405 min_std_deviation = "100ms"
406 acceptable_heartbeat_pause = "3000ms"
407 "#;
408 write!(file, "{}", toml_str).unwrap();
409
410 let cmd = StartCommand {
411 config_file: Some(file.path().to_str().unwrap().to_string()),
412 ..Default::default()
413 };
414
415 let options = cmd.load_options(&Default::default()).unwrap().component;
416 assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
417 assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
418 assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
419 assert_eq!(SelectorType::LeaseBased, options.selector);
420 assert_eq!("debug", options.logging.level.as_ref().unwrap());
421 assert_eq!(
422 "./greptimedb_data/test/logs".to_string(),
423 options.logging.dir
424 );
425 assert_eq!(8.0, options.failure_detector.threshold);
426 assert_eq!(
427 100.0,
428 options.failure_detector.min_std_deviation.as_millis() as f32
429 );
430 assert_eq!(
431 3000,
432 options
433 .failure_detector
434 .acceptable_heartbeat_pause
435 .as_millis()
436 );
437 assert_eq!(
438 options.procedure.max_metadata_value_size,
439 Some(ReadableSize::kb(1500))
440 );
441 }
442
443 #[test]
444 fn test_load_log_options_from_cli() {
445 let cmd = StartCommand {
446 grpc_bind_addr: Some("127.0.0.1:3002".to_string()),
447 grpc_server_addr: Some("127.0.0.1:3002".to_string()),
448 store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
449 selector: Some("LoadBased".to_string()),
450 ..Default::default()
451 };
452
453 let options = cmd
454 .load_options(&GlobalOptions {
455 log_dir: Some("./greptimedb_data/test/logs".to_string()),
456 log_level: Some("debug".to_string()),
457
458 #[cfg(feature = "tokio-console")]
459 tokio_console_addr: None,
460 })
461 .unwrap()
462 .component;
463
464 let logging_opt = options.logging;
465 assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
466 assert_eq!("debug", logging_opt.level.as_ref().unwrap());
467 }
468
469 #[test]
470 fn test_config_precedence_order() {
471 let mut file = create_named_temp_file();
472 let toml_str = r#"
473 server_addr = "127.0.0.1:3002"
474 datanode_lease_secs = 15
475 selector = "LeaseBased"
476
477 [http]
478 addr = "127.0.0.1:4000"
479
480 [logging]
481 level = "debug"
482 dir = "./greptimedb_data/test/logs"
483 "#;
484 write!(file, "{}", toml_str).unwrap();
485
486 let env_prefix = "METASRV_UT";
487 temp_env::with_vars(
488 [
489 (
490 [env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP),
492 Some("127.0.0.1:14002"),
493 ),
494 (
495 [env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP),
497 Some("127.0.0.1:13002"),
498 ),
499 (
500 [
502 env_prefix.to_string(),
503 "http".to_uppercase(),
504 "addr".to_uppercase(),
505 ]
506 .join(ENV_VAR_SEP),
507 Some("127.0.0.1:24000"),
508 ),
509 ],
510 || {
511 let command = StartCommand {
512 http_addr: Some("127.0.0.1:14000".to_string()),
513 config_file: Some(file.path().to_str().unwrap().to_string()),
514 env_prefix: env_prefix.to_string(),
515 ..Default::default()
516 };
517
518 let opts = command.load_options(&Default::default()).unwrap().component;
519
520 assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");
522
523 assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");
525
526 assert_eq!(opts.http.addr, "127.0.0.1:14000");
528
529 assert_eq!(opts.store_addrs, vec!["127.0.0.1:2379".to_string()]);
531 },
532 );
533 }
534
535 #[test]
536 fn test_parse_grpc_cli_aliases() {
537 let command = StartCommand::try_parse_from([
538 "metasrv",
539 "--grpc-bind-addr",
540 "127.0.0.1:13002",
541 "--grpc-server-addr",
542 "10.0.0.1:13002",
543 ])
544 .unwrap();
545 assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:13002"));
546 assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.1:13002"));
547
548 let command = StartCommand::try_parse_from([
549 "metasrv",
550 "--rpc-bind-addr",
551 "127.0.0.1:23002",
552 "--rpc-server-addr",
553 "10.0.0.2:23002",
554 ])
555 .unwrap();
556 assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:23002"));
557 assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.2:23002"));
558
559 let command = StartCommand::try_parse_from([
560 "metasrv",
561 "--bind-addr",
562 "127.0.0.1:33002",
563 "--server-addr",
564 "10.0.0.3:33002",
565 ])
566 .unwrap();
567 assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:33002"));
568 assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.3:33002"));
569 }
570
571 #[test]
572 fn test_help_uses_grpc_option_names() {
573 let mut cmd = StartCommand::command();
574 let mut help = Vec::new();
575 cmd.write_long_help(&mut help).unwrap();
576 let help = String::from_utf8(help).unwrap();
577
578 assert!(help.contains("--grpc-bind-addr"));
579 assert!(help.contains("--grpc-server-addr"));
580 assert!(!help.contains("--rpc-bind-addr"));
581 assert!(!help.contains("--rpc-server-addr"));
582 assert!(!help.contains("--bind-addr"));
583 assert!(!help.contains("--server-addr"));
584 }
585}