1use std::fmt;
16use std::path::Path;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::Parser;
21use common_base::Plugins;
22use common_config::Configurable;
23use common_telemetry::info;
24use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
25use common_version::{short_version, verbose_version};
26use meta_srv::bootstrap::MetasrvInstance;
27use meta_srv::metasrv::BackendImpl;
28use snafu::ResultExt;
29use tracing_appender::non_blocking::WorkerGuard;
30
31use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
32use crate::options::{GlobalOptions, GreptimeOptions};
33use crate::{create_resource_limit_metrics, log_versions, maybe_activate_heap_profile, App};
34
/// Layered options for the metasrv binary: the generic [`GreptimeOptions`] wrapper
/// specialised with the metasrv component options.
type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;

/// Application name returned by [`App::name`] and passed to the logging, version-logging
/// and resource-limit-metrics initialisation in [`StartCommand::build`].
pub const APP_NAME: &str = "greptime-metasrv";
38
/// A metasrv application: the underlying [`MetasrvInstance`] bundled with the guards
/// returned by the global logging initialisation.
pub struct Instance {
    /// The wrapped metasrv instance that `start`/`stop` operate on.
    instance: MetasrvInstance,

    // Never read; stored only so the guards are dropped together with this `Instance`.
    // NOTE(review): presumably this keeps the non-blocking log writers alive — confirm.
    _guard: Vec<WorkerGuard>,
}
45
46impl Instance {
47 pub fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
48 Self {
49 instance,
50 _guard: guard,
51 }
52 }
53
54 pub fn get_inner(&self) -> &MetasrvInstance {
55 &self.instance
56 }
57
58 pub fn mut_inner(&mut self) -> &mut MetasrvInstance {
59 &mut self.instance
60 }
61}
62
63#[async_trait]
64impl App for Instance {
65 fn name(&self) -> &str {
66 APP_NAME
67 }
68
69 async fn start(&mut self) -> Result<()> {
70 plugins::start_metasrv_plugins(self.instance.plugins())
71 .await
72 .context(StartMetaServerSnafu)?;
73
74 self.instance.start().await.context(StartMetaServerSnafu)
75 }
76
77 async fn stop(&mut self) -> Result<()> {
78 self.instance
79 .shutdown()
80 .await
81 .context(error::ShutdownMetaServerSnafu)
82 }
83}
84
// Command-line entry point for metasrv; every operation is delegated to the selected
// subcommand.
// (Plain comments are used on purpose: doc comments on clap-derived items feed into the
// generated help output.)
#[derive(Parser)]
pub struct Command {
    // The subcommand chosen on the command line.
    #[clap(subcommand)]
    subcmd: SubCommand,
}
90
91impl Command {
92 pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
93 self.subcmd.build(opts).await
94 }
95
96 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
97 self.subcmd.load_options(global_options)
98 }
99
100 pub fn config_file(&self) -> &Option<String> {
101 self.subcmd.config_file()
102 }
103
104 pub fn env_prefix(&self) -> &String {
105 self.subcmd.env_prefix()
106 }
107}
108
// Subcommands understood by the metasrv command. `Start` is currently the only one.
// (Plain comments are used on purpose: doc comments on clap-derived items feed into the
// generated help output.)
#[derive(Parser)]
enum SubCommand {
    // Load options and build a metasrv instance from a `StartCommand`.
    Start(StartCommand),
}
113
114impl SubCommand {
115 async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
116 match self {
117 SubCommand::Start(cmd) => cmd.build(opts).await,
118 }
119 }
120
121 fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
122 match self {
123 SubCommand::Start(cmd) => cmd.load_options(global_options),
124 }
125 }
126
127 fn config_file(&self) -> &Option<String> {
128 match self {
129 SubCommand::Start(cmd) => &cmd.config_file,
130 }
131 }
132
133 fn env_prefix(&self) -> &String {
134 match self {
135 SubCommand::Start(cmd) => &cmd.env_prefix,
136 }
137 }
138}
139
140#[derive(Default, Parser)]
141pub struct StartCommand {
142 #[clap(long, alias = "bind-addr")]
144 rpc_bind_addr: Option<String>,
145 #[clap(long, alias = "server-addr")]
149 rpc_server_addr: Option<String>,
150 #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
151 store_addrs: Option<Vec<String>>,
152 #[clap(short, long)]
153 config_file: Option<String>,
154 #[clap(short, long)]
155 selector: Option<String>,
156 #[clap(long)]
157 use_memory_store: Option<bool>,
158 #[clap(long)]
159 enable_region_failover: Option<bool>,
160 #[clap(long)]
161 http_addr: Option<String>,
162 #[clap(long)]
163 http_timeout: Option<u64>,
164 #[clap(long, default_value = "GREPTIMEDB_METASRV")]
165 env_prefix: String,
166 #[clap(long)]
168 data_home: Option<String>,
169 #[clap(long, default_value = "")]
171 store_key_prefix: String,
172 #[clap(long)]
174 max_txn_ops: Option<usize>,
175 #[clap(long, value_enum)]
177 backend: Option<BackendImpl>,
178}
179
180impl fmt::Debug for StartCommand {
181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182 f.debug_struct("StartCommand")
183 .field("rpc_bind_addr", &self.rpc_bind_addr)
184 .field("rpc_server_addr", &self.rpc_server_addr)
185 .field("store_addrs", &self.sanitize_store_addrs())
186 .field("config_file", &self.config_file)
187 .field("selector", &self.selector)
188 .field("use_memory_store", &self.use_memory_store)
189 .field("enable_region_failover", &self.enable_region_failover)
190 .field("http_addr", &self.http_addr)
191 .field("http_timeout", &self.http_timeout)
192 .field("env_prefix", &self.env_prefix)
193 .field("data_home", &self.data_home)
194 .field("store_key_prefix", &self.store_key_prefix)
195 .field("max_txn_ops", &self.max_txn_ops)
196 .field("backend", &self.backend)
197 .finish()
198 }
199}
200
201impl StartCommand {
202 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
203 let mut opts = MetasrvOptions::load_layered_options(
204 self.config_file.as_deref(),
205 self.env_prefix.as_ref(),
206 )
207 .context(LoadLayeredConfigSnafu)?;
208
209 self.merge_with_cli_options(global_options, &mut opts)?;
210
211 Ok(opts)
212 }
213
214 fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
215 self.store_addrs.as_ref().map(|addrs| {
216 addrs
217 .iter()
218 .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
219 .collect()
220 })
221 }
222
223 fn merge_with_cli_options(
225 &self,
226 global_options: &GlobalOptions,
227 opts: &mut MetasrvOptions,
228 ) -> Result<()> {
229 let opts = &mut opts.component;
230
231 if let Some(dir) = &global_options.log_dir {
232 opts.logging.dir.clone_from(dir);
233 }
234
235 if global_options.log_level.is_some() {
236 opts.logging.level.clone_from(&global_options.log_level);
237 }
238
239 opts.tracing = TracingOptions {
240 #[cfg(feature = "tokio-console")]
241 tokio_console_addr: global_options.tokio_console_addr.clone(),
242 };
243
244 #[allow(deprecated)]
245 if let Some(addr) = &self.rpc_bind_addr {
246 opts.bind_addr.clone_from(addr);
247 opts.grpc.bind_addr.clone_from(addr);
248 } else if !opts.bind_addr.is_empty() {
249 opts.grpc.bind_addr.clone_from(&opts.bind_addr);
250 }
251
252 #[allow(deprecated)]
253 if let Some(addr) = &self.rpc_server_addr {
254 opts.server_addr.clone_from(addr);
255 opts.grpc.server_addr.clone_from(addr);
256 } else if !opts.server_addr.is_empty() {
257 opts.grpc.server_addr.clone_from(&opts.server_addr);
258 }
259
260 if let Some(addrs) = &self.store_addrs {
261 opts.store_addrs.clone_from(addrs);
262 }
263
264 if let Some(selector_type) = &self.selector {
265 opts.selector = selector_type[..]
266 .try_into()
267 .context(error::UnsupportedSelectorTypeSnafu { selector_type })?;
268 }
269
270 if let Some(use_memory_store) = self.use_memory_store {
271 opts.use_memory_store = use_memory_store;
272 }
273
274 if let Some(enable_region_failover) = self.enable_region_failover {
275 opts.enable_region_failover = enable_region_failover;
276 }
277
278 if let Some(http_addr) = &self.http_addr {
279 opts.http.addr.clone_from(http_addr);
280 }
281
282 if let Some(http_timeout) = self.http_timeout {
283 opts.http.timeout = Duration::from_secs(http_timeout);
284 }
285
286 if let Some(data_home) = &self.data_home {
287 opts.data_home.clone_from(data_home);
288 }
289
290 if opts.logging.dir.is_empty() {
292 opts.logging.dir = Path::new(&opts.data_home)
293 .join(DEFAULT_LOGGING_DIR)
294 .to_string_lossy()
295 .to_string();
296 }
297
298 if !self.store_key_prefix.is_empty() {
299 opts.store_key_prefix.clone_from(&self.store_key_prefix)
300 }
301
302 if let Some(max_txn_ops) = self.max_txn_ops {
303 opts.max_txn_ops = max_txn_ops;
304 }
305
306 if let Some(backend) = &self.backend {
307 opts.backend.clone_from(backend);
308 }
309
310 opts.http.disable_dashboard = true;
312
313 Ok(())
314 }
315
316 pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
317 common_runtime::init_global_runtimes(&opts.runtime);
318
319 let guard = common_telemetry::init_global_logging(
320 APP_NAME,
321 &opts.component.logging,
322 &opts.component.tracing,
323 None,
324 None,
325 );
326
327 log_versions(verbose_version(), short_version(), APP_NAME);
328 maybe_activate_heap_profile(&opts.component.memory);
329 create_resource_limit_metrics(APP_NAME);
330
331 info!("Metasrv start command: {:#?}", self);
332
333 let plugin_opts = opts.plugins;
334 let mut opts = opts.component;
335 opts.grpc.detect_server_addr();
336
337 info!("Metasrv options: {:#?}", opts);
338
339 let mut plugins = Plugins::new();
340 plugins::setup_metasrv_plugins(&mut plugins, &plugin_opts, &opts)
341 .await
342 .context(StartMetaServerSnafu)?;
343
344 let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins, None)
345 .await
346 .context(error::BuildMetaServerSnafu)?;
347 let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
348
349 let instance = MetasrvInstance::new(metasrv)
350 .await
351 .context(error::BuildMetaServerSnafu)?;
352
353 Ok(Instance::new(instance, guard))
354 }
355}
356
// Unit tests for option loading: CLI flags, configuration file, global logging flags and
// the precedence between CLI, configuration file, environment variables and defaults.
#[cfg(test)]
mod tests {
    use std::io::Write;

    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use meta_srv::selector::SelectorType;

    use super::*;

    // Options given only as CLI flags end up in the loaded options.
    #[test]
    fn test_read_from_cmd() {
        let cmd = StartCommand {
            rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
            rpc_server_addr: Some("127.0.0.1:3002".to_string()),
            store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
            selector: Some("LoadBased".to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
        assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
        assert_eq!(SelectorType::LoadBased, options.selector);
    }

    // Options given only in a TOML configuration file end up in the loaded options,
    // including the deprecated top-level `bind_addr`/`server_addr`, which are expected
    // to show up under `grpc`.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        // NOTE(review): the key below is `store_addr` (singular) while the asserted field
        // is `store_addrs`; "127.0.0.1:2379" is also the value asserted as the default in
        // `test_config_precedence_order`, so this key may not be what makes the assertion
        // pass — confirm.
        let toml_str = r#"
            bind_addr = "127.0.0.1:3002"
            server_addr = "127.0.0.1:3002"
            store_addr = "127.0.0.1:2379"
            selector = "LeaseBased"
            use_memory_store = false

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"

            [failure_detector]
            threshold = 8.0
            min_std_deviation = "100ms"
            acceptable_heartbeat_pause = "3000ms"
            first_heartbeat_estimate = "1000ms"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
        assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
        assert_eq!(SelectorType::LeaseBased, options.selector);
        assert_eq!("debug", options.logging.level.as_ref().unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
        assert_eq!(8.0, options.failure_detector.threshold);
        assert_eq!(
            100.0,
            options.failure_detector.min_std_deviation.as_millis() as f32
        );
        assert_eq!(
            3000,
            options
                .failure_detector
                .acceptable_heartbeat_pause
                .as_millis()
        );
        assert_eq!(
            1000,
            options
                .failure_detector
                .first_heartbeat_estimate
                .as_millis()
        );
        // Not set in the TOML above, so this presumably checks the built-in default — confirm.
        assert_eq!(
            options.procedure.max_metadata_value_size,
            Some(ReadableSize::kb(1500))
        );
    }

    // The global `log_dir`/`log_level` flags are copied into the logging options.
    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
            rpc_server_addr: Some("127.0.0.1:3002".to_string()),
            store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
            selector: Some("LoadBased".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    // Precedence exercised here: CLI > configuration file > environment variables > defaults.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            server_addr = "127.0.0.1:3002"
            datanode_lease_secs = 15
            selector = "LeaseBased"
            use_memory_store = false

            [http]
            addr = "127.0.0.1:4000"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "METASRV_UT";
        temp_env::with_vars(
            [
                (
                    // Set only through the environment.
                    [env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP),
                    Some("127.0.0.1:14002"),
                ),
                (
                    // Set through the environment and the configuration file.
                    [env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP),
                    Some("127.0.0.1:13002"),
                ),
                (
                    // Set through the environment, the configuration file and the CLI.
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
            ],
            || {
                let command = StartCommand {
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Only the environment sets it: env > default.
                assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");

                // File and environment both set it: the configuration file wins.
                assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");

                // CLI, file and environment all set it: the CLI wins.
                assert_eq!(opts.http.addr, "127.0.0.1:14000");

                // Set nowhere: expected to be the default value.
                assert_eq!(opts.store_addrs, vec!["127.0.0.1:2379".to_string()]);
            },
        );
    }
}