cmd/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16#[allow(clippy::print_stdout)]
17mod objbench;
18
19use std::path::Path;
20use std::time::Duration;
21
22use async_trait::async_trait;
23use clap::Parser;
24use common_config::Configurable;
25use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
26use common_telemetry::{info, warn};
27use common_wal::config::DatanodeWalConfig;
28use datanode::config::RegionEngineConfig;
29use datanode::datanode::Datanode;
30use meta_client::MetaClientOptions;
31use serde::{Deserialize, Serialize};
32use snafu::{ResultExt, ensure};
33use tracing_appender::non_blocking::WorkerGuard;
34
35use crate::App;
36use crate::datanode::builder::InstanceBuilder;
37use crate::datanode::objbench::ObjbenchCommand;
38use crate::error::{
39    LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
40};
41use crate::options::{GlobalOptions, GreptimeOptions};
42
/// Application name reported by the datanode [`Instance`] through `App::name`.
pub const APP_NAME: &str = "greptime-datanode";

/// Full option set of the datanode process: the datanode component options wrapped in
/// [`GreptimeOptions`] together with the runtime and plugin options.
type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
46
/// A runnable datanode application: the [`Datanode`] together with the logging worker guards
/// that are held for the lifetime of the instance.
pub struct Instance {
    datanode: Datanode,

    // Keep the logging guard to prevent the worker from being dropped.
    _guard: Vec<WorkerGuard>,
}
53
54impl Instance {
55    pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
56        Self {
57            datanode,
58            _guard: guard,
59        }
60    }
61
62    pub fn datanode(&self) -> &Datanode {
63        &self.datanode
64    }
65
66    /// allow customizing datanode for downstream projects
67    pub fn datanode_mut(&mut self) -> &mut Datanode {
68        &mut self.datanode
69    }
70}
71
72#[async_trait]
73impl App for Instance {
74    fn name(&self) -> &str {
75        APP_NAME
76    }
77
78    async fn start(&mut self) -> Result<()> {
79        plugins::start_datanode_plugins(self.datanode.plugins())
80            .await
81            .context(StartDatanodeSnafu)?;
82
83        self.datanode.start().await.context(StartDatanodeSnafu)
84    }
85
86    async fn stop(&mut self) -> Result<()> {
87        self.datanode
88            .shutdown()
89            .await
90            .context(ShutdownDatanodeSnafu)
91    }
92}
93
// Entry point of the datanode command; the actual work is dispatched to a `SubCommand`.
// (Plain `//` comments are used on purpose: `///` on clap-derived items becomes CLI help text.)
#[derive(Parser)]
pub struct Command {
    #[clap(subcommand)]
    pub subcmd: SubCommand,
}
99
100impl Command {
101    pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
102        self.subcmd.build_with(builder).await
103    }
104
105    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
106        match &self.subcmd {
107            SubCommand::Start(cmd) => cmd.load_options(global_options),
108            SubCommand::Objbench(_) => {
109                // For objbench command, we don't need to load DatanodeOptions
110                // It's a standalone utility command
111                let mut opts = datanode::config::DatanodeOptions::default();
112                opts.sanitize();
113                Ok(DatanodeOptions {
114                    runtime: Default::default(),
115                    plugins: Default::default(),
116                    component: opts,
117                })
118            }
119        }
120    }
121}
122
// Subcommands of the datanode command. Only `//` comments are added here: `///` comments on
// clap-derived variants are rendered as CLI help text.
#[derive(Parser)]
pub enum SubCommand {
    // Builds and runs a datanode from the layered configuration plus CLI overrides.
    Start(StartCommand),
    /// Object storage benchmark tool
    Objbench(ObjbenchCommand),
}
129
130impl SubCommand {
131    async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
132        match self {
133            SubCommand::Start(cmd) => {
134                info!("Building datanode with {:#?}", cmd);
135                builder.build().await
136            }
137            SubCommand::Objbench(cmd) => {
138                cmd.run().await?;
139                std::process::exit(0);
140            }
141        }
142    }
143}
144
/// Storage engine config
///
/// Missing fields fall back to their `Default` values (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfig {
    /// The working directory of database
    pub data_home: String,
    /// Object store backend settings; flattened, so its keys sit in the same table as
    /// `data_home` rather than in a nested section.
    #[serde(flatten)]
    pub store: object_store::config::ObjectStoreConfig,
}
154
/// Serde helper holding the `storage` section and the list of `region_engine` entries.
/// Missing fields fall back to their defaults (`#[serde(default)]`).
// NOTE(review): not referenced in the visible part of this file — confirm it is still used.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
struct StorageConfigWrapper {
    storage: StorageConfig,
    region_engine: Vec<RegionEngineConfig>,
}
161
// CLI arguments of the `start` subcommand. Each option, when given, overrides the value loaded
// from the config file and environment (see `StartCommand::merge_with_cli_options`).
// Only `//` comments are added: `///` on clap fields becomes CLI help text.
#[derive(Debug, Parser, Default)]
pub struct StartCommand {
    // Overrides `node_id`; loading fails if no node id is set after merging.
    #[clap(long)]
    node_id: Option<u64>,
    /// The address to bind the gRPC server.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    /// The address advertised to the metasrv, and used for connections from outside the host.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    // Comma-separated metasrv addresses; replaces `meta_client.metasrv_addrs`.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // Path of the config file to load.
    #[clap(short, long)]
    config_file: Option<String>,
    // Overrides `storage.data_home`.
    #[clap(long)]
    data_home: Option<String>,
    // Overrides the raft-engine WAL directory; ignored for other WAL providers.
    #[clap(long)]
    wal_dir: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`, given in seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Prefix of the environment variables read as configuration.
    #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
    env_prefix: String,
}
189
190impl StartCommand {
191    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
192        let mut opts = DatanodeOptions::load_layered_options(
193            self.config_file.as_deref(),
194            self.env_prefix.as_ref(),
195        )
196        .context(LoadLayeredConfigSnafu)?;
197
198        self.merge_with_cli_options(global_options, &mut opts)?;
199        opts.component.sanitize();
200
201        Ok(opts)
202    }
203
204    // The precedence order is: cli > config file > environment variables > default values.
205    #[allow(deprecated)]
206    fn merge_with_cli_options(
207        &self,
208        global_options: &GlobalOptions,
209        opts: &mut DatanodeOptions,
210    ) -> Result<()> {
211        let opts = &mut opts.component;
212
213        if let Some(dir) = &global_options.log_dir {
214            opts.logging.dir.clone_from(dir);
215        }
216
217        if global_options.log_level.is_some() {
218            opts.logging.level.clone_from(&global_options.log_level);
219        }
220
221        opts.tracing = TracingOptions {
222            #[cfg(feature = "tokio-console")]
223            tokio_console_addr: global_options.tokio_console_addr.clone(),
224        };
225
226        if let Some(addr) = &self.rpc_bind_addr {
227            opts.grpc.bind_addr.clone_from(addr);
228        } else if let Some(addr) = &opts.rpc_addr {
229            warn!(
230                "Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead."
231            );
232            opts.grpc.bind_addr.clone_from(addr);
233        }
234
235        if let Some(server_addr) = &self.rpc_server_addr {
236            opts.grpc.server_addr.clone_from(server_addr);
237        } else if let Some(server_addr) = &opts.rpc_hostname {
238            warn!(
239                "Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead."
240            );
241            opts.grpc.server_addr.clone_from(server_addr);
242        }
243
244        if let Some(runtime_size) = opts.rpc_runtime_size {
245            warn!(
246                "Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead."
247            );
248            opts.grpc.runtime_size = runtime_size;
249        }
250
251        if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
252            warn!(
253                "Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead."
254            );
255            opts.grpc.max_recv_message_size = max_recv_message_size;
256        }
257
258        if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
259            warn!(
260                "Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead."
261            );
262            opts.grpc.max_send_message_size = max_send_message_size;
263        }
264
265        if let Some(node_id) = self.node_id {
266            opts.node_id = Some(node_id);
267        }
268
269        if let Some(metasrv_addrs) = &self.metasrv_addrs {
270            opts.meta_client
271                .get_or_insert_with(MetaClientOptions::default)
272                .metasrv_addrs
273                .clone_from(metasrv_addrs);
274        }
275
276        ensure!(
277            opts.node_id.is_some(),
278            MissingConfigSnafu {
279                msg: "Missing node id option"
280            }
281        );
282
283        if let Some(data_home) = &self.data_home {
284            opts.storage.data_home.clone_from(data_home);
285        }
286
287        // `wal_dir` only affects raft-engine config.
288        if let Some(wal_dir) = &self.wal_dir
289            && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
290        {
291            if raft_engine_config
292                .dir
293                .as_ref()
294                .is_some_and(|original_dir| original_dir != wal_dir)
295            {
296                info!("The wal dir of raft-engine is altered to {wal_dir}");
297            }
298            raft_engine_config.dir.replace(wal_dir.clone());
299        }
300
301        // If the logging dir is not set, use the default logs dir in the data home.
302        if opts.logging.dir.is_empty() {
303            opts.logging.dir = Path::new(&opts.storage.data_home)
304                .join(DEFAULT_LOGGING_DIR)
305                .to_string_lossy()
306                .to_string();
307        }
308
309        if let Some(http_addr) = &self.http_addr {
310            opts.http.addr.clone_from(http_addr);
311        }
312
313        if let Some(http_timeout) = self.http_timeout {
314            opts.http.timeout = Duration::from_secs(http_timeout)
315        }
316
317        // Disable dashboard in datanode.
318        opts.http.disable_dashboard = true;
319
320        Ok(())
321    }
322}
323
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::io::Write;
    use std::time::Duration;

    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
    use servers::heartbeat_options::HeartbeatOptions;

    use super::*;
    use crate::options::GlobalOptions;

    // Deprecated top-level `rpc_addr` / `rpc_hostname` from the config file win over the
    // `[grpc]` section when no CLI override is given.
    #[test]
    fn test_deprecated_cli_options() {
        common_telemetry::init_default_ut_logging();
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            rpc_addr = "127.0.0.1:4001"
            rpc_hostname = "192.168.0.1"
            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:4001".to_string(), options.grpc.bind_addr);
        assert_eq!("192.168.0.1".to_string(), options.grpc.server_addr);
    }

    // Every section of a config file (grpc, heartbeat, meta_client, wal, storage, logging)
    // is loaded into the corresponding option fields.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            [grpc]
            addr = "127.0.0.1:3001"
            hostname = "127.0.0.1"
            runtime_size = 8

            [heartbeat]
            interval = "300ms"

            [meta_client]
            metasrv_addrs = ["127.0.0.1:3002"]
            timeout = "3s"
            connect_timeout = "5s"
            ddl_timeout = "10s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            dir = "/other/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            bucket = "foo"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;

        assert_eq!("127.0.0.1:3001".to_string(), options.grpc.bind_addr);
        assert_eq!(Some(42), options.node_id);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
            unreachable!()
        };
        assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
        assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
        assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
        assert_eq!(
            1024 * 1024 * 1024 * 50,
            raft_engine_config.purge_threshold.0
        );
        assert!(!raft_engine_config.sync_write);

        let HeartbeatOptions {
            interval: heart_beat_interval,
            ..
        } = options.heartbeat;

        assert_eq!(300, heart_beat_interval.as_millis());

        let MetaClientOptions {
            metasrv_addrs: metasrv_addr,
            timeout,
            connect_timeout,
            ddl_timeout,
            tcp_nodelay,
            ..
        } = options.meta_client.unwrap();

        assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
        assert_eq!(5000, connect_timeout.as_millis());
        assert_eq!(10000, ddl_timeout.as_millis());
        assert_eq!(3000, timeout.as_millis());
        assert!(tcp_nodelay);
        assert_eq!("./greptimedb_data/", options.storage.data_home);
        assert!(matches!(
            &options.storage.store,
            ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(options.storage.providers.len(), 2);
        assert!(matches!(
            options.storage.providers[0],
            ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        assert!(matches!(
            options.storage.providers[1],
            ObjectStoreConfig::S3(S3Config { .. })
        ));

        assert_eq!("debug", options.logging.level.unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
    }

    // Loading fails without a node id and succeeds when only the node id is given.
    #[test]
    fn test_try_from_cmd() {
        assert!(
            (StartCommand {
                metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_err()
        );

        // Providing node_id but leave metasrv_addr absent is ok since metasrv_addr has default value
        assert!(
            (StartCommand {
                node_id: Some(42),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_ok()
        );
    }

    // Global log dir / log level options are applied to the logging options, and a missing
    // node id is still reported as `MissingConfig`.
    #[test]
    fn test_load_log_options_from_cli() {
        let mut cmd = StartCommand::default();

        let result = cmd.load_options(&GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        });
        // Missing node_id.
        assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));

        cmd.node_id = Some(42);

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    // Checks the precedence cli > config file > env > defaults, using env vars built from
    // `env_prefix` and `ENV_VAR_SEP`.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42
            rpc_addr = "127.0.0.1:3001"
            rpc_runtime_size = 8
            rpc_hostname = "10.103.174.219"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "5m"
            sync_write = false

            [storage]
            type = "File"
            data_home = "./greptimedb_data/"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "DATANODE_UT";
        temp_env::with_vars(
            [
                (
                    // wal.purge_interval = 1m
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "purge_interval".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("1m"),
                ),
                (
                    // wal.read_batch_size = 100
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "read_batch_size".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("100"),
                ),
                (
                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
                    [
                        env_prefix.to_string(),
                        "meta_client".to_uppercase(),
                        "metasrv_addrs".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    wal_dir: Some("/other/wal/dir".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Should be read from env, env > default values.
                let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
                    unreachable!()
                };
                assert_eq!(raft_engine_config.read_batch_size, 100);
                assert_eq!(
                    opts.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                // Should be read from config file, config file > env > default values.
                assert_eq!(
                    raft_engine_config.purge_interval,
                    Duration::from_secs(60 * 5)
                );

                // Should be read from cli, cli > config file > env > default values.
                assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");

                // Should be default value.
                assert_eq!(
                    opts.http.addr,
                    DatanodeOptions::default().component.http.addr
                );
                // Deprecated `rpc_hostname` from the config file is migrated into `grpc`.
                assert_eq!(opts.grpc.server_addr, "10.103.174.219");
            },
        );
    }
}