cmd/
metasrv.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self, Debug};
16use std::path::Path;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::Parser;
21use common_base::Plugins;
22use common_config::Configurable;
23use common_meta::distributed_time_constants::init_distributed_time_constants;
24use common_telemetry::info;
25use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
26use common_version::{short_version, verbose_version};
27use meta_srv::bootstrap::{MetasrvInstance, metasrv_builder};
28use meta_srv::metasrv::BackendImpl;
29use snafu::ResultExt;
30use tracing_appender::non_blocking::WorkerGuard;
31
32use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
33use crate::options::{GlobalOptions, GreptimeOptions};
34use crate::{App, create_resource_limit_metrics, log_versions, maybe_activate_heap_profile};
35
/// Complete metasrv options: the metasrv component options wrapped in
/// [`GreptimeOptions`], which also carries process-level sections such as the
/// runtime and plugin options consumed in [`StartCommand::build`].
type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;

/// Application name reported by [`App::name`] and passed to logging
/// initialization, version logging and resource-limit metrics.
pub const APP_NAME: &str = "greptime-metasrv";
39
/// A built metasrv application, driven through the [`App`] trait.
///
/// Holds the [`MetasrvInstance`] together with the logging worker guards returned by
/// `common_telemetry::init_global_logging` in [`StartCommand::build`].
pub struct Instance {
    instance: MetasrvInstance,

    // Keep the logging guard to prevent the worker from being dropped.
    // Fields drop in declaration order, so `instance` is dropped before these guards.
    _guard: Vec<WorkerGuard>,
}
46
47impl Instance {
48    pub fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
49        Self {
50            instance,
51            _guard: guard,
52        }
53    }
54
55    pub fn get_inner(&self) -> &MetasrvInstance {
56        &self.instance
57    }
58
59    pub fn mut_inner(&mut self) -> &mut MetasrvInstance {
60        &mut self.instance
61    }
62}
63
64#[async_trait]
65impl App for Instance {
66    fn name(&self) -> &str {
67        APP_NAME
68    }
69
70    async fn start(&mut self) -> Result<()> {
71        plugins::start_metasrv_plugins(self.instance.plugins())
72            .await
73            .context(StartMetaServerSnafu)?;
74
75        self.instance.start().await.context(StartMetaServerSnafu)
76    }
77
78    async fn stop(&mut self) -> Result<()> {
79        self.instance
80            .shutdown()
81            .await
82            .context(error::ShutdownMetaServerSnafu)
83    }
84}
85
// Metasrv CLI command; all work is delegated to the selected subcommand.
// (Plain `//` comments are used here because `///` on clap-derived items becomes
// user-visible help text.)
#[derive(Parser)]
pub struct Command {
    #[clap(subcommand)]
    subcmd: SubCommand,
}
91
92impl Command {
93    pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
94        self.subcmd.build(opts).await
95    }
96
97    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
98        self.subcmd.load_options(global_options)
99    }
100
101    pub fn config_file(&self) -> &Option<String> {
102        self.subcmd.config_file()
103    }
104
105    pub fn env_prefix(&self) -> &String {
106        self.subcmd.env_prefix()
107    }
108}
109
// Subcommands of the metasrv command; `Start` is currently the only one.
// (Plain `//` comments keep the generated CLI help text unchanged.)
#[derive(Parser)]
enum SubCommand {
    // Build and run a metasrv instance configured by `StartCommand`.
    Start(StartCommand),
}
114
115impl SubCommand {
116    async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
117        match self {
118            SubCommand::Start(cmd) => cmd.build(opts).await,
119        }
120    }
121
122    fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
123        match self {
124            SubCommand::Start(cmd) => cmd.load_options(global_options),
125        }
126    }
127
128    fn config_file(&self) -> &Option<String> {
129        match self {
130            SubCommand::Start(cmd) => &cmd.config_file,
131        }
132    }
133
134    fn env_prefix(&self) -> &String {
135        match self {
136            SubCommand::Start(cmd) => &cmd.env_prefix,
137        }
138    }
139}
140
// Arguments of the `start` subcommand. Each `Option` override below takes effect
// only when it was supplied.
//
// NOTE: the derived `Default` (used by the tests) leaves `env_prefix` as an empty
// string, whereas clap fills it with "GREPTIMEDB_METASRV" when the flag is omitted.
// (Plain `//` comments are used for new notes because `///` becomes CLI help text.)
#[derive(Default, Parser)]
pub struct StartCommand {
    /// The address to bind the gRPC server.
    #[clap(long, alias = "bind-addr")]
    rpc_bind_addr: Option<String>,
    /// The communication server address for the frontend and datanode to connect to metasrv.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
    #[clap(long, alias = "server-addr")]
    rpc_server_addr: Option<String>,
    // Comma-separated store addresses; replaces `store_addrs` from the loaded options.
    // The `Debug` impl prints them only after `sanitize_connection_string`.
    #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
    store_addrs: Option<Vec<String>>,
    // Optional config file path, passed to `load_layered_options`.
    #[clap(short, long)]
    config_file: Option<String>,
    // Selector name; converted with `try_into`, and an unsupported name fails with
    // the `UnsupportedSelectorTypeSnafu` error.
    #[clap(short, long)]
    selector: Option<String>,
    // Overrides `enable_region_failover` when given.
    #[clap(long)]
    enable_region_failover: Option<bool>,
    // Overrides the HTTP server address (`http.addr`).
    #[clap(long)]
    http_addr: Option<String>,
    // HTTP timeout in whole seconds (converted with `Duration::from_secs`).
    #[clap(long)]
    http_timeout: Option<u64>,
    // Prefix of the environment variables read by `load_layered_options`.
    #[clap(long, default_value = "GREPTIMEDB_METASRV")]
    env_prefix: String,
    /// The working home directory of this metasrv instance.
    #[clap(long)]
    data_home: Option<String>,
    /// If it's not empty, the metasrv will store all data with this key prefix.
    #[clap(long, default_value = "")]
    store_key_prefix: String,
    /// The max operations per txn
    #[clap(long)]
    max_txn_ops: Option<usize>,
    /// The database backend.
    #[clap(long, value_enum)]
    backend: Option<BackendImpl>,
}
178
179impl Debug for StartCommand {
180    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181        f.debug_struct("StartCommand")
182            .field("rpc_bind_addr", &self.rpc_bind_addr)
183            .field("rpc_server_addr", &self.rpc_server_addr)
184            .field("store_addrs", &self.sanitize_store_addrs())
185            .field("config_file", &self.config_file)
186            .field("selector", &self.selector)
187            .field("enable_region_failover", &self.enable_region_failover)
188            .field("http_addr", &self.http_addr)
189            .field("http_timeout", &self.http_timeout)
190            .field("env_prefix", &self.env_prefix)
191            .field("data_home", &self.data_home)
192            .field("store_key_prefix", &self.store_key_prefix)
193            .field("max_txn_ops", &self.max_txn_ops)
194            .field("backend", &self.backend)
195            .finish()
196    }
197}
198
199impl StartCommand {
200    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
201        let mut opts = MetasrvOptions::load_layered_options(
202            self.config_file.as_deref(),
203            self.env_prefix.as_ref(),
204        )
205        .context(LoadLayeredConfigSnafu)?;
206
207        self.merge_with_cli_options(global_options, &mut opts)?;
208
209        Ok(opts)
210    }
211
212    fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
213        self.store_addrs.as_ref().map(|addrs| {
214            addrs
215                .iter()
216                .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
217                .collect()
218        })
219    }
220
221    // The precedence order is: cli > config file > environment variables > default values.
222    fn merge_with_cli_options(
223        &self,
224        global_options: &GlobalOptions,
225        opts: &mut MetasrvOptions,
226    ) -> Result<()> {
227        let opts = &mut opts.component;
228
229        if let Some(dir) = &global_options.log_dir {
230            opts.logging.dir.clone_from(dir);
231        }
232
233        if global_options.log_level.is_some() {
234            opts.logging.level.clone_from(&global_options.log_level);
235        }
236
237        opts.tracing = TracingOptions {
238            #[cfg(feature = "tokio-console")]
239            tokio_console_addr: global_options.tokio_console_addr.clone(),
240        };
241
242        #[allow(deprecated)]
243        if let Some(addr) = &self.rpc_bind_addr {
244            opts.bind_addr.clone_from(addr);
245            opts.grpc.bind_addr.clone_from(addr);
246        } else if !opts.bind_addr.is_empty() {
247            opts.grpc.bind_addr.clone_from(&opts.bind_addr);
248        }
249
250        #[allow(deprecated)]
251        if let Some(addr) = &self.rpc_server_addr {
252            opts.server_addr.clone_from(addr);
253            opts.grpc.server_addr.clone_from(addr);
254        } else if !opts.server_addr.is_empty() {
255            opts.grpc.server_addr.clone_from(&opts.server_addr);
256        }
257
258        if let Some(addrs) = &self.store_addrs {
259            opts.store_addrs.clone_from(addrs);
260        }
261
262        if let Some(selector_type) = &self.selector {
263            opts.selector = selector_type[..]
264                .try_into()
265                .context(error::UnsupportedSelectorTypeSnafu { selector_type })?;
266        }
267
268        if let Some(enable_region_failover) = self.enable_region_failover {
269            opts.enable_region_failover = enable_region_failover;
270        }
271
272        if let Some(http_addr) = &self.http_addr {
273            opts.http.addr.clone_from(http_addr);
274        }
275
276        if let Some(http_timeout) = self.http_timeout {
277            opts.http.timeout = Duration::from_secs(http_timeout);
278        }
279
280        if let Some(data_home) = &self.data_home {
281            opts.data_home.clone_from(data_home);
282        }
283
284        // If the logging dir is not set, use the default logs dir in the data home.
285        if opts.logging.dir.is_empty() {
286            opts.logging.dir = Path::new(&opts.data_home)
287                .join(DEFAULT_LOGGING_DIR)
288                .to_string_lossy()
289                .to_string();
290        }
291
292        if !self.store_key_prefix.is_empty() {
293            opts.store_key_prefix.clone_from(&self.store_key_prefix)
294        }
295
296        if let Some(max_txn_ops) = self.max_txn_ops {
297            opts.max_txn_ops = max_txn_ops;
298        }
299
300        if let Some(backend) = &self.backend {
301            opts.backend.clone_from(backend);
302        }
303
304        // Disable dashboard in metasrv.
305        opts.http.disable_dashboard = true;
306
307        Ok(())
308    }
309
310    pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
311        common_runtime::init_global_runtimes(&opts.runtime);
312
313        let guard = common_telemetry::init_global_logging(
314            APP_NAME,
315            &opts.component.logging,
316            &opts.component.tracing,
317            None,
318            None,
319        );
320
321        log_versions(verbose_version(), short_version(), APP_NAME);
322        maybe_activate_heap_profile(&opts.component.memory);
323        create_resource_limit_metrics(APP_NAME);
324        init_distributed_time_constants(opts.component.heartbeat_interval);
325
326        info!("Metasrv start command: {:#?}", self);
327
328        let plugin_opts = opts.plugins;
329        let mut opts = opts.component;
330        opts.grpc.detect_server_addr();
331
332        info!("Metasrv options: {:#?}", opts);
333
334        let mut plugins = Plugins::new();
335        plugins::setup_metasrv_plugins(&mut plugins, &plugin_opts, &opts)
336            .await
337            .context(StartMetaServerSnafu)?;
338
339        let builder = metasrv_builder(&opts, plugins, None)
340            .await
341            .context(error::BuildMetaServerSnafu)?;
342        let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
343
344        let instance = MetasrvInstance::new(metasrv)
345            .await
346            .context(error::BuildMetaServerSnafu)?;
347
348        Ok(Instance::new(instance, guard))
349    }
350}
351
#[cfg(test)]
mod tests {
    use std::io::Write;

    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use meta_srv::selector::SelectorType;

    use super::*;

    /// CLI flags alone (no config file) populate the gRPC bind address, the store
    /// addresses and the selector.
    #[test]
    fn test_read_from_cmd() {
        let cmd = StartCommand {
            rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
            // NOTE(review): `rpc_server_addr` is set but `options.grpc.server_addr` is
            // never asserted below; consider adding that check.
            rpc_server_addr: Some("127.0.0.1:3002".to_string()),
            store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
            selector: Some("LoadBased".to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
        assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
        assert_eq!(SelectorType::LoadBased, options.selector);
    }

    /// Options come only from a TOML config file. The top-level `bind_addr` /
    /// `server_addr` keys are checked through `grpc.*`, which exercises the
    /// carry-over done in `merge_with_cli_options`.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        // NOTE(review): the key below is `store_addr` (singular), while the asserted field
        // is `store_addrs`. The expected value also equals the default checked in
        // `test_config_precedence_order`, so that assertion may pass even if this key is
        // ignored — confirm whether an alias exists.
        let toml_str = r#"
            bind_addr = "127.0.0.1:3002"
            server_addr = "127.0.0.1:3002"
            store_addr = "127.0.0.1:2379"
            selector = "LeaseBased"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"

            [failure_detector]
            threshold = 8.0
            min_std_deviation = "100ms"
            acceptable_heartbeat_pause = "3000ms"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
        assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
        assert_eq!(SelectorType::LeaseBased, options.selector);
        assert_eq!("debug", options.logging.level.as_ref().unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
        assert_eq!(8.0, options.failure_detector.threshold);
        assert_eq!(
            100.0,
            options.failure_detector.min_std_deviation.as_millis() as f32
        );
        assert_eq!(
            3000,
            options
                .failure_detector
                .acceptable_heartbeat_pause
                .as_millis()
        );
        // The config writes no `[procedure]` section, so this checks the value used when
        // it is unset (presumably the built-in default).
        assert_eq!(
            options.procedure.max_metadata_value_size,
            Some(ReadableSize::kb(1500))
        );
    }

    /// The global `log_dir` / `log_level` options override the logging section.
    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
            rpc_server_addr: Some("127.0.0.1:3002".to_string()),
            store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
            selector: Some("LoadBased".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    /// Checks cli > config file > env > defaults, using one field per layer.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            server_addr = "127.0.0.1:3002"
            datanode_lease_secs = 15
            selector = "LeaseBased"

            [http]
            addr = "127.0.0.1:4000"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "METASRV_UT";
        temp_env::with_vars(
            [
                (
                    // bind_addr = 127.0.0.1:14002
                    [env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP),
                    Some("127.0.0.1:14002"),
                ),
                (
                    // server_addr = 127.0.0.1:13002
                    [env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP),
                    Some("127.0.0.1:13002"),
                ),
                (
                    // http.addr = 127.0.0.1:24000
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
            ],
            || {
                let command = StartCommand {
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Should be read from env, env > default values.
                assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");

                // Should be read from config file, config file > env > default values.
                assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");

                // Should be read from cli, cli > config file > env > default values.
                assert_eq!(opts.http.addr, "127.0.0.1:14000");

                // Should be default value.
                assert_eq!(opts.store_addrs, vec!["127.0.0.1:2379".to_string()]);
            },
        );
    }
}