cmd/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16
17use std::path::Path;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use clap::Parser;
22use common_config::Configurable;
23use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
24use common_telemetry::{info, warn};
25use common_wal::config::DatanodeWalConfig;
26use datanode::datanode::Datanode;
27use meta_client::MetaClientOptions;
28use snafu::{ensure, ResultExt};
29use tracing_appender::non_blocking::WorkerGuard;
30
31use crate::datanode::builder::InstanceBuilder;
32use crate::error::{
33    LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
34};
35use crate::options::{GlobalOptions, GreptimeOptions};
36use crate::App;
37
/// Application name reported by [`Instance`] through `App::name`.
pub const APP_NAME: &str = "greptime-datanode";

/// Datanode-specific options (reachable through the `component` field) wrapped in
/// the shared [`GreptimeOptions`] container.
type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
41
42pub struct Instance {
43    datanode: Datanode,
44
45    // Keep the logging guard to prevent the worker from being dropped.
46    _guard: Vec<WorkerGuard>,
47}
48
49impl Instance {
50    pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
51        Self {
52            datanode,
53            _guard: guard,
54        }
55    }
56
57    pub fn datanode(&self) -> &Datanode {
58        &self.datanode
59    }
60
61    /// allow customizing datanode for downstream projects
62    pub fn datanode_mut(&mut self) -> &mut Datanode {
63        &mut self.datanode
64    }
65}
66
67#[async_trait]
68impl App for Instance {
69    fn name(&self) -> &str {
70        APP_NAME
71    }
72
73    async fn start(&mut self) -> Result<()> {
74        plugins::start_datanode_plugins(self.datanode.plugins())
75            .await
76            .context(StartDatanodeSnafu)?;
77
78        self.datanode.start().await.context(StartDatanodeSnafu)
79    }
80
81    async fn stop(&mut self) -> Result<()> {
82        self.datanode
83            .shutdown()
84            .await
85            .context(ShutdownDatanodeSnafu)
86    }
87}
88
89#[derive(Parser)]
90pub struct Command {
91    #[clap(subcommand)]
92    subcmd: SubCommand,
93}
94
95impl Command {
96    pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
97        self.subcmd.build_with(builder).await
98    }
99
100    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
101        match &self.subcmd {
102            SubCommand::Start(cmd) => cmd.load_options(global_options),
103        }
104    }
105}
106
107#[derive(Parser)]
108enum SubCommand {
109    Start(StartCommand),
110}
111
112impl SubCommand {
113    async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
114        match self {
115            SubCommand::Start(cmd) => {
116                info!("Building datanode with {:#?}", cmd);
117                builder.build().await
118            }
119        }
120    }
121}
122
// Arguments of the `start` subcommand. Each `Some` value overrides the value loaded
// from the config file / environment (see `merge_with_cli_options`).
// Only `//` comments are added here: `///` doc comments on clap fields become `--help` text.
#[derive(Debug, Parser, Default)]
struct StartCommand {
    // Overrides `node_id`; loading fails if no layer provides a node id.
    #[clap(long)]
    node_id: Option<u64>,
    /// The address to bind the gRPC server.
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    /// The address advertised to the metasrv, and used for connections from outside the host.
    /// If left empty or unset, the server will automatically use the IP address of the first network interface
    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    // Comma-separated list; replaces `meta_client.metasrv_addrs` (creating `meta_client` if absent).
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // Path of the config file passed to the layered options loader.
    #[clap(short, long)]
    config_file: Option<String>,
    // Overrides `storage.data_home`; also the base of the logging dir when none is configured.
    #[clap(long)]
    data_home: Option<String>,
    // Overrides the raft-engine WAL dir; has no effect for other WAL providers.
    #[clap(long)]
    wal_dir: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`, in seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Prefix of environment variables read as a configuration layer.
    // Note: `Default::default()` leaves this empty; the clap default only applies when parsing.
    #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
    env_prefix: String,
}
150
151impl StartCommand {
152    fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
153        let mut opts = DatanodeOptions::load_layered_options(
154            self.config_file.as_deref(),
155            self.env_prefix.as_ref(),
156        )
157        .context(LoadLayeredConfigSnafu)?;
158
159        self.merge_with_cli_options(global_options, &mut opts)?;
160        opts.component.sanitize();
161
162        Ok(opts)
163    }
164
165    // The precedence order is: cli > config file > environment variables > default values.
166    #[allow(deprecated)]
167    fn merge_with_cli_options(
168        &self,
169        global_options: &GlobalOptions,
170        opts: &mut DatanodeOptions,
171    ) -> Result<()> {
172        let opts = &mut opts.component;
173
174        if let Some(dir) = &global_options.log_dir {
175            opts.logging.dir.clone_from(dir);
176        }
177
178        if global_options.log_level.is_some() {
179            opts.logging.level.clone_from(&global_options.log_level);
180        }
181
182        opts.tracing = TracingOptions {
183            #[cfg(feature = "tokio-console")]
184            tokio_console_addr: global_options.tokio_console_addr.clone(),
185        };
186
187        if let Some(addr) = &self.rpc_bind_addr {
188            opts.grpc.bind_addr.clone_from(addr);
189        } else if let Some(addr) = &opts.rpc_addr {
190            warn!("Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead.");
191            opts.grpc.bind_addr.clone_from(addr);
192        }
193
194        if let Some(server_addr) = &self.rpc_server_addr {
195            opts.grpc.server_addr.clone_from(server_addr);
196        } else if let Some(server_addr) = &opts.rpc_hostname {
197            warn!("Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead.");
198            opts.grpc.server_addr.clone_from(server_addr);
199        }
200
201        if let Some(runtime_size) = opts.rpc_runtime_size {
202            warn!("Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead.");
203            opts.grpc.runtime_size = runtime_size;
204        }
205
206        if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
207            warn!("Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead.");
208            opts.grpc.max_recv_message_size = max_recv_message_size;
209        }
210
211        if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
212            warn!("Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead.");
213            opts.grpc.max_send_message_size = max_send_message_size;
214        }
215
216        if let Some(node_id) = self.node_id {
217            opts.node_id = Some(node_id);
218        }
219
220        if let Some(metasrv_addrs) = &self.metasrv_addrs {
221            opts.meta_client
222                .get_or_insert_with(MetaClientOptions::default)
223                .metasrv_addrs
224                .clone_from(metasrv_addrs);
225        }
226
227        ensure!(
228            opts.node_id.is_some(),
229            MissingConfigSnafu {
230                msg: "Missing node id option"
231            }
232        );
233
234        if let Some(data_home) = &self.data_home {
235            opts.storage.data_home.clone_from(data_home);
236        }
237
238        // `wal_dir` only affects raft-engine config.
239        if let Some(wal_dir) = &self.wal_dir
240            && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
241        {
242            if raft_engine_config
243                .dir
244                .as_ref()
245                .is_some_and(|original_dir| original_dir != wal_dir)
246            {
247                info!("The wal dir of raft-engine is altered to {wal_dir}");
248            }
249            raft_engine_config.dir.replace(wal_dir.clone());
250        }
251
252        // If the logging dir is not set, use the default logs dir in the data home.
253        if opts.logging.dir.is_empty() {
254            opts.logging.dir = Path::new(&opts.storage.data_home)
255                .join(DEFAULT_LOGGING_DIR)
256                .to_string_lossy()
257                .to_string();
258        }
259
260        if let Some(http_addr) = &self.http_addr {
261            opts.http.addr.clone_from(http_addr);
262        }
263
264        if let Some(http_timeout) = self.http_timeout {
265            opts.http.timeout = Duration::from_secs(http_timeout)
266        }
267
268        // Disable dashboard in datanode.
269        opts.http.disable_dashboard = true;
270
271        Ok(())
272    }
273}
274
// Unit tests for datanode option loading: config-file parsing, deprecated fields,
// CLI overrides, and the cli > config file > env > default precedence order.
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::io::Write;
    use std::time::Duration;

    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
    use servers::heartbeat_options::HeartbeatOptions;

    use super::*;
    use crate::options::GlobalOptions;

    // Despite its name, this exercises the deprecated top-level config-file fields
    // `rpc_addr` / `rpc_hostname`: when present they override the `[grpc]` values.
    #[test]
    fn test_deprecated_cli_options() {
        common_telemetry::init_default_ut_logging();
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            rpc_addr = "127.0.0.1:4001"
            rpc_hostname = "192.168.0.1"
            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:4001".to_string(), options.grpc.bind_addr);
        assert_eq!("192.168.0.1".to_string(), options.grpc.server_addr);
    }

    // Every section of a complete config file ends up in the loaded options.
    // Note the `[grpc]` section uses the `addr` / `hostname` keys; the assertion on
    // `grpc.bind_addr` shows `addr` is accepted for it (`hostname` is not asserted).
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            [grpc]
            addr = "127.0.0.1:3001"
            hostname = "127.0.0.1"
            runtime_size = 8

            [heartbeat]
            interval = "300ms"

            [meta_client]
            metasrv_addrs = ["127.0.0.1:3002"]
            timeout = "3s"
            connect_timeout = "5s"
            ddl_timeout = "10s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            dir = "/other/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            bucket = "foo"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;

        assert_eq!("127.0.0.1:3001".to_string(), options.grpc.bind_addr);
        assert_eq!(Some(42), options.node_id);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
            unreachable!()
        };
        assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
        assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
        assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
        assert_eq!(
            1024 * 1024 * 1024 * 50,
            raft_engine_config.purge_threshold.0
        );
        assert!(!raft_engine_config.sync_write);

        let HeartbeatOptions {
            interval: heart_beat_interval,
            ..
        } = options.heartbeat;

        assert_eq!(300, heart_beat_interval.as_millis());

        let MetaClientOptions {
            metasrv_addrs: metasrv_addr,
            timeout,
            connect_timeout,
            ddl_timeout,
            tcp_nodelay,
            ..
        } = options.meta_client.unwrap();

        assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
        assert_eq!(5000, connect_timeout.as_millis());
        assert_eq!(10000, ddl_timeout.as_millis());
        assert_eq!(3000, timeout.as_millis());
        assert!(tcp_nodelay);
        assert_eq!("./greptimedb_data/", options.storage.data_home);
        assert!(matches!(
            &options.storage.store,
            ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(options.storage.providers.len(), 2);
        assert!(matches!(
            options.storage.providers[0],
            ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        assert!(matches!(
            options.storage.providers[1],
            ObjectStoreConfig::S3(S3Config { .. })
        ));

        assert_eq!("debug", options.logging.level.unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
    }

    // Loading only from CLI arguments: a node id is mandatory.
    #[test]
    fn test_try_from_cmd() {
        // Missing node_id fails even though metasrv_addrs is provided.
        assert!((StartCommand {
            metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
            ..Default::default()
        })
        .load_options(&GlobalOptions::default())
        .is_err());

        // Providing node_id while leaving metasrv_addrs absent is ok, since metasrv_addrs has a default value.
        assert!((StartCommand {
            node_id: Some(42),
            ..Default::default()
        })
        .load_options(&GlobalOptions::default())
        .is_ok());
    }

    // Global `log_dir` / `log_level` options are applied to the logging options.
    #[test]
    fn test_load_log_options_from_cli() {
        let mut cmd = StartCommand::default();

        let result = cmd.load_options(&GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        });
        // Missing node_id.
        assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));

        cmd.node_id = Some(42);

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    // Verifies the layering order cli > config file > env > default values.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42
            rpc_addr = "127.0.0.1:3001"
            rpc_runtime_size = 8
            rpc_hostname = "10.103.174.219"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "5m"
            sync_write = false

            [storage]
            type = "File"
            data_home = "./greptimedb_data/"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        // Environment variable names are built as PREFIX, SECTION, KEY joined by ENV_VAR_SEP.
        let env_prefix = "DATANODE_UT";
        temp_env::with_vars(
            [
                (
                    // wal.purge_interval = 1m
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "purge_interval".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("1m"),
                ),
                (
                    // wal.read_batch_size = 100
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "read_batch_size".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("100"),
                ),
                (
                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
                    [
                        env_prefix.to_string(),
                        "meta_client".to_uppercase(),
                        "metasrv_addrs".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    wal_dir: Some("/other/wal/dir".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Should be read from env, env > default values.
                let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
                    unreachable!()
                };
                assert_eq!(raft_engine_config.read_batch_size, 100);
                assert_eq!(
                    opts.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                // Should be read from config file, config file > env > default values.
                assert_eq!(
                    raft_engine_config.purge_interval,
                    Duration::from_secs(60 * 5)
                );

                // Should be read from cli, cli > config file > env > default values.
                assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");

                // Should be default value.
                assert_eq!(
                    opts.http.addr,
                    DatanodeOptions::default().component.http.addr
                );
                // Not a default: comes from the deprecated top-level `rpc_hostname`
                // in the config file above.
                assert_eq!(opts.grpc.server_addr, "10.103.174.219");
            },
        );
    }
}