Skip to main content

cmd/
metasrv.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self, Debug};
16use std::path::Path;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::Parser;
21use common_base::Plugins;
22use common_config::Configurable;
23use common_meta::distributed_time_constants::init_distributed_time_constants;
24use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
25use common_telemetry::{info, warn};
26use common_version::{short_version, verbose_version};
27use meta_srv::bootstrap::{MetasrvInstance, metasrv_builder};
28use meta_srv::metasrv::BackendImpl;
29use snafu::ResultExt;
30use tracing_appender::non_blocking::WorkerGuard;
31
32use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
33use crate::options::{GlobalOptions, GreptimeOptions};
34use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
35
36type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
37
/// Application name of the metasrv binary.
///
/// Used as the logging app name, in version logging, for resource-limit metrics,
/// and as the value returned by `App::name`.
pub const APP_NAME: &str = "greptime-metasrv";
39
40pub struct Instance {
41    instance: MetasrvInstance,
42
43    // Keep the logging guard to prevent the worker from being dropped.
44    _guard: Vec<WorkerGuard>,
45}
46
47impl Instance {
48    pub fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
49        Self {
50            instance,
51            _guard: guard,
52        }
53    }
54
55    pub fn get_inner(&self) -> &MetasrvInstance {
56        &self.instance
57    }
58
59    pub fn mut_inner(&mut self) -> &mut MetasrvInstance {
60        &mut self.instance
61    }
62}
63
64#[async_trait]
65impl App for Instance {
66    fn name(&self) -> &str {
67        APP_NAME
68    }
69
70    async fn start(&mut self) -> Result<()> {
71        plugins::start_metasrv_plugins(self.instance.plugins())
72            .await
73            .context(StartMetaServerSnafu)?;
74
75        self.instance.start().await.context(StartMetaServerSnafu)
76    }
77
78    async fn stop(&mut self) -> Result<()> {
79        self.instance
80            .shutdown()
81            .await
82            .context(error::ShutdownMetaServerSnafu)
83    }
84}
85
86#[derive(Parser)]
87pub struct Command {
88    #[clap(subcommand)]
89    subcmd: SubCommand,
90}
91
92impl Command {
93    pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
94        self.subcmd.build(opts).await
95    }
96
97    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
98        self.subcmd.load_options(global_options)
99    }
100
101    pub fn config_file(&self) -> &Option<String> {
102        self.subcmd.config_file()
103    }
104
105    pub fn env_prefix(&self) -> &String {
106        self.subcmd.env_prefix()
107    }
108}
109
110#[derive(Parser)]
111enum SubCommand {
112    Start(StartCommand),
113}
114
115impl SubCommand {
116    async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
117        match self {
118            SubCommand::Start(cmd) => cmd.build(opts).await,
119        }
120    }
121
122    fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
123        match self {
124            SubCommand::Start(cmd) => cmd.load_options(global_options),
125        }
126    }
127
128    fn config_file(&self) -> &Option<String> {
129        match self {
130            SubCommand::Start(cmd) => &cmd.config_file,
131        }
132    }
133
134    fn env_prefix(&self) -> &String {
135        match self {
136            SubCommand::Start(cmd) => &cmd.env_prefix,
137        }
138    }
139}
140
141#[derive(Default, Parser)]
142pub struct StartCommand {
143    /// The address to bind the gRPC server.
144    #[clap(long = "grpc-bind-addr", alias = "rpc-bind-addr", alias = "bind-addr")]
145    grpc_bind_addr: Option<String>,
146    /// The communication server address for the frontend and datanode to connect to metasrv.
147    /// If left empty or unset, the server will automatically use the IP address of the first network interface
148    /// on the host, with the same port number as the one specified in `grpc_bind_addr`.
149    #[clap(
150        long = "grpc-server-addr",
151        alias = "rpc-server-addr",
152        alias = "server-addr"
153    )]
154    grpc_server_addr: Option<String>,
155    #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
156    store_addrs: Option<Vec<String>>,
157    #[clap(short, long)]
158    config_file: Option<String>,
159    #[clap(short, long)]
160    selector: Option<String>,
161    #[clap(long)]
162    enable_region_failover: Option<bool>,
163    #[clap(long)]
164    http_addr: Option<String>,
165    #[clap(long)]
166    http_timeout: Option<u64>,
167    #[clap(long, default_value = "GREPTIMEDB_METASRV")]
168    env_prefix: String,
169    /// The working home directory of this metasrv instance.
170    #[clap(long)]
171    data_home: Option<String>,
172    /// If it's not empty, the metasrv will store all data with this key prefix.
173    #[clap(long, default_value = "")]
174    store_key_prefix: String,
175    /// The max operations per txn
176    #[clap(long)]
177    max_txn_ops: Option<usize>,
178    /// The database backend.
179    #[clap(long, value_enum)]
180    backend: Option<BackendImpl>,
181}
182
183impl Debug for StartCommand {
184    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
185        f.debug_struct("StartCommand")
186            .field("grpc_bind_addr", &self.grpc_bind_addr)
187            .field("grpc_server_addr", &self.grpc_server_addr)
188            .field("store_addrs", &self.sanitize_store_addrs())
189            .field("config_file", &self.config_file)
190            .field("selector", &self.selector)
191            .field("enable_region_failover", &self.enable_region_failover)
192            .field("http_addr", &self.http_addr)
193            .field("http_timeout", &self.http_timeout)
194            .field("env_prefix", &self.env_prefix)
195            .field("data_home", &self.data_home)
196            .field("store_key_prefix", &self.store_key_prefix)
197            .field("max_txn_ops", &self.max_txn_ops)
198            .field("backend", &self.backend)
199            .finish()
200    }
201}
202
203impl StartCommand {
204    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
205        let mut opts = MetasrvOptions::load_layered_options(
206            self.config_file.as_deref(),
207            self.env_prefix.as_ref(),
208        )
209        .context(LoadLayeredConfigSnafu)?;
210
211        self.merge_with_cli_options(global_options, &mut opts)?;
212
213        Ok(opts)
214    }
215
216    fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
217        self.store_addrs.as_ref().map(|addrs| {
218            addrs
219                .iter()
220                .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
221                .collect()
222        })
223    }
224
225    // The precedence order is: cli > config file > environment variables > default values.
226    fn merge_with_cli_options(
227        &self,
228        global_options: &GlobalOptions,
229        opts: &mut MetasrvOptions,
230    ) -> Result<()> {
231        let opts = &mut opts.component;
232
233        if let Some(dir) = &global_options.log_dir {
234            opts.logging.dir.clone_from(dir);
235        }
236
237        if global_options.log_level.is_some() {
238            opts.logging.level.clone_from(&global_options.log_level);
239        }
240
241        opts.tracing = TracingOptions {
242            #[cfg(feature = "tokio-console")]
243            tokio_console_addr: global_options.tokio_console_addr.clone(),
244        };
245
246        #[allow(deprecated)]
247        if let Some(addr) = &self.grpc_bind_addr {
248            opts.bind_addr.clone_from(addr);
249            opts.grpc.bind_addr.clone_from(addr);
250        } else if !opts.bind_addr.is_empty() {
251            warn!(
252                "Use the deprecated attribute `MetasrvOptions.bind_addr`, please use `grpc.bind_addr` instead."
253            );
254            opts.grpc.bind_addr.clone_from(&opts.bind_addr);
255        }
256
257        #[allow(deprecated)]
258        if let Some(addr) = &self.grpc_server_addr {
259            opts.server_addr.clone_from(addr);
260            opts.grpc.server_addr.clone_from(addr);
261        } else if !opts.server_addr.is_empty() {
262            warn!(
263                "Use the deprecated attribute `MetasrvOptions.server_addr`, please use `grpc.server_addr` instead."
264            );
265            opts.grpc.server_addr.clone_from(&opts.server_addr);
266        }
267
268        if let Some(addrs) = &self.store_addrs {
269            opts.store_addrs.clone_from(addrs);
270        }
271
272        if let Some(selector_type) = &self.selector {
273            opts.selector = selector_type[..]
274                .try_into()
275                .context(error::UnsupportedSelectorTypeSnafu { selector_type })?;
276        }
277
278        if let Some(enable_region_failover) = self.enable_region_failover {
279            opts.enable_region_failover = enable_region_failover;
280        }
281
282        if let Some(http_addr) = &self.http_addr {
283            opts.http.addr.clone_from(http_addr);
284        }
285
286        if let Some(http_timeout) = self.http_timeout {
287            opts.http.timeout = Duration::from_secs(http_timeout);
288        }
289
290        if let Some(data_home) = &self.data_home {
291            opts.data_home.clone_from(data_home);
292        }
293
294        // If the logging dir is not set, use the default logs dir in the data home.
295        if opts.logging.dir.is_empty() {
296            opts.logging.dir = Path::new(&opts.data_home)
297                .join(DEFAULT_LOGGING_DIR)
298                .to_string_lossy()
299                .to_string();
300        }
301
302        if !self.store_key_prefix.is_empty() {
303            opts.store_key_prefix.clone_from(&self.store_key_prefix)
304        }
305
306        if let Some(max_txn_ops) = self.max_txn_ops {
307            opts.max_txn_ops = max_txn_ops;
308        }
309
310        if let Some(backend) = &self.backend {
311            opts.backend.clone_from(backend);
312        }
313
314        // Disable dashboard in metasrv.
315        opts.http.disable_dashboard = true;
316
317        Ok(())
318    }
319
320    pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
321        common_runtime::init_global_runtimes(&opts.runtime);
322
323        let guard = common_telemetry::init_global_logging(
324            APP_NAME,
325            &opts.component.logging,
326            &opts.component.tracing,
327            None,
328            None,
329        );
330
331        log_versions(verbose_version(), short_version(), APP_NAME);
332        maybe_activate_heap_profile(&opts.component.memory);
333        create_resource_limit_metrics(APP_NAME);
334        init_distributed_time_constants(opts.component.heartbeat_interval);
335
336        info!("Metasrv start command: {:#?}", self);
337
338        let plugin_opts = opts.plugins;
339        let mut opts = opts.component;
340        opts.grpc.detect_server_addr();
341
342        info!("Metasrv options: {:#?}", opts);
343
344        let mut plugins = Plugins::new();
345        plugins::setup_metasrv_plugins(&mut plugins, &plugin_opts, &opts)
346            .await
347            .context(StartMetaServerSnafu)?;
348
349        let builder = metasrv_builder(&opts, plugins, None)
350            .await
351            .context(error::BuildMetaServerSnafu)?;
352        let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
353
354        let instance = MetasrvInstance::new(metasrv)
355            .await
356            .context(error::BuildMetaServerSnafu)?;
357
358        Ok(Instance::new(instance, guard))
359    }
360}
361
#[cfg(test)]
mod tests {
    use std::io::Write;

    use clap::{CommandFactory, Parser};
    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use meta_srv::selector::SelectorType;

    use super::*;

    #[test]
    fn test_read_from_cmd() {
        let cmd = StartCommand {
            grpc_bind_addr: Some("127.0.0.1:3002".to_string()),
            grpc_server_addr: Some("127.0.0.1:3002".to_string()),
            store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
            selector: Some("LoadBased".to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
        assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
        assert_eq!(SelectorType::LoadBased, options.selector);
    }

    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            bind_addr = "127.0.0.1:3002"
            server_addr = "127.0.0.1:3002"
            store_addr = "127.0.0.1:2379"
            selector = "LeaseBased"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"

            [failure_detector]
            threshold = 8.0
            min_std_deviation = "100ms"
            acceptable_heartbeat_pause = "3000ms"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
        assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
        assert_eq!(SelectorType::LeaseBased, options.selector);
        assert_eq!("debug", options.logging.level.as_ref().unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
        assert_eq!(8.0, options.failure_detector.threshold);
        assert_eq!(
            100.0,
            options.failure_detector.min_std_deviation.as_millis() as f32
        );
        assert_eq!(
            3000,
            options
                .failure_detector
                .acceptable_heartbeat_pause
                .as_millis()
        );
        assert_eq!(
            options.procedure.max_metadata_value_size,
            Some(ReadableSize::kb(1500))
        );
    }

    #[test]
    fn test_cli_grpc_addr_overrides_deprecated_config() {
        // Deprecated top-level keys in the config file must lose to the CLI flags.
        let mut file = create_named_temp_file();
        let toml_str = r#"
            bind_addr = "127.0.0.1:3002"
            server_addr = "127.0.0.1:3002"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            grpc_bind_addr: Some("127.0.0.1:4002".to_string()),
            grpc_server_addr: Some("10.0.0.1:4002".to_string()),
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:4002", options.grpc.bind_addr);
        assert_eq!("10.0.0.1:4002", options.grpc.server_addr);
    }

    #[test]
    fn test_merge_misc_cli_options() {
        let cmd = StartCommand {
            http_addr: Some("127.0.0.1:14000".to_string()),
            http_timeout: Some(10),
            data_home: Some("/tmp/greptimedb_data".to_string()),
            store_key_prefix: "test_prefix".to_string(),
            max_txn_ops: Some(64),
            enable_region_failover: Some(true),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:14000", options.http.addr);
        assert_eq!(Duration::from_secs(10), options.http.timeout);
        assert_eq!("/tmp/greptimedb_data", options.data_home);
        assert_eq!("test_prefix", options.store_key_prefix);
        assert_eq!(64, options.max_txn_ops);
        assert!(options.enable_region_failover);
        // The dashboard is always disabled in metasrv, regardless of configuration.
        assert!(options.http.disable_dashboard);
    }

    #[test]
    fn test_unsupported_selector_is_rejected() {
        let cmd = StartCommand {
            selector: Some("NotASelector".to_string()),
            ..Default::default()
        };

        assert!(cmd.load_options(&Default::default()).is_err());
    }

    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            grpc_bind_addr: Some("127.0.0.1:3002".to_string()),
            grpc_server_addr: Some("127.0.0.1:3002".to_string()),
            store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
            selector: Some("LoadBased".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            server_addr = "127.0.0.1:3002"
            datanode_lease_secs = 15
            selector = "LeaseBased"

            [http]
            addr = "127.0.0.1:4000"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "METASRV_UT";
        temp_env::with_vars(
            [
                (
                    // bind_addr = 127.0.0.1:14002
                    [env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP),
                    Some("127.0.0.1:14002"),
                ),
                (
                    // server_addr = 127.0.0.1:13002
                    [env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP),
                    Some("127.0.0.1:13002"),
                ),
                (
                    // http.addr = 127.0.0.1:24000
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
            ],
            || {
                let command = StartCommand {
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Should be read from env, env > default values.
                assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");

                // Should be read from config file, config file > env > default values.
                assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");

                // Should be read from cli, cli > config file > env > default values.
                assert_eq!(opts.http.addr, "127.0.0.1:14000");

                // Should be default value.
                assert_eq!(opts.store_addrs, vec!["127.0.0.1:2379".to_string()]);
            },
        );
    }

    #[test]
    fn test_parse_grpc_cli_aliases() {
        let command = StartCommand::try_parse_from([
            "metasrv",
            "--grpc-bind-addr",
            "127.0.0.1:13002",
            "--grpc-server-addr",
            "10.0.0.1:13002",
        ])
        .unwrap();
        assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:13002"));
        assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.1:13002"));

        let command = StartCommand::try_parse_from([
            "metasrv",
            "--rpc-bind-addr",
            "127.0.0.1:23002",
            "--rpc-server-addr",
            "10.0.0.2:23002",
        ])
        .unwrap();
        assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:23002"));
        assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.2:23002"));

        let command = StartCommand::try_parse_from([
            "metasrv",
            "--bind-addr",
            "127.0.0.1:33002",
            "--server-addr",
            "10.0.0.3:33002",
        ])
        .unwrap();
        assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:33002"));
        assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.3:33002"));
    }

    #[test]
    fn test_help_uses_grpc_option_names() {
        let mut cmd = StartCommand::command();
        let mut help = Vec::new();
        cmd.write_long_help(&mut help).unwrap();
        let help = String::from_utf8(help).unwrap();

        assert!(help.contains("--grpc-bind-addr"));
        assert!(help.contains("--grpc-server-addr"));
        assert!(!help.contains("--rpc-bind-addr"));
        assert!(!help.contains("--rpc-server-addr"));
        assert!(!help.contains("--bind-addr"));
        assert!(!help.contains("--server-addr"));
    }
}