cmd/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16#[allow(clippy::print_stdout)]
17mod objbench;
18#[allow(clippy::print_stdout)]
19mod scanbench;
20
21use std::path::Path;
22use std::time::Duration;
23
24use async_trait::async_trait;
25use clap::Parser;
26use common_config::Configurable;
27use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
28use common_telemetry::{info, warn};
29use common_wal::config::DatanodeWalConfig;
30use datanode::config::RegionEngineConfig;
31use datanode::datanode::Datanode;
32use meta_client::MetaClientOptions;
33use serde::{Deserialize, Serialize};
34use snafu::{ResultExt, ensure};
35use tracing_appender::non_blocking::WorkerGuard;
36
37use crate::App;
38use crate::datanode::builder::InstanceBuilder;
39use crate::datanode::objbench::ObjbenchCommand;
40use crate::datanode::scanbench::ScanbenchCommand;
41use crate::error::{
42    LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
43};
44use crate::options::{GlobalOptions, GreptimeOptions};
45
/// Application name reported by [`Instance`] via `App::name`.
pub const APP_NAME: &str = "greptime-datanode";

/// Datanode component options wrapped in the shared [`GreptimeOptions`] envelope
/// (`runtime`, `plugins` and the datanode `component` options).
type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
49
/// A runnable datanode application: the [`Datanode`] together with the logging worker
/// guards that have to stay alive for as long as the application runs.
pub struct Instance {
    datanode: Datanode,

    // Keep the logging guard to prevent the worker from being dropped.
    // Struct fields are dropped in declaration order, so the guards are dropped after
    // `datanode`; keep this field last.
    _guard: Vec<WorkerGuard>,
}
56
57impl Instance {
58    pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
59        Self {
60            datanode,
61            _guard: guard,
62        }
63    }
64
65    pub fn datanode(&self) -> &Datanode {
66        &self.datanode
67    }
68
69    /// allow customizing datanode for downstream projects
70    pub fn datanode_mut(&mut self) -> &mut Datanode {
71        &mut self.datanode
72    }
73}
74
75#[async_trait]
76impl App for Instance {
77    fn name(&self) -> &str {
78        APP_NAME
79    }
80
81    async fn start(&mut self) -> Result<()> {
82        plugins::start_datanode_plugins(self.datanode.plugins())
83            .await
84            .context(StartDatanodeSnafu)?;
85
86        self.datanode.start().await.context(StartDatanodeSnafu)
87    }
88
89    async fn stop(&mut self) -> Result<()> {
90        self.datanode
91            .shutdown()
92            .await
93            .context(ShutdownDatanodeSnafu)
94    }
95}
96
// Entry point of the `datanode` command line. Plain `//` comments are used here on
// purpose: clap's derive turns `///` doc comments into help text.
#[derive(Parser)]
pub struct Command {
    // The selected datanode subcommand (`start` or one of the bench tools).
    #[clap(subcommand)]
    pub subcmd: SubCommand,
}
102
103impl Command {
104    pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
105        self.subcmd.build_with(builder).await
106    }
107
108    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
109        match &self.subcmd {
110            SubCommand::Start(cmd) => cmd.load_options(global_options),
111            SubCommand::Objbench(_) | SubCommand::Scanbench(_) => {
112                // For bench commands, we don't need to load DatanodeOptions
113                // They are standalone utility commands
114                let mut opts = datanode::config::DatanodeOptions::default();
115                opts.sanitize();
116                Ok(DatanodeOptions {
117                    runtime: Default::default(),
118                    plugins: Default::default(),
119                    component: opts,
120                })
121            }
122        }
123    }
124}
125
// Subcommands of `datanode`. The `///` comments on the variants are rendered by clap as
// help text, so they are part of the CLI output.
#[derive(Parser)]
pub enum SubCommand {
    // Start a datanode (see `StartCommand` for its flags).
    Start(StartCommand),
    /// Object storage benchmark tool
    Objbench(ObjbenchCommand),
    /// Scan benchmark tool - benchmarks scanning a region directly from storage
    Scanbench(ScanbenchCommand),
}
134
135impl SubCommand {
136    async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
137        match self {
138            SubCommand::Start(cmd) => {
139                info!("Building datanode with {:#?}", cmd);
140                builder.build().await
141            }
142            SubCommand::Objbench(cmd) => {
143                cmd.run().await?;
144                std::process::exit(0);
145            }
146            SubCommand::Scanbench(cmd) => {
147                cmd.run().await?;
148                std::process::exit(0);
149            }
150        }
151    }
152}
153
/// Storage engine config.
///
/// Every field is optional when deserializing (`#[serde(default)]`) and falls back to
/// its `Default` value.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfig {
    /// The working directory of the database.
    pub data_home: String,
    /// Object store settings. Flattened, so their keys are read from the same table as
    /// `data_home` rather than from a nested section.
    #[serde(flatten)]
    pub store: object_store::config::ObjectStoreConfig,
}
163
/// The `storage`, `region_engine` and `wal` sections of a config, each falling back to
/// its default when absent.
///
/// NOTE(review): not used in this file; presumably read by the bench subcommands
/// (child modules can access this private type) — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
struct StorageConfigWrapper {
    storage: StorageConfig,
    region_engine: Vec<RegionEngineConfig>,
    // A present `[wal]` section without `provider` defaults to raft-engine
    // (see `deserialize_wal_config`).
    #[serde(default, deserialize_with = "deserialize_wal_config")]
    wal: DatanodeWalConfig,
}
172
173/// Deserializes [`DatanodeWalConfig`], defaulting `provider` to `"raft_engine"` when
174/// the `[wal]` section is present but omits it. This mirrors the behavior of the
175/// datanode's layered config loader which merges over a default that already contains
176/// `provider`.
177fn deserialize_wal_config<'de, D>(
178    deserializer: D,
179) -> std::result::Result<DatanodeWalConfig, D::Error>
180where
181    D: serde::Deserializer<'de>,
182{
183    use serde::de::Error as _;
184
185    let mut table = <toml::value::Table as serde::Deserialize>::deserialize(deserializer)?;
186    if !table.contains_key("provider") {
187        table.insert(
188            "provider".to_string(),
189            toml::Value::String("raft_engine".to_string()),
190        );
191    }
192    DatanodeWalConfig::deserialize(toml::Value::Table(table)).map_err(D::Error::custom)
193}
194
// Flags of `datanode start`. Each flag is optional; an unset flag leaves the value loaded
// from the config file / environment in place (see `merge_with_cli_options`).
// Plain `//` comments are used so clap's generated help text is unchanged.
#[derive(Debug, Parser, Default)]
pub struct StartCommand {
    // Overrides `node_id`. A node id must be provided by some source or loading fails.
    #[clap(long)]
    node_id: Option<u64>,
    /// The address to bind the gRPC server.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    /// The address advertised to the metasrv, and used for connections from outside the host.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    // Comma-separated metasrv addresses; replaces `meta_client.metasrv_addrs`.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // Path of the config file to load.
    #[clap(short, long)]
    config_file: Option<String>,
    // Overrides `storage.data_home`; also the base of the default logging dir.
    #[clap(long)]
    data_home: Option<String>,
    // Overrides the raft-engine WAL dir; only applied with the raft-engine provider.
    #[clap(long)]
    wal_dir: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`, in seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Prefix of the environment variables read by the layered config loader.
    #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
    env_prefix: String,
}
222
223impl StartCommand {
224    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
225        let mut opts = DatanodeOptions::load_layered_options(
226            self.config_file.as_deref(),
227            self.env_prefix.as_ref(),
228        )
229        .context(LoadLayeredConfigSnafu)?;
230
231        self.merge_with_cli_options(global_options, &mut opts)?;
232        opts.component.sanitize();
233
234        Ok(opts)
235    }
236
237    // The precedence order is: cli > config file > environment variables > default values.
238    #[allow(deprecated)]
239    fn merge_with_cli_options(
240        &self,
241        global_options: &GlobalOptions,
242        opts: &mut DatanodeOptions,
243    ) -> Result<()> {
244        let opts = &mut opts.component;
245
246        if let Some(dir) = &global_options.log_dir {
247            opts.logging.dir.clone_from(dir);
248        }
249
250        if global_options.log_level.is_some() {
251            opts.logging.level.clone_from(&global_options.log_level);
252        }
253
254        opts.tracing = TracingOptions {
255            #[cfg(feature = "tokio-console")]
256            tokio_console_addr: global_options.tokio_console_addr.clone(),
257        };
258
259        if let Some(addr) = &self.rpc_bind_addr {
260            opts.grpc.bind_addr.clone_from(addr);
261        } else if let Some(addr) = &opts.rpc_addr {
262            warn!(
263                "Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead."
264            );
265            opts.grpc.bind_addr.clone_from(addr);
266        }
267
268        if let Some(server_addr) = &self.rpc_server_addr {
269            opts.grpc.server_addr.clone_from(server_addr);
270        } else if let Some(server_addr) = &opts.rpc_hostname {
271            warn!(
272                "Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead."
273            );
274            opts.grpc.server_addr.clone_from(server_addr);
275        }
276
277        if let Some(runtime_size) = opts.rpc_runtime_size {
278            warn!(
279                "Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead."
280            );
281            opts.grpc.runtime_size = runtime_size;
282        }
283
284        if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
285            warn!(
286                "Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead."
287            );
288            opts.grpc.max_recv_message_size = max_recv_message_size;
289        }
290
291        if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
292            warn!(
293                "Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead."
294            );
295            opts.grpc.max_send_message_size = max_send_message_size;
296        }
297
298        if let Some(node_id) = self.node_id {
299            opts.node_id = Some(node_id);
300        }
301
302        if let Some(metasrv_addrs) = &self.metasrv_addrs {
303            opts.meta_client
304                .get_or_insert_with(MetaClientOptions::default)
305                .metasrv_addrs
306                .clone_from(metasrv_addrs);
307        }
308
309        ensure!(
310            opts.node_id.is_some(),
311            MissingConfigSnafu {
312                msg: "Missing node id option"
313            }
314        );
315
316        if let Some(data_home) = &self.data_home {
317            opts.storage.data_home.clone_from(data_home);
318        }
319
320        // `wal_dir` only affects raft-engine config.
321        if let Some(wal_dir) = &self.wal_dir
322            && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
323        {
324            if raft_engine_config
325                .dir
326                .as_ref()
327                .is_some_and(|original_dir| original_dir != wal_dir)
328            {
329                info!("The wal dir of raft-engine is altered to {wal_dir}");
330            }
331            raft_engine_config.dir.replace(wal_dir.clone());
332        }
333
334        // If the logging dir is not set, use the default logs dir in the data home.
335        if opts.logging.dir.is_empty() {
336            opts.logging.dir = Path::new(&opts.storage.data_home)
337                .join(DEFAULT_LOGGING_DIR)
338                .to_string_lossy()
339                .to_string();
340        }
341
342        if let Some(http_addr) = &self.http_addr {
343            opts.http.addr.clone_from(http_addr);
344        }
345
346        if let Some(http_timeout) = self.http_timeout {
347            opts.http.timeout = Duration::from_secs(http_timeout)
348        }
349
350        // Disable dashboard in datanode.
351        opts.http.disable_dashboard = true;
352
353        Ok(())
354    }
355}
356
// Tests for how `StartCommand` loads options: config file parsing, deprecated options,
// CLI overrides and the cli > config file > env > default precedence.
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::io::Write;
    use std::time::Duration;

    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};

    use super::*;
    use crate::options::GlobalOptions;

    // Despite the name, this exercises the deprecated *config file* keys `rpc_addr` and
    // `rpc_hostname`: with no CLI overrides they replace the values from `[grpc]`.
    #[test]
    fn test_deprecated_cli_options() {
        common_telemetry::init_default_ut_logging();
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            rpc_addr = "127.0.0.1:4001"
            rpc_hostname = "192.168.0.1"
            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:4001".to_string(), options.grpc.bind_addr);
        assert_eq!("192.168.0.1".to_string(), options.grpc.server_addr);
    }

    // Loads a full config file and checks the grpc, meta_client, wal, storage and logging
    // sections are parsed.
    // NOTE(review): `[grpc]` uses `addr`/`hostname`, presumably serde aliases of
    // `bind_addr`/`server_addr` — confirm; the `bind_addr` assertion may also be satisfied
    // by the default value.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            [grpc]
            addr = "127.0.0.1:3001"
            hostname = "127.0.0.1"
            runtime_size = 8

            [meta_client]
            metasrv_addrs = ["127.0.0.1:3002"]
            timeout = "3s"
            connect_timeout = "5s"
            ddl_timeout = "10s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            dir = "/other/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            bucket = "foo"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;

        assert_eq!("127.0.0.1:3001".to_string(), options.grpc.bind_addr);
        assert_eq!(Some(42), options.node_id);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
            unreachable!()
        };
        assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
        assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
        assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
        assert_eq!(
            1024 * 1024 * 1024 * 50,
            raft_engine_config.purge_threshold.0
        );
        assert!(!raft_engine_config.sync_write);

        let MetaClientOptions {
            metasrv_addrs: metasrv_addr,
            timeout,
            connect_timeout,
            ddl_timeout,
            tcp_nodelay,
            ..
        } = options.meta_client.unwrap();

        assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
        assert_eq!(5000, connect_timeout.as_millis());
        assert_eq!(10000, ddl_timeout.as_millis());
        assert_eq!(3000, timeout.as_millis());
        assert!(tcp_nodelay);
        assert_eq!("./greptimedb_data/", options.storage.data_home);
        assert!(matches!(
            &options.storage.store,
            ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(options.storage.providers.len(), 2);
        assert!(matches!(
            options.storage.providers[0],
            ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        assert!(matches!(
            options.storage.providers[1],
            ObjectStoreConfig::S3(S3Config { .. })
        ));

        assert_eq!("debug", options.logging.level.unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
    }

    #[test]
    fn test_try_from_cmd() {
        // Without a node id, loading fails even when metasrv addresses are given.
        assert!(
            (StartCommand {
                metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_err()
        );

        // Providing node_id while leaving metasrv_addrs absent is ok since metasrv_addrs has a default value.
        assert!(
            (StartCommand {
                node_id: Some(42),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_ok()
        );
    }

    // The global `--log-dir` / `--log-level` options override the logging settings,
    // but loading still requires a node id.
    #[test]
    fn test_load_log_options_from_cli() {
        let mut cmd = StartCommand::default();

        let result = cmd.load_options(&GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        });
        // Missing node_id.
        assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));

        cmd.node_id = Some(42);

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    // Checks the precedence cli > config file > env > default by setting the same keys
    // in several sources (env vars use `env_prefix` joined with `ENV_VAR_SEP`).
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42
            rpc_addr = "127.0.0.1:3001"
            rpc_runtime_size = 8
            rpc_hostname = "10.103.174.219"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "5m"
            sync_write = false

            [storage]
            type = "File"
            data_home = "./greptimedb_data/"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "DATANODE_UT";
        temp_env::with_vars(
            [
                (
                    // wal.purge_interval = 1m
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "purge_interval".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("1m"),
                ),
                (
                    // wal.read_batch_size = 100
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "read_batch_size".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("100"),
                ),
                (
                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
                    [
                        env_prefix.to_string(),
                        "meta_client".to_uppercase(),
                        "metasrv_addrs".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    wal_dir: Some("/other/wal/dir".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Should be read from env, env > default values.
                let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
                    unreachable!()
                };
                assert_eq!(raft_engine_config.read_batch_size, 100);
                assert_eq!(
                    opts.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                // Should be read from config file, config file > env > default values.
                assert_eq!(
                    raft_engine_config.purge_interval,
                    Duration::from_secs(60 * 5)
                );

                // Should be read from cli, cli > config file > env > default values.
                assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");

                // Should be default value.
                assert_eq!(
                    opts.http.addr,
                    DatanodeOptions::default().component.http.addr
                );
                // Not a default: taken from the deprecated top-level `rpc_hostname` in the
                // config file.
                assert_eq!(opts.grpc.server_addr, "10.103.174.219");
            },
        );
    }
}