cmd/
metasrv.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt;
16use std::path::Path;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::Parser;
21use common_base::Plugins;
22use common_config::Configurable;
23use common_telemetry::info;
24use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
25use common_version::{short_version, verbose_version};
26use meta_srv::bootstrap::MetasrvInstance;
27use meta_srv::metasrv::BackendImpl;
28use snafu::ResultExt;
29use tracing_appender::non_blocking::WorkerGuard;
30
31use crate::error::{self, LoadLayeredConfigSnafu, Result, StartMetaServerSnafu};
32use crate::options::{GlobalOptions, GreptimeOptions};
33use crate::{create_resource_limit_metrics, log_versions, maybe_activate_heap_profile, App};
34
/// Options for the metasrv binary: the shared `GreptimeOptions` wrapper around
/// `meta_srv::metasrv::MetasrvOptions`. The metasrv-specific part is reached through
/// `.component`; `.plugins` and `.runtime` are also read by `StartCommand::build`.
type MetasrvOptions = GreptimeOptions<meta_srv::metasrv::MetasrvOptions>;

/// Application name, used as the `App` name and passed to global logging
/// initialization, version logging and resource-limit metrics.
pub const APP_NAME: &str = "greptime-metasrv";
38
/// A built metasrv, bundled with the logging worker guards returned by
/// `common_telemetry::init_global_logging` in `StartCommand::build`.
pub struct Instance {
    /// The wrapped metasrv; started and stopped through the `App` impl.
    instance: MetasrvInstance,

    // Keep the logging guard to prevent the worker from being dropped.
    // Declared after `instance`, so it is dropped after the instance.
    _guard: Vec<WorkerGuard>,
}
45
46impl Instance {
47    pub fn new(instance: MetasrvInstance, guard: Vec<WorkerGuard>) -> Self {
48        Self {
49            instance,
50            _guard: guard,
51        }
52    }
53
54    pub fn get_inner(&self) -> &MetasrvInstance {
55        &self.instance
56    }
57
58    pub fn mut_inner(&mut self) -> &mut MetasrvInstance {
59        &mut self.instance
60    }
61}
62
63#[async_trait]
64impl App for Instance {
65    fn name(&self) -> &str {
66        APP_NAME
67    }
68
69    async fn start(&mut self) -> Result<()> {
70        plugins::start_metasrv_plugins(self.instance.plugins())
71            .await
72            .context(StartMetaServerSnafu)?;
73
74        self.instance.start().await.context(StartMetaServerSnafu)
75    }
76
77    async fn stop(&mut self) -> Result<()> {
78        self.instance
79            .shutdown()
80            .await
81            .context(error::ShutdownMetaServerSnafu)
82    }
83}
84
// Top-level metasrv command. Every method on it delegates to the selected subcommand.
// (Plain `//` comments on purpose: `///` on a clap-derived item becomes help text.)
#[derive(Parser)]
pub struct Command {
    #[clap(subcommand)]
    subcmd: SubCommand,
}
90
91impl Command {
92    pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
93        self.subcmd.build(opts).await
94    }
95
96    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
97        self.subcmd.load_options(global_options)
98    }
99
100    pub fn config_file(&self) -> &Option<String> {
101        self.subcmd.config_file()
102    }
103
104    pub fn env_prefix(&self) -> &String {
105        self.subcmd.env_prefix()
106    }
107}
108
// Subcommands of the metasrv binary; currently only `Start`.
// (Plain `//` comments on purpose: `///` on a clap-derived item becomes help text.)
#[derive(Parser)]
enum SubCommand {
    Start(StartCommand),
}
113
114impl SubCommand {
115    async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
116        match self {
117            SubCommand::Start(cmd) => cmd.build(opts).await,
118        }
119    }
120
121    fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
122        match self {
123            SubCommand::Start(cmd) => cmd.load_options(global_options),
124        }
125    }
126
127    fn config_file(&self) -> &Option<String> {
128        match self {
129            SubCommand::Start(cmd) => &cmd.config_file,
130        }
131    }
132
133    fn env_prefix(&self) -> &String {
134        match self {
135            SubCommand::Start(cmd) => &cmd.env_prefix,
136        }
137    }
138}
139
// CLI arguments of `metasrv start`. Each `Some`/non-empty value overrides the
// corresponding option in `merge_with_cli_options`.
// (Only `//` comments are added here: `///` lines on clap fields are `--help` text.)
#[derive(Default, Parser)]
pub struct StartCommand {
    /// The address to bind the gRPC server.
    #[clap(long, alias = "bind-addr")]
    rpc_bind_addr: Option<String>,
    /// The communication server address for the frontend and datanode to connect to metasrv.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
    #[clap(long, alias = "server-addr")]
    rpc_server_addr: Option<String>,
    // Overrides `store_addrs`. Takes one or more values; `value_delimiter = ','` also
    // splits comma-separated input. The Debug impl prints these through
    // `sanitize_connection_string`.
    #[clap(long, alias = "store-addr", value_delimiter = ',', num_args = 1..)]
    store_addrs: Option<Vec<String>>,
    // Path of the config file read by `load_options`.
    #[clap(short, long)]
    config_file: Option<String>,
    // Overrides `selector`; converted with `try_into` (e.g. "LoadBased", "LeaseBased").
    #[clap(short, long)]
    selector: Option<String>,
    // Overrides `use_memory_store`.
    #[clap(long)]
    use_memory_store: Option<bool>,
    // Overrides `enable_region_failover`.
    #[clap(long)]
    enable_region_failover: Option<bool>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`; interpreted as whole seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Prefix of the environment variables consulted by `load_layered_options`.
    #[clap(long, default_value = "GREPTIMEDB_METASRV")]
    env_prefix: String,
    /// The working home directory of this metasrv instance.
    #[clap(long)]
    data_home: Option<String>,
    /// If it's not empty, the metasrv will store all data with this key prefix.
    #[clap(long, default_value = "")]
    store_key_prefix: String,
    /// The max operations per txn
    #[clap(long)]
    max_txn_ops: Option<usize>,
    /// The database backend.
    #[clap(long, value_enum)]
    backend: Option<BackendImpl>,
}
179
180impl fmt::Debug for StartCommand {
181    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
182        f.debug_struct("StartCommand")
183            .field("rpc_bind_addr", &self.rpc_bind_addr)
184            .field("rpc_server_addr", &self.rpc_server_addr)
185            .field("store_addrs", &self.sanitize_store_addrs())
186            .field("config_file", &self.config_file)
187            .field("selector", &self.selector)
188            .field("use_memory_store", &self.use_memory_store)
189            .field("enable_region_failover", &self.enable_region_failover)
190            .field("http_addr", &self.http_addr)
191            .field("http_timeout", &self.http_timeout)
192            .field("env_prefix", &self.env_prefix)
193            .field("data_home", &self.data_home)
194            .field("store_key_prefix", &self.store_key_prefix)
195            .field("max_txn_ops", &self.max_txn_ops)
196            .field("backend", &self.backend)
197            .finish()
198    }
199}
200
201impl StartCommand {
202    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<MetasrvOptions> {
203        let mut opts = MetasrvOptions::load_layered_options(
204            self.config_file.as_deref(),
205            self.env_prefix.as_ref(),
206        )
207        .context(LoadLayeredConfigSnafu)?;
208
209        self.merge_with_cli_options(global_options, &mut opts)?;
210
211        Ok(opts)
212    }
213
214    fn sanitize_store_addrs(&self) -> Option<Vec<String>> {
215        self.store_addrs.as_ref().map(|addrs| {
216            addrs
217                .iter()
218                .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
219                .collect()
220        })
221    }
222
223    // The precedence order is: cli > config file > environment variables > default values.
224    fn merge_with_cli_options(
225        &self,
226        global_options: &GlobalOptions,
227        opts: &mut MetasrvOptions,
228    ) -> Result<()> {
229        let opts = &mut opts.component;
230
231        if let Some(dir) = &global_options.log_dir {
232            opts.logging.dir.clone_from(dir);
233        }
234
235        if global_options.log_level.is_some() {
236            opts.logging.level.clone_from(&global_options.log_level);
237        }
238
239        opts.tracing = TracingOptions {
240            #[cfg(feature = "tokio-console")]
241            tokio_console_addr: global_options.tokio_console_addr.clone(),
242        };
243
244        #[allow(deprecated)]
245        if let Some(addr) = &self.rpc_bind_addr {
246            opts.bind_addr.clone_from(addr);
247            opts.grpc.bind_addr.clone_from(addr);
248        } else if !opts.bind_addr.is_empty() {
249            opts.grpc.bind_addr.clone_from(&opts.bind_addr);
250        }
251
252        #[allow(deprecated)]
253        if let Some(addr) = &self.rpc_server_addr {
254            opts.server_addr.clone_from(addr);
255            opts.grpc.server_addr.clone_from(addr);
256        } else if !opts.server_addr.is_empty() {
257            opts.grpc.server_addr.clone_from(&opts.server_addr);
258        }
259
260        if let Some(addrs) = &self.store_addrs {
261            opts.store_addrs.clone_from(addrs);
262        }
263
264        if let Some(selector_type) = &self.selector {
265            opts.selector = selector_type[..]
266                .try_into()
267                .context(error::UnsupportedSelectorTypeSnafu { selector_type })?;
268        }
269
270        if let Some(use_memory_store) = self.use_memory_store {
271            opts.use_memory_store = use_memory_store;
272        }
273
274        if let Some(enable_region_failover) = self.enable_region_failover {
275            opts.enable_region_failover = enable_region_failover;
276        }
277
278        if let Some(http_addr) = &self.http_addr {
279            opts.http.addr.clone_from(http_addr);
280        }
281
282        if let Some(http_timeout) = self.http_timeout {
283            opts.http.timeout = Duration::from_secs(http_timeout);
284        }
285
286        if let Some(data_home) = &self.data_home {
287            opts.data_home.clone_from(data_home);
288        }
289
290        // If the logging dir is not set, use the default logs dir in the data home.
291        if opts.logging.dir.is_empty() {
292            opts.logging.dir = Path::new(&opts.data_home)
293                .join(DEFAULT_LOGGING_DIR)
294                .to_string_lossy()
295                .to_string();
296        }
297
298        if !self.store_key_prefix.is_empty() {
299            opts.store_key_prefix.clone_from(&self.store_key_prefix)
300        }
301
302        if let Some(max_txn_ops) = self.max_txn_ops {
303            opts.max_txn_ops = max_txn_ops;
304        }
305
306        if let Some(backend) = &self.backend {
307            opts.backend.clone_from(backend);
308        }
309
310        // Disable dashboard in metasrv.
311        opts.http.disable_dashboard = true;
312
313        Ok(())
314    }
315
316    pub async fn build(&self, opts: MetasrvOptions) -> Result<Instance> {
317        common_runtime::init_global_runtimes(&opts.runtime);
318
319        let guard = common_telemetry::init_global_logging(
320            APP_NAME,
321            &opts.component.logging,
322            &opts.component.tracing,
323            None,
324            None,
325        );
326
327        log_versions(verbose_version(), short_version(), APP_NAME);
328        maybe_activate_heap_profile(&opts.component.memory);
329        create_resource_limit_metrics(APP_NAME);
330
331        info!("Metasrv start command: {:#?}", self);
332
333        let plugin_opts = opts.plugins;
334        let mut opts = opts.component;
335        opts.grpc.detect_server_addr();
336
337        info!("Metasrv options: {:#?}", opts);
338
339        let mut plugins = Plugins::new();
340        plugins::setup_metasrv_plugins(&mut plugins, &plugin_opts, &opts)
341            .await
342            .context(StartMetaServerSnafu)?;
343
344        let builder = meta_srv::bootstrap::metasrv_builder(&opts, plugins, None)
345            .await
346            .context(error::BuildMetaServerSnafu)?;
347        let metasrv = builder.build().await.context(error::BuildMetaServerSnafu)?;
348
349        let instance = MetasrvInstance::new(metasrv)
350            .await
351            .context(error::BuildMetaServerSnafu)?;
352
353        Ok(Instance::new(instance, guard))
354    }
355}
356
#[cfg(test)]
mod tests {
    use std::io::Write;

    use common_base::readable_size::ReadableSize;
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use meta_srv::selector::SelectorType;

    use super::*;

    /// CLI flags alone populate the gRPC addresses, store addresses and selector.
    #[test]
    fn test_read_from_cmd() {
        let cmd = StartCommand {
            rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
            rpc_server_addr: Some("127.0.0.1:3002".to_string()),
            store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
            selector: Some("LoadBased".to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
        // `rpc_server_addr` is copied into `grpc.server_addr` by `merge_with_cli_options`.
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
        assert_eq!(vec!["127.0.0.1:2380".to_string()], options.store_addrs);
        assert_eq!(SelectorType::LoadBased, options.selector);
        // `merge_with_cli_options` always disables the dashboard for metasrv.
        assert!(options.http.disable_dashboard);
    }

    /// Values from a TOML config file are loaded into the options.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        // NOTE(review): the TOML key below is `store_addr`, while the option field is
        // `store_addrs`. "127.0.0.1:2379" is also the default value (see
        // `test_config_precedence_order`), so the `store_addrs` assertion may pass even
        // if this key is ignored. TODO confirm whether a serde alias exists.
        let toml_str = r#"
            bind_addr = "127.0.0.1:3002"
            server_addr = "127.0.0.1:3002"
            store_addr = "127.0.0.1:2379"
            selector = "LeaseBased"
            use_memory_store = false

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"

            [failure_detector]
            threshold = 8.0
            min_std_deviation = "100ms"
            acceptable_heartbeat_pause = "3000ms"
            first_heartbeat_estimate = "1000ms"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.bind_addr);
        assert_eq!("127.0.0.1:3002".to_string(), options.grpc.server_addr);
        assert_eq!(vec!["127.0.0.1:2379".to_string()], options.store_addrs);
        assert_eq!(SelectorType::LeaseBased, options.selector);
        assert_eq!("debug", options.logging.level.as_ref().unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
        assert_eq!(8.0, options.failure_detector.threshold);
        assert_eq!(
            100.0,
            options.failure_detector.min_std_deviation.as_millis() as f32
        );
        assert_eq!(
            3000,
            options
                .failure_detector
                .acceptable_heartbeat_pause
                .as_millis()
        );
        assert_eq!(
            1000,
            options
                .failure_detector
                .first_heartbeat_estimate
                .as_millis()
        );
        // `procedure` is not set in the TOML above, so this checks the default value.
        assert_eq!(
            options.procedure.max_metadata_value_size,
            Some(ReadableSize::kb(1500))
        );
    }

    /// Global `log_dir` / `log_level` flags override the logging options.
    #[test]
    fn test_load_log_options_from_cli() {
        let cmd = StartCommand {
            rpc_bind_addr: Some("127.0.0.1:3002".to_string()),
            rpc_server_addr: Some("127.0.0.1:3002".to_string()),
            store_addrs: Some(vec!["127.0.0.1:2380".to_string()]),
            selector: Some("LoadBased".to_string()),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    /// Checks the precedence: cli > config file > env > default values.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            server_addr = "127.0.0.1:3002"
            datanode_lease_secs = 15
            selector = "LeaseBased"
            use_memory_store = false

            [http]
            addr = "127.0.0.1:4000"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "METASRV_UT";
        temp_env::with_vars(
            [
                (
                    // bind_addr = 127.0.0.1:14002
                    [env_prefix.to_string(), "bind_addr".to_uppercase()].join(ENV_VAR_SEP),
                    Some("127.0.0.1:14002"),
                ),
                (
                    // server_addr = 127.0.0.1:13002
                    [env_prefix.to_string(), "server_addr".to_uppercase()].join(ENV_VAR_SEP),
                    Some("127.0.0.1:13002"),
                ),
                (
                    // http.addr = 127.0.0.1:24000
                    [
                        env_prefix.to_string(),
                        "http".to_uppercase(),
                        "addr".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:24000"),
                ),
            ],
            || {
                let command = StartCommand {
                    http_addr: Some("127.0.0.1:14000".to_string()),
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Should be read from env, env > default values.
                assert_eq!(opts.grpc.bind_addr, "127.0.0.1:14002");

                // Should be read from config file, config file > env > default values.
                assert_eq!(opts.grpc.server_addr, "127.0.0.1:3002");

                // Should be read from cli, cli > config file > env > default values.
                assert_eq!(opts.http.addr, "127.0.0.1:14000");

                // Should be default value.
                assert_eq!(opts.store_addrs, vec!["127.0.0.1:2379".to_string()]);
            },
        );
    }
}