Skip to main content

cmd/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16#[allow(clippy::print_stdout)]
17mod objbench;
18#[allow(clippy::print_stdout)]
19mod scanbench;
20
21use std::path::Path;
22use std::time::Duration;
23
24use async_trait::async_trait;
25use clap::Parser;
26use common_config::Configurable;
27use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
28use common_telemetry::{info, warn};
29use common_wal::config::DatanodeWalConfig;
30use datanode::config::RegionEngineConfig;
31use datanode::datanode::Datanode;
32use meta_client::MetaClientOptions;
33use serde::{Deserialize, Serialize};
34use snafu::{ResultExt, ensure};
35use tracing_appender::non_blocking::WorkerGuard;
36
37use crate::App;
38use crate::datanode::builder::InstanceBuilder;
39use crate::datanode::objbench::ObjbenchCommand;
40use crate::datanode::scanbench::ScanbenchCommand;
41use crate::error::{
42    LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
43};
44use crate::options::{GlobalOptions, GreptimeOptions};
45
/// Name of the datanode application, returned by [`App::name`] for [`Instance`].
pub const APP_NAME: &str = "greptime-datanode";

/// Datanode options wrapped in [`GreptimeOptions`], which carries the `runtime`
/// and `plugins` sections alongside the datanode-specific `component` options.
type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
49
50pub struct Instance {
51    datanode: Datanode,
52
53    // Keep the logging guard to prevent the worker from being dropped.
54    _guard: Vec<WorkerGuard>,
55}
56
57impl Instance {
58    pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
59        Self {
60            datanode,
61            _guard: guard,
62        }
63    }
64
65    pub fn datanode(&self) -> &Datanode {
66        &self.datanode
67    }
68
69    /// allow customizing datanode for downstream projects
70    pub fn datanode_mut(&mut self) -> &mut Datanode {
71        &mut self.datanode
72    }
73}
74
75#[async_trait]
76impl App for Instance {
77    fn name(&self) -> &str {
78        APP_NAME
79    }
80
81    async fn start(&mut self) -> Result<()> {
82        plugins::start_datanode_plugins(self.datanode.plugins())
83            .await
84            .context(StartDatanodeSnafu)?;
85
86        self.datanode.start().await.context(StartDatanodeSnafu)
87    }
88
89    async fn stop(&mut self) -> Result<()> {
90        self.datanode
91            .shutdown()
92            .await
93            .context(ShutdownDatanodeSnafu)
94    }
95}
96
97#[derive(Parser)]
98pub struct Command {
99    #[clap(subcommand)]
100    pub subcmd: SubCommand,
101}
102
103impl Command {
104    pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
105        self.subcmd.build_with(builder).await
106    }
107
108    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
109        match &self.subcmd {
110            SubCommand::Start(cmd) => cmd.load_options(global_options),
111            SubCommand::Objbench(_) | SubCommand::Scanbench(_) => {
112                // For bench commands, we don't need to load DatanodeOptions
113                // They are standalone utility commands
114                let mut opts = datanode::config::DatanodeOptions::default();
115                opts.sanitize();
116                Ok(DatanodeOptions {
117                    runtime: Default::default(),
118                    plugins: Default::default(),
119                    component: opts,
120                })
121            }
122        }
123    }
124}
125
// Subcommands of the datanode command. Plain `//` comments are used for
// maintainer notes here on purpose: clap's derive turns `///` doc comments into
// CLI help text, so the `///` lines below are user-visible.
#[derive(Parser)]
pub enum SubCommand {
    // Builds a datanode `Instance` from the loaded options (see `SubCommand::build_with`).
    Start(StartCommand),
    /// Object storage benchmark tool
    Objbench(ObjbenchCommand),
    /// Scan benchmark tool - benchmarks scanning a region directly from storage
    Scanbench(ScanbenchCommand),
}
134
135impl SubCommand {
136    async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
137        match self {
138            SubCommand::Start(cmd) => {
139                info!("Building datanode with {:#?}", cmd);
140                builder.build().await
141            }
142            SubCommand::Objbench(cmd) => {
143                cmd.run().await?;
144                std::process::exit(0);
145            }
146            SubCommand::Scanbench(cmd) => {
147                cmd.run().await?;
148                std::process::exit(0);
149            }
150        }
151    }
152}
153
/// Storage engine config
///
/// Holds the working directory (`data_home`) plus the object store settings.
/// Because `store` is flattened, its fields are (de)serialized as sibling keys
/// of `data_home` rather than as a nested map. Absent fields fall back to
/// [`Default`] via `#[serde(default)]`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfig {
    /// The working directory of database
    pub data_home: String,
    /// Object store backend settings, flattened into the surrounding table.
    #[serde(flatten)]
    pub store: object_store::config::ObjectStoreConfig,
}
163
/// Wrapper over the `storage`, `region_engine` and `wal` sections of a datanode
/// config. Every section falls back to its default when absent
/// (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
struct StorageConfigWrapper {
    storage: StorageConfig,
    region_engine: Vec<RegionEngineConfig>,
    // A present `wal` section without `provider` defaults to raft-engine
    // (handled by `deserialize_wal_config`); an absent section uses the default.
    #[serde(default, deserialize_with = "deserialize_wal_config")]
    wal: DatanodeWalConfig,
}
172
173/// Deserializes [`DatanodeWalConfig`], defaulting `provider` to `"raft_engine"` when
174/// the `[wal]` section is present but omits it. This mirrors the behavior of the
175/// datanode's layered config loader which merges over a default that already contains
176/// `provider`.
177fn deserialize_wal_config<'de, D>(
178    deserializer: D,
179) -> std::result::Result<DatanodeWalConfig, D::Error>
180where
181    D: serde::Deserializer<'de>,
182{
183    use serde::de::Error as _;
184
185    let mut table = <toml::value::Table as serde::Deserialize>::deserialize(deserializer)?;
186    if !table.contains_key("provider") {
187        table.insert(
188            "provider".to_string(),
189            toml::Value::String("raft_engine".to_string()),
190        );
191    }
192    DatanodeWalConfig::deserialize(toml::Value::Table(table)).map_err(D::Error::custom)
193}
194
// CLI arguments of the datanode `start` subcommand. Every `Some` value overrides
// the corresponding option loaded from the config file / environment (see
// `StartCommand::merge_with_cli_options`).
//
// Only `//` comments are added here: clap's derive turns `///` doc comments into
// help text, so the existing `///` lines below are user-visible.
//
// NOTE: the derived `Default` leaves `env_prefix` as an empty string, unlike the
// clap default `GREPTIMEDB_DATANODE`; tests construct the command this way.
#[derive(Debug, Parser, Default)]
pub struct StartCommand {
    // Overrides `node_id`; a node id must be present after merging.
    #[clap(long)]
    node_id: Option<u64>,
    /// The address to bind the gRPC server.
    #[clap(long = "grpc-bind-addr", alias = "rpc-bind-addr", alias = "rpc-addr")]
    grpc_bind_addr: Option<String>,
    /// The address advertised to the metasrv, and used for connections from outside the host.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `grpc_bind_addr`.
    #[clap(
        long = "grpc-server-addr",
        alias = "rpc-server-addr",
        alias = "rpc-hostname"
    )]
    grpc_server_addr: Option<String>,
    // Comma-separated metasrv addresses; replaces `meta_client.metasrv_addrs`.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // Path of the config file to load (`-c` / `--config-file`).
    #[clap(short, long)]
    config_file: Option<String>,
    // Overrides `storage.data_home`; also the base of the default logging dir.
    #[clap(long)]
    data_home: Option<String>,
    // Overrides the raft-engine WAL dir; other WAL providers do not use it.
    #[clap(long)]
    wal_dir: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`, in seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Prefix of the environment variables read by the layered config loader.
    #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
    env_prefix: String,
}
226
227impl StartCommand {
228    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
229        let mut opts = DatanodeOptions::load_layered_options(
230            self.config_file.as_deref(),
231            self.env_prefix.as_ref(),
232        )
233        .context(LoadLayeredConfigSnafu)?;
234
235        self.merge_with_cli_options(global_options, &mut opts)?;
236        opts.component.sanitize();
237
238        Ok(opts)
239    }
240
241    // The precedence order is: cli > config file > environment variables > default values.
242    #[allow(deprecated)]
243    fn merge_with_cli_options(
244        &self,
245        global_options: &GlobalOptions,
246        opts: &mut DatanodeOptions,
247    ) -> Result<()> {
248        let opts = &mut opts.component;
249
250        if let Some(dir) = &global_options.log_dir {
251            opts.logging.dir.clone_from(dir);
252        }
253
254        if global_options.log_level.is_some() {
255            opts.logging.level.clone_from(&global_options.log_level);
256        }
257
258        opts.tracing = TracingOptions {
259            #[cfg(feature = "tokio-console")]
260            tokio_console_addr: global_options.tokio_console_addr.clone(),
261        };
262
263        if let Some(addr) = &self.grpc_bind_addr {
264            opts.grpc.bind_addr.clone_from(addr);
265        } else if let Some(addr) = &opts.rpc_addr {
266            warn!(
267                "Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.bind_addr` instead."
268            );
269            opts.grpc.bind_addr.clone_from(addr);
270        }
271
272        if let Some(server_addr) = &self.grpc_server_addr {
273            opts.grpc.server_addr.clone_from(server_addr);
274        } else if let Some(server_addr) = &opts.rpc_hostname {
275            warn!(
276                "Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.server_addr` instead."
277            );
278            opts.grpc.server_addr.clone_from(server_addr);
279        }
280
281        if let Some(runtime_size) = opts.rpc_runtime_size {
282            warn!(
283                "Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead."
284            );
285            opts.grpc.runtime_size = runtime_size;
286        }
287
288        if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
289            warn!(
290                "Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead."
291            );
292            opts.grpc.max_recv_message_size = max_recv_message_size;
293        }
294
295        if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
296            warn!(
297                "Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead."
298            );
299            opts.grpc.max_send_message_size = max_send_message_size;
300        }
301
302        if let Some(node_id) = self.node_id {
303            opts.node_id = Some(node_id);
304        }
305
306        if let Some(metasrv_addrs) = &self.metasrv_addrs {
307            opts.meta_client
308                .get_or_insert_with(MetaClientOptions::default)
309                .metasrv_addrs
310                .clone_from(metasrv_addrs);
311        }
312
313        ensure!(
314            opts.node_id.is_some(),
315            MissingConfigSnafu {
316                msg: "Missing node id option"
317            }
318        );
319
320        if let Some(data_home) = &self.data_home {
321            opts.storage.data_home.clone_from(data_home);
322        }
323
324        // `wal_dir` only affects raft-engine config.
325        if let Some(wal_dir) = &self.wal_dir
326            && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
327        {
328            if raft_engine_config
329                .dir
330                .as_ref()
331                .is_some_and(|original_dir| original_dir != wal_dir)
332            {
333                info!("The wal dir of raft-engine is altered to {wal_dir}");
334            }
335            raft_engine_config.dir.replace(wal_dir.clone());
336        }
337
338        // If the logging dir is not set, use the default logs dir in the data home.
339        if opts.logging.dir.is_empty() {
340            opts.logging.dir = Path::new(&opts.storage.data_home)
341                .join(DEFAULT_LOGGING_DIR)
342                .to_string_lossy()
343                .to_string();
344        }
345
346        if let Some(http_addr) = &self.http_addr {
347            opts.http.addr.clone_from(http_addr);
348        }
349
350        if let Some(http_timeout) = self.http_timeout {
351            opts.http.timeout = Duration::from_secs(http_timeout)
352        }
353
354        // Disable dashboard in datanode.
355        opts.http.disable_dashboard = true;
356
357        Ok(())
358    }
359}
360
// Unit tests for option loading, option precedence and CLI parsing of the
// datanode `start` subcommand.
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::io::Write;
    use std::time::Duration;

    use clap::{CommandFactory, Parser};
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};

    use super::*;
    use crate::options::GlobalOptions;

    // Deprecated top-level `rpc_addr` / `rpc_hostname` in the config file take
    // precedence over the `[grpc]` values when no CLI flags are given.
    #[test]
    fn test_deprecated_cli_options() {
        common_telemetry::init_default_ut_logging();
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            rpc_addr = "127.0.0.1:4001"
            rpc_hostname = "192.168.0.1"
            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:4001".to_string(), options.grpc.bind_addr);
        assert_eq!("192.168.0.1".to_string(), options.grpc.server_addr);
    }

    // Every section of a config file (grpc, meta_client, wal, storage with
    // extra providers, logging) is loaded into the component options.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8

            [meta_client]
            metasrv_addrs = ["127.0.0.1:3002"]
            timeout = "3s"
            connect_timeout = "5s"
            ddl_timeout = "10s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            dir = "/other/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            bucket = "foo"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;

        assert_eq!("127.0.0.1:3001".to_string(), options.grpc.bind_addr);
        assert_eq!("127.0.0.1".to_string(), options.grpc.server_addr);
        assert_eq!(Some(42), options.node_id);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
            unreachable!()
        };
        assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
        assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
        assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
        assert_eq!(
            1024 * 1024 * 1024 * 50,
            raft_engine_config.purge_threshold.0
        );
        assert!(!raft_engine_config.sync_write);

        let MetaClientOptions {
            metasrv_addrs: metasrv_addr,
            timeout,
            connect_timeout,
            ddl_timeout,
            tcp_nodelay,
            ..
        } = options.meta_client.unwrap();

        assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
        assert_eq!(5000, connect_timeout.as_millis());
        assert_eq!(10000, ddl_timeout.as_millis());
        assert_eq!(3000, timeout.as_millis());
        assert!(tcp_nodelay);
        assert_eq!("./greptimedb_data/", options.storage.data_home);
        assert!(matches!(
            &options.storage.store,
            ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(options.storage.providers.len(), 2);
        assert!(matches!(
            options.storage.providers[0],
            ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        assert!(matches!(
            options.storage.providers[1],
            ObjectStoreConfig::S3(S3Config { .. })
        ));

        assert_eq!("debug", options.logging.level.unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
    }

    // A node id is mandatory: `metasrv_addrs` alone is rejected.
    #[test]
    fn test_try_from_cmd() {
        assert!(
            (StartCommand {
                metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_err()
        );

        // Providing node_id but leaving metasrv_addrs absent is ok since metasrv_addrs has a default value
        assert!(
            (StartCommand {
                node_id: Some(42),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_ok()
        );
    }

    // Global logging options are applied, and a missing node id is reported as
    // `MissingConfig`.
    #[test]
    fn test_load_log_options_from_cli() {
        let mut cmd = StartCommand::default();

        let result = cmd.load_options(&GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        });
        // Missing node_id.
        assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));

        cmd.node_id = Some(42);

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    // Checks the precedence cli > config file > env > default. The env vars are
    // only set for the duration of the `temp_env::with_vars` closure.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42
            rpc_addr = "127.0.0.1:3001"
            rpc_runtime_size = 8
            rpc_hostname = "10.103.174.219"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "5m"
            sync_write = false

            [storage]
            type = "File"
            data_home = "./greptimedb_data/"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "DATANODE_UT";
        temp_env::with_vars(
            [
                (
                    // wal.purge_interval = 1m
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "purge_interval".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("1m"),
                ),
                (
                    // wal.read_batch_size = 100
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "read_batch_size".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("100"),
                ),
                (
                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
                    [
                        env_prefix.to_string(),
                        "meta_client".to_uppercase(),
                        "metasrv_addrs".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    wal_dir: Some("/other/wal/dir".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Should be read from env, env > default values.
                let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
                    unreachable!()
                };
                assert_eq!(raft_engine_config.read_batch_size, 100);
                assert_eq!(
                    opts.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                // Should be read from config file, config file > env > default values.
                assert_eq!(
                    raft_engine_config.purge_interval,
                    Duration::from_secs(60 * 5)
                );

                // Should be read from cli, cli > config file > env > default values.
                assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");

                // Should be default value.
                assert_eq!(
                    opts.http.addr,
                    DatanodeOptions::default().component.http.addr
                );
                // The deprecated `rpc_hostname` from the config file is migrated to
                // `grpc.server_addr`.
                assert_eq!(opts.grpc.server_addr, "10.103.174.219");
            },
        );
    }

    // `--grpc-*` flags and their legacy `--rpc-*` aliases all parse into the
    // same fields.
    #[test]
    fn test_parse_grpc_cli_aliases() {
        let command = StartCommand::try_parse_from([
            "datanode",
            "--grpc-bind-addr",
            "127.0.0.1:13001",
            "--grpc-server-addr",
            "10.0.0.1:13001",
        ])
        .unwrap();
        assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:13001"));
        assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.1:13001"));

        let command = StartCommand::try_parse_from([
            "datanode",
            "--rpc-bind-addr",
            "127.0.0.1:23001",
            "--rpc-server-addr",
            "10.0.0.2:23001",
        ])
        .unwrap();
        assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:23001"));
        assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.2:23001"));

        let command = StartCommand::try_parse_from([
            "datanode",
            "--rpc-addr",
            "127.0.0.1:33001",
            "--rpc-hostname",
            "10.0.0.3:33001",
        ])
        .unwrap();
        assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:33001"));
        assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.3:33001"));
    }

    // The long help advertises only the `--grpc-*` names; `alias` (unlike
    // `visible_alias`) keeps the legacy `--rpc-*` names out of the help text.
    #[test]
    fn test_help_uses_grpc_option_names() {
        let mut cmd = StartCommand::command();
        let mut help = Vec::new();
        cmd.write_long_help(&mut help).unwrap();
        let help = String::from_utf8(help).unwrap();

        assert!(help.contains("--grpc-bind-addr"));
        assert!(help.contains("--grpc-server-addr"));
        assert!(!help.contains("--rpc-bind-addr"));
        assert!(!help.contains("--rpc-server-addr"));
        assert!(!help.contains("--rpc-addr"));
        assert!(!help.contains("--rpc-hostname"));
    }
}