1use std::fmt;
16use std::path::Path;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::Parser;
21use common_base::Plugins;
22use common_config::Configurable;
23use common_telemetry::info;
24use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
25use common_version::{short_version, verbose_version};
26use meta_srv::bootstrap::MetasrvInstance;
27use meta_srv::metasrv::BackendImpl;
28use snafu::ResultExt;
29use tracing_appender::non_blocking::WorkerGuard;
30
31use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
32use crate::options::{GlobalOptions, GreptimeOptions};
33use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
34
35type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
36
37pub const APP_NAME: &str = "greptime-metasrv";
38
39pub struct Instance {
40 instance: MetasrvInstance,
41
42 _guard: Vec<WorkerGuard>,
44}
45
46impl Instance {
47 pub fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
48 Self {
49 instance,
50 _guard: guard,
51 }
52 }
53
54 pub fn get_inner(&self) -> &MetasrvInstance {
55 &self.instance
56 }
57
58 pub fn mut_inner(&mut self) -> &mut MetasrvInstance {
59 &mut self.instance
60 }
61}
62
63#[async_trait]
64impl App for Instance {
65 fn name(&self) -> &str {
66 APP_NAME
67 }
68
69 async fn start(&mut self) -> Result<()> {
70 plugins::start_metasrv_plugins(self.instance.plugins())
71 .await
72 .context(StartMetaServerSnafu)?;
73
74 self.instance.start().await.context(StartMetaServerSnafu)
75 }
76
77 async fn stop(&mut self) -> Result<()> {
78 self.instance
79 .shutdown()
80 .await
81 .context(error::ShutdownMetaServerSnafu)
82 }
83}
84
85#[derive(Parser)]
86pub struct Command {
87 #[clap(subcommand)]
88 subcmd: SubCommand,
89}
90
91impl Command {
92 pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
93 self.subcmd.build(opts).await
94 }
95
96 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
97 self.subcmd.load_options(global_options)
98 }
99
100 pub fn config_file(&self) -> &Option<String> {
101 self.subcmd.config_file()
102 }
103
104 pub fn env_prefix(&self) -> &String {
105 self.subcmd.env_prefix()
106 }
107}
108
109#[derive(Parser)]
110enum SubCommand {
111 Start(StartCommand),
112}
113
114impl SubCommand {
115 async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
116 match self {
117 SubCommand::Start(cmd) => cmd.build(opts).await,
118 }
119 }
120
121 fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
122 match self {
123 SubCommand::Start(cmd) => cmd.load_options(global_options),
124 }
125 }
126
127 fn config_file(&self) -> &Option<String> {
128 match self {
129 SubCommand::Start(cmd) => &cmd.config_file,
130 }
131 }
132
133 fn env_prefix(&self) -> &String {
134 match self {
135 SubCommand::Start(cmd) => &cmd.env_prefix,
136 }
137 }
138}
139
140#[derive(Default, Parser)]
141pub struct StartCommand {
142 #[clap(long, alias = "bind-addr")]
144 rpc_bind_addr: Option<String>,
145 #[clap(long, alias = "server-addr")]
149 rpc_server_addr: Option<String>,
150 #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
151 store_addrs: Option<Vec<String>>,
152 #[clap(short, long)]
153 config_file: Option<String>,
154 #[clap(short, long)]
155 selector: Option<String>,
156 #[clap(long)]
157 use_memory_store: Option<bool>,
158 #[clap(long)]
159 enable_region_failover: Option<bool>,
160 #[clap(long)]
161 http_addr: Option<String>,
162 #[clap(long)]
163 http_timeout: Option<u64>,
164 #[clap(long, default_value = "GREPTIMEDB_METASRV")]
165 env_prefix: String,
166 #[clap(long)]
168 data_home: Option<String>,
169 #[clap(long, default_value = "")]
171 store_key_prefix: String,
172 #[clap(long)]
174 max_txn_ops: Option<usize>,
175 #[clap(long, value_enum)]
177 backend: Option<BackendImpl>,
178}
179
180impl fmt::Debug for StartCommand {
181 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182 f.debug_struct("StartCommand")
183 .field("rpc_bind_addr", &self.rpc_bind_addr)
184 .field("rpc_server_addr", &self.rpc_server_addr)
185 .field("store_addrs", &self.sanitize_store_addrs())
186 .field("config_file", &self.config_file)
187 .field("selector", &self.selector)
188 .field("use_memory_store", &self.use_memory_store)
189 .field("enable_region_failover", &self.enable_region_failover)
190 .field("http_addr", &self.http_addr)
191 .field("http_timeout", &self.http_timeout)
192 .field("env_prefix", &self.env_prefix)
193 .field("data_home", &self.data_home)
194 .field("store_key_prefix", &self.store_key_prefix)
195 .field("max_txn_ops", &self.max_txn_ops)
196 .field("backend", &self.backend)
197 .finish()
198 }
199}
200
201impl StartCommand {
202 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
203 let mut opts = MetasrvOptions::load_layered_options(
204 self.config_file.as_deref(),
205 self.env_prefix.as_ref(),
206 )
207 .context(LoadLayeredConfigSnafu)?;
208
209 self.merge_with_cli_options(global_options, &mut opts)?;
210
211 Ok(opts)
212 }
213
214 fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
215 self.store_addrs.as_ref().map(|addrs| {
216 addrs
217 .iter()
218 .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
219 .collect()
220 })
221 }
222
223 fn merge_with_cli_options(
225 &self,
226 global_options: &GlobalOptions,
227 opts: &mut MetasrvOptions,
228 ) -> Result<()> {
229 let opts = &mut opts.component;
230
231 if let Some(dir) = &global_options.log_dir {
232 opts.logging.dir.clone_from(dir);
233 }
234
235 if global_options.log_level.is_some() {
236 opts.logging.level.clone_from(&global_options.log_level);
237 }
238
239 opts.tracing = TracingOptions {
240 #[cfg(feature = "tokio-console")]
241 tokio_console_addr: global_options.tokio_console_addr.clone(),
242 };
243
244 #[allow(deprecated)]
245 if let Some(addr) = &self.rpc_bind_addr {
246 opts.bind_addr.clone_from(addr);
247 opts.grpc.bind_addr.clone_from(addr);
248 } else if !opts.bind_addr.is_empty() {
249 opts.grpc.bind_addr.clone_from(&opts.bind_addr);
250 }
251
252 #[allow(deprecated)]
253 if let Some(addr) = &self.rpc_server_addr {
254 opts.server_addr.clone_from(addr);
255 opts.grpc.server_addr.clone_from(addr);
256 } else if !opts.server_addr.is_empty() {
257 opts.grpc.server_addr.clone_from(&opts.server_addr);
258 }
259
260 if let Some(addrs) = &self.store_addrs {
261 opts.store_addrs.clone_from(addrs);
262 }
263
264 if let Some(selector_type) = &self.selector {
265 opts.selector = selector_type[..]
266 .try_into()
267 .context(error::UnsupportedSelectorTypeSnafu { selector_type })?;
268 }
269
270 if let Some(use_memory_store) = self.use_memory_store {
271 opts.use_memory_store = use_memory_store;
272 }
273
274 if let Some(enable_region_failover) = self.enable_region_failover {
275 opts.enable_region_failover = enable_region_failover;
276 }
277
278 if let Some(http_addr) = &self.http_addr {
279 opts.http.addr.clone_from(http_addr);
280 }
281
282 if let Some(http_timeout) = self.http_timeout {
283 opts.http.timeout = Duration::from_secs(http_timeout);
284 }
285
286 if let Some(data_home) = &self.data_home {
287 opts.data_home.clone_from(data_home);
288 }
289
290 if opts.logging.dir.is_empty() {
292 opts.logging.dir = Path::new(&opts.data_home)
293 .join(DEFAULT_LOGGING_DIR)
294 .to_string_lossy()
295 .to_string();
296 }
297
298 if !self.store_key_prefix.is_empty() {
299 opts.store_key_prefix.clone_from(&self.store_key_prefix)
300 }
301
302 if let Some(max_txn_ops) = self.max_txn_ops {
303 opts.max_txn_ops = max_txn_ops;
304 }
305
306 if let Some(backend) = &self.backend {
307 opts.backend.clone_from(backend);
308 }
309
310 opts.http.disable_dashboard = true;
312
313 Ok(())
314 }
315
316 pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
317 common_runtime::init_global_runtimes(&opts.runtime);
318
319 let guard = common_telemetry::init_global_logging(
320 APP_NAME,
321 &opts.component.logging,
322 &opts.component.tracing,
323 None,
324 None,
325 );
326
327 log_versions(verbose_version(), short_version(), APP_NAME);
328 maybe_activate_heap_profile(&opts.component.memory);
329 create_resource_limit_metrics(APP_NAME);
330
331 info!("Metasrv start command: {:#?}", self);
332
333 let plugin_opts = opts.plugins;
334 let mut opts = opts.component;
335 opts.grpc.detect_server_addr();
336
337 info!("Metasrv options: {:#?}", opts);
338
339 let mut plugins = Plugins::new();
340 plugins::setup_metasrv_plugins(&mut plugins, &plugin_opts, &opts)
341 .await
342 .context(StartMetaServerSnafu)?;
343
344 let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins, None)
345 .await
346 .context(error::BuildMetaServerSnafu)?;
347 let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
348
349 let instance = MetasrvInstance::new(metasrv)
350 .await
351 .context(error::BuildMetaServerSnafu)?;
352
353 Ok(Instance::new(instance, guard))
354 }
355}
356
357#[cfg(test)]
358mod tests {
359 use std::io::Write;
360
361 use common_base::readable_size::ReadableSize;
362 use common_config::ENV_VAR_SEP;
363 use common_test_util::temp_dir::create_named_temp_file;
364 use meta_srv::selector::SelectorType;
365
366 use super::*;
367
368 #[test]
369 fn test_read_from_cmd() {
370 let cmd = StartCommand {
371 rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
372 rpc_server_addr: Some("127.0.0.1:3002".to_string()),
373 store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
374 selector: Some("LoadBased".to_string()),
375 ..Default::default()
376 };
377
378 let options = cmd.load_options(&Default::default()).unwrap().component;
379 assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
380 assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
381 assert_eq!(SelectorType::LoadBased, options.selector);
382 }
383
384 #[test]
385 fn test_read_from_config_file() {
386 let mut file = create_named_temp_file();
387 let toml_str = r#"
388 bind_addr = "127.0.0.1:3002"
389 server_addr = "127.0.0.1:3002"
390 store_addr = "127.0.0.1:2379"
391 selector = "LeaseBased"
392 use_memory_store = false
393
394 [logging]
395 level = "debug"
396 dir = "./greptimedb_data/test/logs"
397
398 [failure_detector]
399 threshold = 8.0
400 min_std_deviation = "100ms"
401 acceptable_heartbeat_pause = "3000ms"
402 "#;
403 write!(file, "{}", toml_str).unwrap();
404
405 let cmd = StartCommand {
406 config_file: Some(file.path().to_str().unwrap().to_string()),
407 ..Default::default()
408 };
409
410 let options = cmd.load_options(&Default::default()).unwrap().component;
411 assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
412 assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
413 assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
414 assert_eq!(SelectorType::LeaseBased, options.selector);
415 assert_eq!("debug", options.logging.level.as_ref().unwrap());
416 assert_eq!(
417 "./greptimedb_data/test/logs".to_string(),
418 options.logging.dir
419 );
420 assert_eq!(8.0, options.failure_detector.threshold);
421 assert_eq!(
422 100.0,
423 options.failure_detector.min_std_deviation.as_millis() as f32
424 );
425 assert_eq!(
426 3000,
427 options
428 .failure_detector
429 .acceptable_heartbeat_pause
430 .as_millis()
431 );
432 assert_eq!(
433 options.procedure.max_metadata_value_size,
434 Some(ReadableSize::kb(1500))
435 );
436 }
437
438 #[test]
439 fn test_load_log_options_from_cli() {
440 let cmd = StartCommand {
441 rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
442 rpc_server_addr: Some("127.0.0.1:3002".to_string()),
443 store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
444 selector: Some("LoadBased".to_string()),
445 ..Default::default()
446 };
447
448 let options = cmd
449 .load_options(&GlobalOptions {
450 log_dir: Some("./greptimedb_data/test/logs".to_string()),
451 log_level: Some("debug".to_string()),
452
453 #[cfg(feature = "tokio-console")]
454 tokio_console_addr: None,
455 })
456 .unwrap()
457 .component;
458
459 let logging_opt = options.logging;
460 assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
461 assert_eq!("debug", logging_opt.level.as_ref().unwrap());
462 }
463
464 #[test]
465 fn test_config_precedence_order() {
466 let mut file = create_named_temp_file();
467 let toml_str = r#"
468 server_addr = "127.0.0.1:3002"
469 datanode_lease_secs = 15
470 selector = "LeaseBased"
471 use_memory_store = false
472
473 [http]
474 addr = "127.0.0.1:4000"
475
476 [logging]
477 level = "debug"
478 dir = "./greptimedb_data/test/logs"
479 "#;
480 write!(file, "{}", toml_str).unwrap();
481
482 let env_prefix = "METASRV_UT";
483 temp_env::with_vars(
484 [
485 (
486 [env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP),
488 Some("127.0.0.1:14002"),
489 ),
490 (
491 [env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP),
493 Some("127.0.0.1:13002"),
494 ),
495 (
496 [
498 env_prefix.to_string(),
499 "http".to_uppercase(),
500 "addr".to_uppercase(),
501 ]
502 .join(ENV_VAR_SEP),
503 Some("127.0.0.1:24000"),
504 ),
505 ],
506 || {
507 let command = StartCommand {
508 http_addr: Some("127.0.0.1:14000".to_string()),
509 config_file: Some(file.path().to_str().unwrap().to_string()),
510 env_prefix: env_prefix.to_string(),
511 ..Default::default()
512 };
513
514 let opts = command.load_options(&Default::default()).unwrap().component;
515
516 assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");
518
519 assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");
521
522 assert_eq!(opts.http.addr, "127.0.0.1:14000");
524
525 assert_eq!(opts.store_addrs, vec!["127.0.0.1:2379".to_string()]);
527 },
528 );
529 }
530}