1use std::fmt;
16use std::path::Path;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::Parser;
21use common_base::Plugins;
22use common_config::Configurable;
23use common_telemetry::info;
24use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
25use common_version::{short_version, version};
26use meta_srv::bootstrap::MetasrvInstance;
27use meta_srv::metasrv::BackendImpl;
28use snafu::ResultExt;
29use tracing_appender::non_blocking::WorkerGuard;
30
31use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
32use crate::options::{GlobalOptions, GreptimeOptions};
33use crate::{create_resource_limit_metrics, log_versions, App};
34
35type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
36
37pub const APP_NAME: &str = "greptime-metasrv";
38
39pub struct Instance {
40 instance: MetasrvInstance,
41
42 _guard: Vec<WorkerGuard>,
44}
45
46impl Instance {
47 pub fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
48 Self {
49 instance,
50 _guard: guard,
51 }
52 }
53
54 pub fn get_inner(&self) -> &MetasrvInstance {
55 &self.instance
56 }
57}
58
59#[async_trait]
60impl App for Instance {
61 fn name(&self) -> &str {
62 APP_NAME
63 }
64
65 async fn start(&mut self) -> Result<()> {
66 plugins::start_metasrv_plugins(self.instance.plugins())
67 .await
68 .context(StartMetaServerSnafu)?;
69
70 self.instance.start().await.context(StartMetaServerSnafu)
71 }
72
73 async fn stop(&mut self) -> Result<()> {
74 self.instance
75 .shutdown()
76 .await
77 .context(error::ShutdownMetaServerSnafu)
78 }
79}
80
81#[derive(Parser)]
82pub struct Command {
83 #[clap(subcommand)]
84 subcmd: SubCommand,
85}
86
87impl Command {
88 pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
89 self.subcmd.build(opts).await
90 }
91
92 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
93 self.subcmd.load_options(global_options)
94 }
95
96 pub fn config_file(&self) -> &Option<String> {
97 self.subcmd.config_file()
98 }
99
100 pub fn env_prefix(&self) -> &String {
101 self.subcmd.env_prefix()
102 }
103}
104
105#[derive(Parser)]
106enum SubCommand {
107 Start(StartCommand),
108}
109
110impl SubCommand {
111 async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
112 match self {
113 SubCommand::Start(cmd) => cmd.build(opts).await,
114 }
115 }
116
117 fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
118 match self {
119 SubCommand::Start(cmd) => cmd.load_options(global_options),
120 }
121 }
122
123 fn config_file(&self) -> &Option<String> {
124 match self {
125 SubCommand::Start(cmd) => &cmd.config_file,
126 }
127 }
128
129 fn env_prefix(&self) -> &String {
130 match self {
131 SubCommand::Start(cmd) => &cmd.env_prefix,
132 }
133 }
134}
135
136#[derive(Default, Parser)]
137pub struct StartCommand {
138 #[clap(long, alias = "bind-addr")]
140 rpc_bind_addr: Option<String>,
141 #[clap(long, alias = "server-addr")]
145 rpc_server_addr: Option<String>,
146 #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
147 store_addrs: Option<Vec<String>>,
148 #[clap(short, long)]
149 config_file: Option<String>,
150 #[clap(short, long)]
151 selector: Option<String>,
152 #[clap(long)]
153 use_memory_store: Option<bool>,
154 #[clap(long)]
155 enable_region_failover: Option<bool>,
156 #[clap(long)]
157 http_addr: Option<String>,
158 #[clap(long)]
159 http_timeout: Option<u64>,
160 #[clap(long, default_value = "GREPTIMEDB_METASRV")]
161 env_prefix: String,
162 #[clap(long)]
164 data_home: Option<String>,
165 #[clap(long, default_value = "")]
167 store_key_prefix: String,
168 #[clap(long)]
170 max_txn_ops: Option<usize>,
171 #[clap(long, value_enum)]
173 backend: Option<BackendImpl>,
174}
175
176impl fmt::Debug for StartCommand {
177 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178 f.debug_struct("StartCommand")
179 .field("rpc_bind_addr", &self.rpc_bind_addr)
180 .field("rpc_server_addr", &self.rpc_server_addr)
181 .field("store_addrs", &self.sanitize_store_addrs())
182 .field("config_file", &self.config_file)
183 .field("selector", &self.selector)
184 .field("use_memory_store", &self.use_memory_store)
185 .field("enable_region_failover", &self.enable_region_failover)
186 .field("http_addr", &self.http_addr)
187 .field("http_timeout", &self.http_timeout)
188 .field("env_prefix", &self.env_prefix)
189 .field("data_home", &self.data_home)
190 .field("store_key_prefix", &self.store_key_prefix)
191 .field("max_txn_ops", &self.max_txn_ops)
192 .field("backend", &self.backend)
193 .finish()
194 }
195}
196
197impl StartCommand {
198 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
199 let mut opts = MetasrvOptions::load_layered_options(
200 self.config_file.as_deref(),
201 self.env_prefix.as_ref(),
202 )
203 .context(LoadLayeredConfigSnafu)?;
204
205 self.merge_with_cli_options(global_options, &mut opts)?;
206
207 Ok(opts)
208 }
209
210 fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
211 self.store_addrs.as_ref().map(|addrs| {
212 addrs
213 .iter()
214 .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
215 .collect()
216 })
217 }
218
219 fn merge_with_cli_options(
221 &self,
222 global_options: &GlobalOptions,
223 opts: &mut MetasrvOptions,
224 ) -> Result<()> {
225 let opts = &mut opts.component;
226
227 if let Some(dir) = &global_options.log_dir {
228 opts.logging.dir.clone_from(dir);
229 }
230
231 if global_options.log_level.is_some() {
232 opts.logging.level.clone_from(&global_options.log_level);
233 }
234
235 opts.tracing = TracingOptions {
236 #[cfg(feature = "tokio-console")]
237 tokio_console_addr: global_options.tokio_console_addr.clone(),
238 };
239
240 #[allow(deprecated)]
241 if let Some(addr) = &self.rpc_bind_addr {
242 opts.bind_addr.clone_from(addr);
243 opts.grpc.bind_addr.clone_from(addr);
244 } else if !opts.bind_addr.is_empty() {
245 opts.grpc.bind_addr.clone_from(&opts.bind_addr);
246 }
247
248 #[allow(deprecated)]
249 if let Some(addr) = &self.rpc_server_addr {
250 opts.server_addr.clone_from(addr);
251 opts.grpc.server_addr.clone_from(addr);
252 } else if !opts.server_addr.is_empty() {
253 opts.grpc.server_addr.clone_from(&opts.server_addr);
254 }
255
256 if let Some(addrs) = &self.store_addrs {
257 opts.store_addrs.clone_from(addrs);
258 }
259
260 if let Some(selector_type) = &self.selector {
261 opts.selector = selector_type[..]
262 .try_into()
263 .context(error::UnsupportedSelectorTypeSnafu { selector_type })?;
264 }
265
266 if let Some(use_memory_store) = self.use_memory_store {
267 opts.use_memory_store = use_memory_store;
268 }
269
270 if let Some(enable_region_failover) = self.enable_region_failover {
271 opts.enable_region_failover = enable_region_failover;
272 }
273
274 if let Some(http_addr) = &self.http_addr {
275 opts.http.addr.clone_from(http_addr);
276 }
277
278 if let Some(http_timeout) = self.http_timeout {
279 opts.http.timeout = Duration::from_secs(http_timeout);
280 }
281
282 if let Some(data_home) = &self.data_home {
283 opts.data_home.clone_from(data_home);
284 }
285
286 if opts.logging.dir.is_empty() {
288 opts.logging.dir = Path::new(&opts.data_home)
289 .join(DEFAULT_LOGGING_DIR)
290 .to_string_lossy()
291 .to_string();
292 }
293
294 if !self.store_key_prefix.is_empty() {
295 opts.store_key_prefix.clone_from(&self.store_key_prefix)
296 }
297
298 if let Some(max_txn_ops) = self.max_txn_ops {
299 opts.max_txn_ops = max_txn_ops;
300 }
301
302 if let Some(backend) = &self.backend {
303 opts.backend.clone_from(backend);
304 }
305
306 opts.http.disable_dashboard = true;
308
309 Ok(())
310 }
311
312 pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
313 common_runtime::init_global_runtimes(&opts.runtime);
314
315 let guard = common_telemetry::init_global_logging(
316 APP_NAME,
317 &opts.component.logging,
318 &opts.component.tracing,
319 None,
320 None,
321 );
322
323 log_versions(version(), short_version(), APP_NAME);
324 create_resource_limit_metrics(APP_NAME);
325
326 info!("Metasrv start command: {:#?}", self);
327
328 let plugin_opts = opts.plugins;
329 let mut opts = opts.component;
330 opts.grpc.detect_server_addr();
331
332 info!("Metasrv options: {:#?}", opts);
333
334 let mut plugins = Plugins::new();
335 plugins::setup_metasrv_plugins(&mut plugins, &plugin_opts, &opts)
336 .await
337 .context(StartMetaServerSnafu)?;
338
339 let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None)
340 .await
341 .context(error::BuildMetaServerSnafu)?;
342 let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
343
344 let instance = MetasrvInstance::new(opts, plugins, metasrv)
345 .await
346 .context(error::BuildMetaServerSnafu)?;
347
348 Ok(Instance::new(instance, guard))
349 }
350}
351
352#[cfg(test)]
353mod tests {
354 use std::io::Write;
355
356 use common_base::readable_size::ReadableSize;
357 use common_config::ENV_VAR_SEP;
358 use common_test_util::temp_dir::create_named_temp_file;
359 use meta_srv::selector::SelectorType;
360
361 use super::*;
362
363 #[test]
364 fn test_read_from_cmd() {
365 let cmd = StartCommand {
366 rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
367 rpc_server_addr: Some("127.0.0.1:3002".to_string()),
368 store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
369 selector: Some("LoadBased".to_string()),
370 ..Default::default()
371 };
372
373 let options = cmd.load_options(&Default::default()).unwrap().component;
374 assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
375 assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
376 assert_eq!(SelectorType::LoadBased, options.selector);
377 }
378
379 #[test]
380 fn test_read_from_config_file() {
381 let mut file = create_named_temp_file();
382 let toml_str = r#"
383 bind_addr = "127.0.0.1:3002"
384 server_addr = "127.0.0.1:3002"
385 store_addr = "127.0.0.1:2379"
386 selector = "LeaseBased"
387 use_memory_store = false
388
389 [logging]
390 level = "debug"
391 dir = "./greptimedb_data/test/logs"
392
393 [failure_detector]
394 threshold = 8.0
395 min_std_deviation = "100ms"
396 acceptable_heartbeat_pause = "3000ms"
397 first_heartbeat_estimate = "1000ms"
398 "#;
399 write!(file, "{}", toml_str).unwrap();
400
401 let cmd = StartCommand {
402 config_file: Some(file.path().to_str().unwrap().to_string()),
403 ..Default::default()
404 };
405
406 let options = cmd.load_options(&Default::default()).unwrap().component;
407 assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
408 assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
409 assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
410 assert_eq!(SelectorType::LeaseBased, options.selector);
411 assert_eq!("debug", options.logging.level.as_ref().unwrap());
412 assert_eq!(
413 "./greptimedb_data/test/logs".to_string(),
414 options.logging.dir
415 );
416 assert_eq!(8.0, options.failure_detector.threshold);
417 assert_eq!(
418 100.0,
419 options.failure_detector.min_std_deviation.as_millis() as f32
420 );
421 assert_eq!(
422 3000,
423 options
424 .failure_detector
425 .acceptable_heartbeat_pause
426 .as_millis()
427 );
428 assert_eq!(
429 1000,
430 options
431 .failure_detector
432 .first_heartbeat_estimate
433 .as_millis()
434 );
435 assert_eq!(
436 options.procedure.max_metadata_value_size,
437 Some(ReadableSize::kb(1500))
438 );
439 }
440
441 #[test]
442 fn test_load_log_options_from_cli() {
443 let cmd = StartCommand {
444 rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
445 rpc_server_addr: Some("127.0.0.1:3002".to_string()),
446 store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
447 selector: Some("LoadBased".to_string()),
448 ..Default::default()
449 };
450
451 let options = cmd
452 .load_options(&GlobalOptions {
453 log_dir: Some("./greptimedb_data/test/logs".to_string()),
454 log_level: Some("debug".to_string()),
455
456 #[cfg(feature = "tokio-console")]
457 tokio_console_addr: None,
458 })
459 .unwrap()
460 .component;
461
462 let logging_opt = options.logging;
463 assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
464 assert_eq!("debug", logging_opt.level.as_ref().unwrap());
465 }
466
467 #[test]
468 fn test_config_precedence_order() {
469 let mut file = create_named_temp_file();
470 let toml_str = r#"
471 server_addr = "127.0.0.1:3002"
472 datanode_lease_secs = 15
473 selector = "LeaseBased"
474 use_memory_store = false
475
476 [http]
477 addr = "127.0.0.1:4000"
478
479 [logging]
480 level = "debug"
481 dir = "./greptimedb_data/test/logs"
482 "#;
483 write!(file, "{}", toml_str).unwrap();
484
485 let env_prefix = "METASRV_UT";
486 temp_env::with_vars(
487 [
488 (
489 [env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP),
491 Some("127.0.0.1:14002"),
492 ),
493 (
494 [env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP),
496 Some("127.0.0.1:13002"),
497 ),
498 (
499 [
501 env_prefix.to_string(),
502 "http".to_uppercase(),
503 "addr".to_uppercase(),
504 ]
505 .join(ENV_VAR_SEP),
506 Some("127.0.0.1:24000"),
507 ),
508 ],
509 || {
510 let command = StartCommand {
511 http_addr: Some("127.0.0.1:14000".to_string()),
512 config_file: Some(file.path().to_str().unwrap().to_string()),
513 env_prefix: env_prefix.to_string(),
514 ..Default::default()
515 };
516
517 let opts = command.load_options(&Default::default()).unwrap().component;
518
519 assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");
521
522 assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");
524
525 assert_eq!(opts.http.addr, "127.0.0.1:14000");
527
528 assert_eq!(opts.store_addrs, vec!["127.0.0.1:2379".to_string()]);
530 },
531 );
532 }
533}