cmd/
metasrv.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::fmt;
16use std::path::Path;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::Parser;
21use common_base::Plugins;
22use common_config::Configurable;
23use common_telemetry::info;
24use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
25use common_version::{short_version, version};
26use meta_srv::bootstrap::MetasrvInstance;
27use meta_srv::metasrv::BackendImpl;
28use snafu::ResultExt;
29use tracing_appender::non_blocking::WorkerGuard;
30
31use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
32use crate::options::{GlobalOptions, GreptimeOptions};
33use crate::{create_resource_limit_metrics, log_versions, App};
34
35type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;
36
37pub const APP_NAME: &str = "greptime-metasrv";
38
39pub struct Instance {
40    instance: MetasrvInstance,
41
42    // Keep the logging guard to prevent the worker from being dropped.
43    _guard: Vec<WorkerGuard>,
44}
45
46impl Instance {
47    pub fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
48        Self {
49            instance,
50            _guard: guard,
51        }
52    }
53
54    pub fn get_inner(&self) -> &MetasrvInstance {
55        &self.instance
56    }
57}
58
59#[async_trait]
60impl App for Instance {
61    fn name(&self) -> &str {
62        APP_NAME
63    }
64
65    async fn start(&mut self) -> Result<()> {
66        plugins::start_metasrv_plugins(self.instance.plugins())
67            .await
68            .context(StartMetaServerSnafu)?;
69
70        self.instance.start().await.context(StartMetaServerSnafu)
71    }
72
73    async fn stop(&mut self) -> Result<()> {
74        self.instance
75            .shutdown()
76            .await
77            .context(error::ShutdownMetaServerSnafu)
78    }
79}
80
81#[derive(Parser)]
82pub struct Command {
83    #[clap(subcommand)]
84    subcmd: SubCommand,
85}
86
87impl Command {
88    pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
89        self.subcmd.build(opts).await
90    }
91
92    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
93        self.subcmd.load_options(global_options)
94    }
95
96    pub fn config_file(&self) -> &Option<String> {
97        self.subcmd.config_file()
98    }
99
100    pub fn env_prefix(&self) -> &String {
101        self.subcmd.env_prefix()
102    }
103}
104
105#[derive(Parser)]
106enum SubCommand {
107    Start(StartCommand),
108}
109
110impl SubCommand {
111    async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
112        match self {
113            SubCommand::Start(cmd) => cmd.build(opts).await,
114        }
115    }
116
117    fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
118        match self {
119            SubCommand::Start(cmd) => cmd.load_options(global_options),
120        }
121    }
122
123    fn config_file(&self) -> &Option<String> {
124        match self {
125            SubCommand::Start(cmd) => &cmd.config_file,
126        }
127    }
128
129    fn env_prefix(&self) -> &String {
130        match self {
131            SubCommand::Start(cmd) => &cmd.env_prefix,
132        }
133    }
134}
135
136#[derive(Default, Parser)]
137pub struct StartCommand {
138    /// The address to bind the gRPC server.
139    #[clap(long, alias = "bind-addr")]
140    rpc_bind_addr: Option<String>,
141    /// The communication server address for the frontend and datanode to connect to metasrv.
142    /// If left empty or unset, the server will automatically use the IP address of the first network interface
143    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
144    #[clap(long, alias = "server-addr")]
145    rpc_server_addr: Option<String>,
146    #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
147    store_addrs: Option<Vec<String>>,
148    #[clap(short, long)]
149    config_file: Option<String>,
150    #[clap(short, long)]
151    selector: Option<String>,
152    #[clap(long)]
153    use_memory_store: Option<bool>,
154    #[clap(long)]
155    enable_region_failover: Option<bool>,
156    #[clap(long)]
157    http_addr: Option<String>,
158    #[clap(long)]
159    http_timeout: Option<u64>,
160    #[clap(long, default_value = "GREPTIMEDB_METASRV")]
161    env_prefix: String,
162    /// The working home directory of this metasrv instance.
163    #[clap(long)]
164    data_home: Option<String>,
165    /// If it's not empty, the metasrv will store all data with this key prefix.
166    #[clap(long, default_value = "")]
167    store_key_prefix: String,
168    /// The max operations per txn
169    #[clap(long)]
170    max_txn_ops: Option<usize>,
171    /// The database backend.
172    #[clap(long, value_enum)]
173    backend: Option<BackendImpl>,
174}
175
176impl fmt::Debug for StartCommand {
177    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
178        f.debug_struct("StartCommand")
179            .field("rpc_bind_addr", &self.rpc_bind_addr)
180            .field("rpc_server_addr", &self.rpc_server_addr)
181            .field("store_addrs", &self.sanitize_store_addrs())
182            .field("config_file", &self.config_file)
183            .field("selector", &self.selector)
184            .field("use_memory_store", &self.use_memory_store)
185            .field("enable_region_failover", &self.enable_region_failover)
186            .field("http_addr", &self.http_addr)
187            .field("http_timeout", &self.http_timeout)
188            .field("env_prefix", &self.env_prefix)
189            .field("data_home", &self.data_home)
190            .field("store_key_prefix", &self.store_key_prefix)
191            .field("max_txn_ops", &self.max_txn_ops)
192            .field("backend", &self.backend)
193            .finish()
194    }
195}
196
197impl StartCommand {
198    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
199        let mut opts = MetasrvOptions::load_layered_options(
200            self.config_file.as_deref(),
201            self.env_prefix.as_ref(),
202        )
203        .context(LoadLayeredConfigSnafu)?;
204
205        self.merge_with_cli_options(global_options, &mut opts)?;
206
207        Ok(opts)
208    }
209
210    fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
211        self.store_addrs.as_ref().map(|addrs| {
212            addrs
213                .iter()
214                .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
215                .collect()
216        })
217    }
218
219    // The precedence order is: cli > config file > environment variables > default values.
220    fn merge_with_cli_options(
221        &self,
222        global_options: &GlobalOptions,
223        opts: &mut MetasrvOptions,
224    ) -> Result<()> {
225        let opts = &mut opts.component;
226
227        if let Some(dir) = &global_options.log_dir {
228            opts.logging.dir.clone_from(dir);
229        }
230
231        if global_options.log_level.is_some() {
232            opts.logging.level.clone_from(&global_options.log_level);
233        }
234
235        opts.tracing = TracingOptions {
236            #[cfg(feature = "tokio-console")]
237            tokio_console_addr: global_options.tokio_console_addr.clone(),
238        };
239
240        #[allow(deprecated)]
241        if let Some(addr) = &self.rpc_bind_addr {
242            opts.bind_addr.clone_from(addr);
243            opts.grpc.bind_addr.clone_from(addr);
244        } else if !opts.bind_addr.is_empty() {
245            opts.grpc.bind_addr.clone_from(&opts.bind_addr);
246        }
247
248        #[allow(deprecated)]
249        if let Some(addr) = &self.rpc_server_addr {
250            opts.server_addr.clone_from(addr);
251            opts.grpc.server_addr.clone_from(addr);
252        } else if !opts.server_addr.is_empty() {
253            opts.grpc.server_addr.clone_from(&opts.server_addr);
254        }
255
256        if let Some(addrs) = &self.store_addrs {
257            opts.store_addrs.clone_from(addrs);
258        }
259
260        if let Some(selector_type) = &self.selector {
261            opts.selector = selector_type[..]
262                .try_into()
263                .context(error::UnsupportedSelectorTypeSnafu { selector_type })?;
264        }
265
266        if let Some(use_memory_store) = self.use_memory_store {
267            opts.use_memory_store = use_memory_store;
268        }
269
270        if let Some(enable_region_failover) = self.enable_region_failover {
271            opts.enable_region_failover = enable_region_failover;
272        }
273
274        if let Some(http_addr) = &self.http_addr {
275            opts.http.addr.clone_from(http_addr);
276        }
277
278        if let Some(http_timeout) = self.http_timeout {
279            opts.http.timeout = Duration::from_secs(http_timeout);
280        }
281
282        if let Some(data_home) = &self.data_home {
283            opts.data_home.clone_from(data_home);
284        }
285
286        // If the logging dir is not set, use the default logs dir in the data home.
287        if opts.logging.dir.is_empty() {
288            opts.logging.dir = Path::new(&opts.data_home)
289                .join(DEFAULT_LOGGING_DIR)
290                .to_string_lossy()
291                .to_string();
292        }
293
294        if !self.store_key_prefix.is_empty() {
295            opts.store_key_prefix.clone_from(&self.store_key_prefix)
296        }
297
298        if let Some(max_txn_ops) = self.max_txn_ops {
299            opts.max_txn_ops = max_txn_ops;
300        }
301
302        if let Some(backend) = &self.backend {
303            opts.backend.clone_from(backend);
304        }
305
306        // Disable dashboard in metasrv.
307        opts.http.disable_dashboard = true;
308
309        Ok(())
310    }
311
312    pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
313        common_runtime::init_global_runtimes(&opts.runtime);
314
315        let guard = common_telemetry::init_global_logging(
316            APP_NAME,
317            &opts.component.logging,
318            &opts.component.tracing,
319            None,
320            None,
321        );
322
323        log_versions(version(), short_version(), APP_NAME);
324        create_resource_limit_metrics(APP_NAME);
325
326        info!("Metasrv start command: {:#?}", self);
327
328        let plugin_opts = opts.plugins;
329        let mut opts = opts.component;
330        opts.grpc.detect_server_addr();
331
332        info!("Metasrv options: {:#?}", opts);
333
334        let mut plugins = Plugins::new();
335        plugins::setup_metasrv_plugins(&mut plugins, &plugin_opts, &opts)
336            .await
337            .context(StartMetaServerSnafu)?;
338
339        let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins.clone(), None)
340            .await
341            .context(error::BuildMetaServerSnafu)?;
342        let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
343
344        let instance = MetasrvInstance::new(opts, plugins, metasrv)
345            .await
346            .context(error::BuildMetaServerSnafu)?;
347
348        Ok(Instance::new(instance, guard))
349    }
350}
351
#[cfg(test)]
mod tests {
    use std::io::Write;

    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use meta_srv::selector::SelectorType;

    use super::*;

    #[test]
    fn test_read_from_cmd() {
        let cmd = StartCommand {
            rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
            rpc_server_addr: Some("127.0.0.1:3002".to_string()),
            store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
            selector: Some("LoadBased".to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
        // `--rpc-server-addr` must be propagated into the gRPC options as well.
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
        assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
        assert_eq!(SelectorType::LoadBased, options.selector);
    }

    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        // `store_addrs` uses a non-default address so the assertion below proves the value
        // was read from the file rather than falling back to the default (127.0.0.1:2379).
        let toml_str = r#"
            bind_addr = "127.0.0.1:3002"
            server_addr = "127.0.0.1:3002"
            store_addrs = ["127.0.0.1:2380"]
            selector = "LeaseBased"
            use_memory_store = false

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"

            [failure_detector]
            threshold = 8.0
            min_std_deviation = "100ms"
            acceptable_heartbeat_pause = "3000ms"
            first_heartbeat_estimate = "1000ms"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
        assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
        assert_eq!(SelectorType::LeaseBased, options.selector);
        assert_eq!("debug", options.logging.level.as_ref().unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
        assert_eq!(8.0, options.failure_detector.threshold);
        assert_eq!(
            100.0,
            options.failure_detector.min_std_deviation.as_millis() as f32
        );
        assert_eq!(
            3000,
            options
                .failure_detector
                .acceptable_heartbeat_pause
                .as_millis()
        );
        assert_eq!(
            1000,
            options
                .failure_detector
                .first_heartbeat_estimate
                .as_millis()
        );
        assert_eq!(
            options.procedure.max_metadata_value_size,
            Some(ReadableSize::kb(1500))
        );
    }

    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
            rpc_server_addr: Some("127.0.0.1:3002".to_string()),
            store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
            selector: Some("LoadBased".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            server_addr = "127.0.0.1:3002"
            datanode_lease_secs = 15
            selector = "LeaseBased"
            use_memory_store = false

            [http]
            addr = "127.0.0.1:4000"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "METASRV_UT";
        temp_env::with_vars(
            [
                (
                    // bind_addr = 127.0.0.1:14002
                    [env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP),
                    Some("127.0.0.1:14002"),
                ),
                (
                    // server_addr = 127.0.0.1:13002
                    [env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP),
                    Some("127.0.0.1:13002"),
                ),
                (
                    // http.addr = 127.0.0.1:24000
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
            ],
            || {
                let command = StartCommand {
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Should be read from env, env > default values.
                assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");

                // Should be read from config file, config file > env > default values.
                assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");

                // Should be read from cli, cli > config file > env > default values.
                assert_eq!(opts.http.addr, "127.0.0.1:14000");

                // Should be default value.
                assert_eq!(opts.store_addrs, vec!["127.0.0.1:2379".to_string()]);
            },
        );
    }
}