cmd/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::Parser;
21use common_config::Configurable;
22use common_telemetry::logging::TracingOptions;
23use common_telemetry::{info, warn};
24use common_wal::config::DatanodeWalConfig;
25use datanode::datanode::Datanode;
26use meta_client::MetaClientOptions;
27use snafu::{ensure, ResultExt};
28use tracing_appender::non_blocking::WorkerGuard;
29
30use crate::datanode::builder::InstanceBuilder;
31use crate::error::{
32    LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
33};
34use crate::options::{GlobalOptions, GreptimeOptions};
35use crate::App;
36
37pub const APP_NAME: &str = "greptime-datanode";
38
39type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
40
41pub struct Instance {
42    datanode: Datanode,
43
44    // Keep the logging guard to prevent the worker from being dropped.
45    _guard: Vec<WorkerGuard>,
46}
47
48impl Instance {
49    pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
50        Self {
51            datanode,
52            _guard: guard,
53        }
54    }
55
56    pub fn datanode(&self) -> &Datanode {
57        &self.datanode
58    }
59
60    /// allow customizing datanode for downstream projects
61    pub fn datanode_mut(&mut self) -> &mut Datanode {
62        &mut self.datanode
63    }
64}
65
66#[async_trait]
67impl App for Instance {
68    fn name(&self) -> &str {
69        APP_NAME
70    }
71
72    async fn start(&mut self) -> Result<()> {
73        plugins::start_datanode_plugins(self.datanode.plugins())
74            .await
75            .context(StartDatanodeSnafu)?;
76
77        self.datanode.start().await.context(StartDatanodeSnafu)
78    }
79
80    async fn stop(&mut self) -> Result<()> {
81        self.datanode
82            .shutdown()
83            .await
84            .context(ShutdownDatanodeSnafu)
85    }
86}
87
88#[derive(Parser)]
89pub struct Command {
90    #[clap(subcommand)]
91    subcmd: SubCommand,
92}
93
94impl Command {
95    pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
96        self.subcmd.build_with(builder).await
97    }
98
99    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
100        match &self.subcmd {
101            SubCommand::Start(cmd) => cmd.load_options(global_options),
102        }
103    }
104}
105
106#[derive(Parser)]
107enum SubCommand {
108    Start(StartCommand),
109}
110
111impl SubCommand {
112    async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
113        match self {
114            SubCommand::Start(cmd) => {
115                info!("Building datanode with {:#?}", cmd);
116                builder.build().await
117            }
118        }
119    }
120}
121
122#[derive(Debug, Parser, Default)]
123struct StartCommand {
124    #[clap(long)]
125    node_id: Option<u64>,
126    /// The address to bind the gRPC server.
127    #[clap(long, alias = "rpc-addr")]
128    rpc_bind_addr: Option<String>,
129    /// The address advertised to the metasrv, and used for connections from outside the host.
130    /// If left empty or unset, the server will automatically use the IP address of the first network interface
131    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
132    #[clap(long, alias = "rpc-hostname")]
133    rpc_server_addr: Option<String>,
134    #[clap(long, value_delimiter = ',', num_args = 1..)]
135    metasrv_addrs: Option<Vec<String>>,
136    #[clap(short, long)]
137    config_file: Option<String>,
138    #[clap(long)]
139    data_home: Option<String>,
140    #[clap(long)]
141    wal_dir: Option<String>,
142    #[clap(long)]
143    http_addr: Option<String>,
144    #[clap(long)]
145    http_timeout: Option<u64>,
146    #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
147    env_prefix: String,
148}
149
150impl StartCommand {
151    fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
152        let mut opts = DatanodeOptions::load_layered_options(
153            self.config_file.as_deref(),
154            self.env_prefix.as_ref(),
155        )
156        .context(LoadLayeredConfigSnafu)?;
157
158        self.merge_with_cli_options(global_options, &mut opts)?;
159        opts.component.sanitize();
160
161        Ok(opts)
162    }
163
164    // The precedence order is: cli > config file > environment variables > default values.
165    #[allow(deprecated)]
166    fn merge_with_cli_options(
167        &self,
168        global_options: &GlobalOptions,
169        opts: &mut DatanodeOptions,
170    ) -> Result<()> {
171        let opts = &mut opts.component;
172
173        if let Some(dir) = &global_options.log_dir {
174            opts.logging.dir.clone_from(dir);
175        }
176
177        if global_options.log_level.is_some() {
178            opts.logging.level.clone_from(&global_options.log_level);
179        }
180
181        opts.tracing = TracingOptions {
182            #[cfg(feature = "tokio-console")]
183            tokio_console_addr: global_options.tokio_console_addr.clone(),
184        };
185
186        if let Some(addr) = &self.rpc_bind_addr {
187            opts.grpc.bind_addr.clone_from(addr);
188        } else if let Some(addr) = &opts.rpc_addr {
189            warn!("Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead.");
190            opts.grpc.bind_addr.clone_from(addr);
191        }
192
193        if let Some(server_addr) = &self.rpc_server_addr {
194            opts.grpc.server_addr.clone_from(server_addr);
195        } else if let Some(server_addr) = &opts.rpc_hostname {
196            warn!("Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead.");
197            opts.grpc.server_addr.clone_from(server_addr);
198        }
199
200        if let Some(runtime_size) = opts.rpc_runtime_size {
201            warn!("Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead.");
202            opts.grpc.runtime_size = runtime_size;
203        }
204
205        if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
206            warn!("Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead.");
207            opts.grpc.max_recv_message_size = max_recv_message_size;
208        }
209
210        if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
211            warn!("Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead.");
212            opts.grpc.max_send_message_size = max_send_message_size;
213        }
214
215        if let Some(node_id) = self.node_id {
216            opts.node_id = Some(node_id);
217        }
218
219        if let Some(metasrv_addrs) = &self.metasrv_addrs {
220            opts.meta_client
221                .get_or_insert_with(MetaClientOptions::default)
222                .metasrv_addrs
223                .clone_from(metasrv_addrs);
224        }
225
226        ensure!(
227            opts.node_id.is_some(),
228            MissingConfigSnafu {
229                msg: "Missing node id option"
230            }
231        );
232
233        if let Some(data_home) = &self.data_home {
234            opts.storage.data_home.clone_from(data_home);
235        }
236
237        // `wal_dir` only affects raft-engine config.
238        if let Some(wal_dir) = &self.wal_dir
239            && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
240        {
241            if raft_engine_config
242                .dir
243                .as_ref()
244                .is_some_and(|original_dir| original_dir != wal_dir)
245            {
246                info!("The wal dir of raft-engine is altered to {wal_dir}");
247            }
248            raft_engine_config.dir.replace(wal_dir.clone());
249        }
250
251        if let Some(http_addr) = &self.http_addr {
252            opts.http.addr.clone_from(http_addr);
253        }
254
255        if let Some(http_timeout) = self.http_timeout {
256            opts.http.timeout = Duration::from_secs(http_timeout)
257        }
258
259        // Disable dashboard in datanode.
260        opts.http.disable_dashboard = true;
261
262        Ok(())
263    }
264}
265
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::io::Write;
    use std::time::Duration;

    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
    use servers::heartbeat_options::HeartbeatOptions;

    use super::*;
    use crate::options::GlobalOptions;

    /// Builds a `StartCommand` whose only explicit setting is the config file at `path`.
    fn command_with_config(path: impl AsRef<std::path::Path>) -> StartCommand {
        StartCommand {
            config_file: Some(path.as_ref().to_str().unwrap().to_string()),
            ..Default::default()
        }
    }

    /// Joins `prefix` with the upper-cased `section` and `field` using `ENV_VAR_SEP`.
    fn env_key(prefix: &str, section: &str, field: &str) -> String {
        [
            prefix.to_string(),
            section.to_uppercase(),
            field.to_uppercase(),
        ]
        .join(ENV_VAR_SEP)
    }

    /// The deprecated top-level `rpc_addr` / `rpc_hostname` keys override the `[grpc]` section.
    #[test]
    fn test_deprecated_cli_options() {
        common_telemetry::init_default_ut_logging();
        let mut config = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            rpc_addr = "127.0.0.1:4001"
            rpc_hostname = "192.168.0.1"
            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8
        "#;
        config.write_all(toml_str.as_bytes()).unwrap();

        let loaded = command_with_config(config.path())
            .load_options(&Default::default())
            .unwrap()
            .component;

        assert_eq!(loaded.grpc.bind_addr, "127.0.0.1:4001");
        assert_eq!(loaded.grpc.server_addr, "192.168.0.1");
    }

    /// Every section of a config file ends up in the loaded options.
    #[test]
    fn test_read_from_config_file() {
        let mut config = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            [grpc]
            addr = "127.0.0.1:3001"
            hostname = "127.0.0.1"
            runtime_size = 8

            [heartbeat]
            interval = "300ms"

            [meta_client]
            metasrv_addrs = ["127.0.0.1:3002"]
            timeout = "3s"
            connect_timeout = "5s"
            ddl_timeout = "10s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            dir = "/other/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            bucket = "foo"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        config.write_all(toml_str.as_bytes()).unwrap();

        let options = command_with_config(config.path())
            .load_options(&Default::default())
            .unwrap()
            .component;

        // gRPC and node id.
        assert_eq!(options.grpc.bind_addr, "127.0.0.1:3001");
        assert_eq!(options.node_id, Some(42));

        // WAL.
        let DatanodeWalConfig::RaftEngine(wal) = options.wal else {
            unreachable!()
        };
        let gib = 1024 * 1024 * 1024;
        assert_eq!(wal.dir.as_deref(), Some("/other/wal"));
        assert_eq!(wal.purge_interval, Duration::from_secs(600));
        assert_eq!(wal.file_size.0, gib);
        assert_eq!(wal.purge_threshold.0, gib * 50);
        assert!(!wal.sync_write);

        // Heartbeat.
        let HeartbeatOptions { interval, .. } = options.heartbeat;
        assert_eq!(interval.as_millis(), 300);

        // Meta client.
        let MetaClientOptions {
            metasrv_addrs,
            timeout,
            connect_timeout,
            ddl_timeout,
            tcp_nodelay,
            ..
        } = options.meta_client.unwrap();

        assert_eq!(metasrv_addrs, ["127.0.0.1:3002"]);
        assert_eq!(connect_timeout.as_millis(), 5000);
        assert_eq!(ddl_timeout.as_millis(), 10000);
        assert_eq!(timeout.as_millis(), 3000);
        assert!(tcp_nodelay);

        // Storage.
        let storage = &options.storage;
        assert_eq!(storage.data_home, "./greptimedb_data/");
        assert!(matches!(
            &storage.store,
            ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(storage.providers.len(), 2);
        assert!(matches!(
            storage.providers[0],
            ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        assert!(matches!(
            storage.providers[1],
            ObjectStoreConfig::S3(S3Config { .. })
        ));

        // Logging.
        assert_eq!(options.logging.level.as_deref(), Some("debug"));
        assert_eq!(options.logging.dir, "./greptimedb_data/test/logs");
    }

    /// Loading fails without a node id and succeeds once one is given.
    #[test]
    fn test_try_from_cmd() {
        let without_node_id = StartCommand {
            metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
            ..Default::default()
        };
        assert!(without_node_id
            .load_options(&GlobalOptions::default())
            .is_err());

        // Providing node_id but leave metasrv_addr absent is ok since metasrv_addr has default value
        let with_node_id = StartCommand {
            node_id: Some(42),
            ..Default::default()
        };
        assert!(with_node_id
            .load_options(&GlobalOptions::default())
            .is_ok());
    }

    /// The global logging flags are copied into the logging options.
    #[test]
    fn test_load_log_options_from_cli() {
        let logging_globals = || GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        };

        let mut cmd = StartCommand::default();

        // Missing node_id.
        let result = cmd.load_options(&logging_globals());
        assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));

        cmd.node_id = Some(42);

        let logging = cmd
            .load_options(&logging_globals())
            .unwrap()
            .component
            .logging;

        assert_eq!(logging.dir, "./greptimedb_data/test/logs");
        assert_eq!(logging.level.as_deref(), Some("debug"));
    }

    /// Checks the precedence: cli > config file > environment variables > default values.
    #[test]
    fn test_config_precedence_order() {
        let mut config = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42
            rpc_addr = "127.0.0.1:3001"
            rpc_runtime_size = 8
            rpc_hostname = "10.103.174.219"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "5m"
            sync_write = false

            [storage]
            type = "File"
            data_home = "./greptimedb_data/"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        config.write_all(toml_str.as_bytes()).unwrap();

        let env_prefix = "DATANODE_UT";
        let env_vars = [
            // wal.purge_interval = 1m
            (env_key(env_prefix, "wal", "purge_interval"), Some("1m")),
            // wal.read_batch_size = 100
            (env_key(env_prefix, "wal", "read_batch_size"), Some("100")),
            // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
            (
                env_key(env_prefix, "meta_client", "metasrv_addrs"),
                Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
            ),
        ];

        temp_env::with_vars(env_vars, || {
            let command = StartCommand {
                wal_dir: Some("/other/wal/dir".to_string()),
                env_prefix: env_prefix.to_string(),
                ..command_with_config(config.path())
            };

            let opts = command.load_options(&Default::default()).unwrap().component;

            // Only set in the environment: env > default values.
            let DatanodeWalConfig::RaftEngine(wal) = opts.wal else {
                unreachable!()
            };
            assert_eq!(wal.read_batch_size, 100);
            assert_eq!(
                opts.meta_client.unwrap().metasrv_addrs,
                ["127.0.0.1:3001", "127.0.0.1:3002", "127.0.0.1:3003"]
            );

            // Set in both the config file and the environment: config file > env.
            assert_eq!(wal.purge_interval, Duration::from_secs(60 * 5));

            // Set on the command line: cli > config file > env > default values.
            assert_eq!(wal.dir.as_deref(), Some("/other/wal/dir"));

            // Not set anywhere: the default value is kept.
            assert_eq!(
                opts.http.addr,
                DatanodeOptions::default().component.http.addr
            );

            // Taken from the deprecated `rpc_hostname` key of the config file.
            assert_eq!(opts.grpc.server_addr, "10.103.174.219");
        });
    }
}