1use std::fmt::{self, Debug};
16use std::path::Path;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::Parser;
21use common_base::Plugins;
22use common_config::Configurable;
23use common_meta::distributed_time_constants::init_distributed_time_constants;
24use common_telemetry::info;
25use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
26use common_version::{short_version, verbose_version};
27use meta_srv::bootstrap::{MetasrvInstance, metasrv_builder};
28use meta_srv::metasrv::BackendImpl;
29use snafu::ResultExt;
30use tracing_appender::non_blocking::WorkerGuard;
31
32use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
33use crate::options::{GlobalOptions, GreptimeOptions};
34use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
35
36type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
37
38pub const APP_NAME: &str = "greptime-metasrv";
39
40pub struct Instance {
41 instance: MetasrvInstance,
42
43 _guard: Vec<WorkerGuard>,
45}
46
47impl Instance {
48 pub fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
49 Self {
50 instance,
51 _guard: guard,
52 }
53 }
54
55 pub fn get_inner(&self) -> &MetasrvInstance {
56 &self.instance
57 }
58
59 pub fn mut_inner(&mut self) -> &mut MetasrvInstance {
60 &mut self.instance
61 }
62}
63
64#[async_trait]
65impl App for Instance {
66 fn name(&self) -> &str {
67 APP_NAME
68 }
69
70 async fn start(&mut self) -> Result<()> {
71 plugins::start_metasrv_plugins(self.instance.plugins())
72 .await
73 .context(StartMetaServerSnafu)?;
74
75 self.instance.start().await.context(StartMetaServerSnafu)
76 }
77
78 async fn stop(&mut self) -> Result<()> {
79 self.instance
80 .shutdown()
81 .await
82 .context(error::ShutdownMetaServerSnafu)
83 }
84}
85
86#[derive(Parser)]
87pub struct Command {
88 #[clap(subcommand)]
89 subcmd: SubCommand,
90}
91
92impl Command {
93 pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
94 self.subcmd.build(opts).await
95 }
96
97 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
98 self.subcmd.load_options(global_options)
99 }
100
101 pub fn config_file(&self) -> &Option<String> {
102 self.subcmd.config_file()
103 }
104
105 pub fn env_prefix(&self) -> &String {
106 self.subcmd.env_prefix()
107 }
108}
109
110#[derive(Parser)]
111enum SubCommand {
112 Start(StartCommand),
113}
114
115impl SubCommand {
116 async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
117 match self {
118 SubCommand::Start(cmd) => cmd.build(opts).await,
119 }
120 }
121
122 fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
123 match self {
124 SubCommand::Start(cmd) => cmd.load_options(global_options),
125 }
126 }
127
128 fn config_file(&self) -> &Option<String> {
129 match self {
130 SubCommand::Start(cmd) => &cmd.config_file,
131 }
132 }
133
134 fn env_prefix(&self) -> &String {
135 match self {
136 SubCommand::Start(cmd) => &cmd.env_prefix,
137 }
138 }
139}
140
141#[derive(Default, Parser)]
142pub struct StartCommand {
143 #[clap(long, alias = "bind-addr")]
145 rpc_bind_addr: Option<String>,
146 #[clap(long, alias = "server-addr")]
150 rpc_server_addr: Option<String>,
151 #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
152 store_addrs: Option<Vec<String>>,
153 #[clap(short, long)]
154 config_file: Option<String>,
155 #[clap(short, long)]
156 selector: Option<String>,
157 #[clap(long)]
158 enable_region_failover: Option<bool>,
159 #[clap(long)]
160 http_addr: Option<String>,
161 #[clap(long)]
162 http_timeout: Option<u64>,
163 #[clap(long, default_value = "GREPTIMEDB_METASRV")]
164 env_prefix: String,
165 #[clap(long)]
167 data_home: Option<String>,
168 #[clap(long, default_value = "")]
170 store_key_prefix: String,
171 #[clap(long)]
173 max_txn_ops: Option<usize>,
174 #[clap(long, value_enum)]
176 backend: Option<BackendImpl>,
177}
178
179impl Debug for StartCommand {
180 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181 f.debug_struct("StartCommand")
182 .field("rpc_bind_addr", &self.rpc_bind_addr)
183 .field("rpc_server_addr", &self.rpc_server_addr)
184 .field("store_addrs", &self.sanitize_store_addrs())
185 .field("config_file", &self.config_file)
186 .field("selector", &self.selector)
187 .field("enable_region_failover", &self.enable_region_failover)
188 .field("http_addr", &self.http_addr)
189 .field("http_timeout", &self.http_timeout)
190 .field("env_prefix", &self.env_prefix)
191 .field("data_home", &self.data_home)
192 .field("store_key_prefix", &self.store_key_prefix)
193 .field("max_txn_ops", &self.max_txn_ops)
194 .field("backend", &self.backend)
195 .finish()
196 }
197}
198
199impl StartCommand {
200 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
201 let mut opts = MetasrvOptions::load_layered_options(
202 self.config_file.as_deref(),
203 self.env_prefix.as_ref(),
204 )
205 .context(LoadLayeredConfigSnafu)?;
206
207 self.merge_with_cli_options(global_options, &mut opts)?;
208
209 Ok(opts)
210 }
211
212 fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
213 self.store_addrs.as_ref().map(|addrs| {
214 addrs
215 .iter()
216 .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
217 .collect()
218 })
219 }
220
221 fn merge_with_cli_options(
223 &self,
224 global_options: &GlobalOptions,
225 opts: &mut MetasrvOptions,
226 ) -> Result<()> {
227 let opts = &mut opts.component;
228
229 if let Some(dir) = &global_options.log_dir {
230 opts.logging.dir.clone_from(dir);
231 }
232
233 if global_options.log_level.is_some() {
234 opts.logging.level.clone_from(&global_options.log_level);
235 }
236
237 opts.tracing = TracingOptions {
238 #[cfg(feature = "tokio-console")]
239 tokio_console_addr: global_options.tokio_console_addr.clone(),
240 };
241
242 #[allow(deprecated)]
243 if let Some(addr) = &self.rpc_bind_addr {
244 opts.bind_addr.clone_from(addr);
245 opts.grpc.bind_addr.clone_from(addr);
246 } else if !opts.bind_addr.is_empty() {
247 opts.grpc.bind_addr.clone_from(&opts.bind_addr);
248 }
249
250 #[allow(deprecated)]
251 if let Some(addr) = &self.rpc_server_addr {
252 opts.server_addr.clone_from(addr);
253 opts.grpc.server_addr.clone_from(addr);
254 } else if !opts.server_addr.is_empty() {
255 opts.grpc.server_addr.clone_from(&opts.server_addr);
256 }
257
258 if let Some(addrs) = &self.store_addrs {
259 opts.store_addrs.clone_from(addrs);
260 }
261
262 if let Some(selector_type) = &self.selector {
263 opts.selector = selector_type[..]
264 .try_into()
265 .context(error::UnsupportedSelectorTypeSnafu { selector_type })?;
266 }
267
268 if let Some(enable_region_failover) = self.enable_region_failover {
269 opts.enable_region_failover = enable_region_failover;
270 }
271
272 if let Some(http_addr) = &self.http_addr {
273 opts.http.addr.clone_from(http_addr);
274 }
275
276 if let Some(http_timeout) = self.http_timeout {
277 opts.http.timeout = Duration::from_secs(http_timeout);
278 }
279
280 if let Some(data_home) = &self.data_home {
281 opts.data_home.clone_from(data_home);
282 }
283
284 if opts.logging.dir.is_empty() {
286 opts.logging.dir = Path::new(&opts.data_home)
287 .join(DEFAULT_LOGGING_DIR)
288 .to_string_lossy()
289 .to_string();
290 }
291
292 if !self.store_key_prefix.is_empty() {
293 opts.store_key_prefix.clone_from(&self.store_key_prefix)
294 }
295
296 if let Some(max_txn_ops) = self.max_txn_ops {
297 opts.max_txn_ops = max_txn_ops;
298 }
299
300 if let Some(backend) = &self.backend {
301 opts.backend.clone_from(backend);
302 }
303
304 opts.http.disable_dashboard = true;
306
307 Ok(())
308 }
309
310 pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
311 common_runtime::init_global_runtimes(&opts.runtime);
312
313 let guard = common_telemetry::init_global_logging(
314 APP_NAME,
315 &opts.component.logging,
316 &opts.component.tracing,
317 None,
318 None,
319 );
320
321 log_versions(verbose_version(), short_version(), APP_NAME);
322 maybe_activate_heap_profile(&opts.component.memory);
323 create_resource_limit_metrics(APP_NAME);
324 init_distributed_time_constants(opts.component.heartbeat_interval);
325
326 info!("Metasrv start command: {:#?}", self);
327
328 let plugin_opts = opts.plugins;
329 let mut opts = opts.component;
330 opts.grpc.detect_server_addr();
331
332 info!("Metasrv options: {:#?}", opts);
333
334 let mut plugins = Plugins::new();
335 plugins::setup_metasrv_plugins(&mut plugins, &plugin_opts, &opts)
336 .await
337 .context(StartMetaServerSnafu)?;
338
339 let builder = metasrv_builder(&opts, plugins, None)
340 .await
341 .context(error::BuildMetaServerSnafu)?;
342 let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
343
344 let instance = MetasrvInstance::new(metasrv)
345 .await
346 .context(error::BuildMetaServerSnafu)?;
347
348 Ok(Instance::new(instance, guard))
349 }
350}
351
352#[cfg(test)]
353mod tests {
354 use std::io::Write;
355
356 use common_base::readable_size::ReadableSize;
357 use common_config::ENV_VAR_SEP;
358 use common_test_util::temp_dir::create_named_temp_file;
359 use meta_srv::selector::SelectorType;
360
361 use super::*;
362
363 #[test]
364 fn test_read_from_cmd() {
365 let cmd = StartCommand {
366 rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
367 rpc_server_addr: Some("127.0.0.1:3002".to_string()),
368 store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
369 selector: Some("LoadBased".to_string()),
370 ..Default::default()
371 };
372
373 let options = cmd.load_options(&Default::default()).unwrap().component;
374 assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
375 assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
376 assert_eq!(SelectorType::LoadBased, options.selector);
377 }
378
379 #[test]
380 fn test_read_from_config_file() {
381 let mut file = create_named_temp_file();
382 let toml_str = r#"
383 bind_addr = "127.0.0.1:3002"
384 server_addr = "127.0.0.1:3002"
385 store_addr = "127.0.0.1:2379"
386 selector = "LeaseBased"
387
388 [logging]
389 level = "debug"
390 dir = "./greptimedb_data/test/logs"
391
392 [failure_detector]
393 threshold = 8.0
394 min_std_deviation = "100ms"
395 acceptable_heartbeat_pause = "3000ms"
396 "#;
397 write!(file, "{}", toml_str).unwrap();
398
399 let cmd = StartCommand {
400 config_file: Some(file.path().to_str().unwrap().to_string()),
401 ..Default::default()
402 };
403
404 let options = cmd.load_options(&Default::default()).unwrap().component;
405 assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
406 assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
407 assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
408 assert_eq!(SelectorType::LeaseBased, options.selector);
409 assert_eq!("debug", options.logging.level.as_ref().unwrap());
410 assert_eq!(
411 "./greptimedb_data/test/logs".to_string(),
412 options.logging.dir
413 );
414 assert_eq!(8.0, options.failure_detector.threshold);
415 assert_eq!(
416 100.0,
417 options.failure_detector.min_std_deviation.as_millis() as f32
418 );
419 assert_eq!(
420 3000,
421 options
422 .failure_detector
423 .acceptable_heartbeat_pause
424 .as_millis()
425 );
426 assert_eq!(
427 options.procedure.max_metadata_value_size,
428 Some(ReadableSize::kb(1500))
429 );
430 }
431
432 #[test]
433 fn test_load_log_options_from_cli() {
434 let cmd = StartCommand {
435 rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
436 rpc_server_addr: Some("127.0.0.1:3002".to_string()),
437 store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
438 selector: Some("LoadBased".to_string()),
439 ..Default::default()
440 };
441
442 let options = cmd
443 .load_options(&GlobalOptions {
444 log_dir: Some("./greptimedb_data/test/logs".to_string()),
445 log_level: Some("debug".to_string()),
446
447 #[cfg(feature = "tokio-console")]
448 tokio_console_addr: None,
449 })
450 .unwrap()
451 .component;
452
453 let logging_opt = options.logging;
454 assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
455 assert_eq!("debug", logging_opt.level.as_ref().unwrap());
456 }
457
458 #[test]
459 fn test_config_precedence_order() {
460 let mut file = create_named_temp_file();
461 let toml_str = r#"
462 server_addr = "127.0.0.1:3002"
463 datanode_lease_secs = 15
464 selector = "LeaseBased"
465
466 [http]
467 addr = "127.0.0.1:4000"
468
469 [logging]
470 level = "debug"
471 dir = "./greptimedb_data/test/logs"
472 "#;
473 write!(file, "{}", toml_str).unwrap();
474
475 let env_prefix = "METASRV_UT";
476 temp_env::with_vars(
477 [
478 (
479 [env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP),
481 Some("127.0.0.1:14002"),
482 ),
483 (
484 [env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP),
486 Some("127.0.0.1:13002"),
487 ),
488 (
489 [
491 env_prefix.to_string(),
492 "http".to_uppercase(),
493 "addr".to_uppercase(),
494 ]
495 .join(ENV_VAR_SEP),
496 Some("127.0.0.1:24000"),
497 ),
498 ],
499 || {
500 let command = StartCommand {
501 http_addr: Some("127.0.0.1:14000".to_string()),
502 config_file: Some(file.path().to_str().unwrap().to_string()),
503 env_prefix: env_prefix.to_string(),
504 ..Default::default()
505 };
506
507 let opts = command.load_options(&Default::default()).unwrap().component;
508
509 assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");
511
512 assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");
514
515 assert_eq!(opts.http.addr, "127.0.0.1:14000");
517
518 assert_eq!(opts.store_addrs, vec!["127.0.0.1:2379".to_string()]);
520 },
521 );
522 }
523}