cmd/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod builder;
16
17use std::path::Path;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use clap::Parser;
22use common_config::Configurable;
23use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
24use common_telemetry::{info, warn};
25use common_wal::config::DatanodeWalConfig;
26use datanode::datanode::Datanode;
27use meta_client::MetaClientOptions;
28use snafu::{ResultExt, ensure};
29use tracing_appender::non_blocking::WorkerGuard;
30
31use crate::App;
32use crate::datanode::builder::InstanceBuilder;
33use crate::error::{
34    LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
35};
36use crate::options::{GlobalOptions, GreptimeOptions};
37
38pub const APP_NAME: &str = "greptime-datanode";
39
40type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
41
42pub struct Instance {
43    datanode: Datanode,
44
45    // Keep the logging guard to prevent the worker from being dropped.
46    _guard: Vec<WorkerGuard>,
47}
48
49impl Instance {
50    pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
51        Self {
52            datanode,
53            _guard: guard,
54        }
55    }
56
57    pub fn datanode(&self) -> &Datanode {
58        &self.datanode
59    }
60
61    /// allow customizing datanode for downstream projects
62    pub fn datanode_mut(&mut self) -> &mut Datanode {
63        &mut self.datanode
64    }
65}
66
67#[async_trait]
68impl App for Instance {
69    fn name(&self) -> &str {
70        APP_NAME
71    }
72
73    async fn start(&mut self) -> Result<()> {
74        plugins::start_datanode_plugins(self.datanode.plugins())
75            .await
76            .context(StartDatanodeSnafu)?;
77
78        self.datanode.start().await.context(StartDatanodeSnafu)
79    }
80
81    async fn stop(&mut self) -> Result<()> {
82        self.datanode
83            .shutdown()
84            .await
85            .context(ShutdownDatanodeSnafu)
86    }
87}
88
89#[derive(Parser)]
90pub struct Command {
91    #[clap(subcommand)]
92    subcmd: SubCommand,
93}
94
95impl Command {
96    pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
97        self.subcmd.build_with(builder).await
98    }
99
100    pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
101        match &self.subcmd {
102            SubCommand::Start(cmd) => cmd.load_options(global_options),
103        }
104    }
105}
106
107#[derive(Parser)]
108enum SubCommand {
109    Start(StartCommand),
110}
111
112impl SubCommand {
113    async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
114        match self {
115            SubCommand::Start(cmd) => {
116                info!("Building datanode with {:#?}", cmd);
117                builder.build().await
118            }
119        }
120    }
121}
122
123#[derive(Debug, Parser, Default)]
124struct StartCommand {
125    #[clap(long)]
126    node_id: Option<u64>,
127    /// The address to bind the gRPC server.
128    #[clap(long, alias = "rpc-addr")]
129    rpc_bind_addr: Option<String>,
130    /// The address advertised to the metasrv, and used for connections from outside the host.
131    /// If left empty or unset, the server will automatically use the IP address of the first network interface
132    /// on the host, with the same port number as the one specified in `rpc_bind_addr`.
133    #[clap(long, alias = "rpc-hostname")]
134    rpc_server_addr: Option<String>,
135    #[clap(long, value_delimiter = ',', num_args = 1..)]
136    metasrv_addrs: Option<Vec<String>>,
137    #[clap(short, long)]
138    config_file: Option<String>,
139    #[clap(long)]
140    data_home: Option<String>,
141    #[clap(long)]
142    wal_dir: Option<String>,
143    #[clap(long)]
144    http_addr: Option<String>,
145    #[clap(long)]
146    http_timeout: Option<u64>,
147    #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
148    env_prefix: String,
149}
150
151impl StartCommand {
152    fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
153        let mut opts = DatanodeOptions::load_layered_options(
154            self.config_file.as_deref(),
155            self.env_prefix.as_ref(),
156        )
157        .context(LoadLayeredConfigSnafu)?;
158
159        self.merge_with_cli_options(global_options, &mut opts)?;
160        opts.component.sanitize();
161
162        Ok(opts)
163    }
164
165    // The precedence order is: cli > config file > environment variables > default values.
166    #[allow(deprecated)]
167    fn merge_with_cli_options(
168        &self,
169        global_options: &GlobalOptions,
170        opts: &mut DatanodeOptions,
171    ) -> Result<()> {
172        let opts = &mut opts.component;
173
174        if let Some(dir) = &global_options.log_dir {
175            opts.logging.dir.clone_from(dir);
176        }
177
178        if global_options.log_level.is_some() {
179            opts.logging.level.clone_from(&global_options.log_level);
180        }
181
182        opts.tracing = TracingOptions {
183            #[cfg(feature = "tokio-console")]
184            tokio_console_addr: global_options.tokio_console_addr.clone(),
185        };
186
187        if let Some(addr) = &self.rpc_bind_addr {
188            opts.grpc.bind_addr.clone_from(addr);
189        } else if let Some(addr) = &opts.rpc_addr {
190            warn!(
191                "Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead."
192            );
193            opts.grpc.bind_addr.clone_from(addr);
194        }
195
196        if let Some(server_addr) = &self.rpc_server_addr {
197            opts.grpc.server_addr.clone_from(server_addr);
198        } else if let Some(server_addr) = &opts.rpc_hostname {
199            warn!(
200                "Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead."
201            );
202            opts.grpc.server_addr.clone_from(server_addr);
203        }
204
205        if let Some(runtime_size) = opts.rpc_runtime_size {
206            warn!(
207                "Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead."
208            );
209            opts.grpc.runtime_size = runtime_size;
210        }
211
212        if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
213            warn!(
214                "Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead."
215            );
216            opts.grpc.max_recv_message_size = max_recv_message_size;
217        }
218
219        if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
220            warn!(
221                "Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead."
222            );
223            opts.grpc.max_send_message_size = max_send_message_size;
224        }
225
226        if let Some(node_id) = self.node_id {
227            opts.node_id = Some(node_id);
228        }
229
230        if let Some(metasrv_addrs) = &self.metasrv_addrs {
231            opts.meta_client
232                .get_or_insert_with(MetaClientOptions::default)
233                .metasrv_addrs
234                .clone_from(metasrv_addrs);
235        }
236
237        ensure!(
238            opts.node_id.is_some(),
239            MissingConfigSnafu {
240                msg: "Missing node id option"
241            }
242        );
243
244        if let Some(data_home) = &self.data_home {
245            opts.storage.data_home.clone_from(data_home);
246        }
247
248        // `wal_dir` only affects raft-engine config.
249        if let Some(wal_dir) = &self.wal_dir
250            && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
251        {
252            if raft_engine_config
253                .dir
254                .as_ref()
255                .is_some_and(|original_dir| original_dir != wal_dir)
256            {
257                info!("The wal dir of raft-engine is altered to {wal_dir}");
258            }
259            raft_engine_config.dir.replace(wal_dir.clone());
260        }
261
262        // If the logging dir is not set, use the default logs dir in the data home.
263        if opts.logging.dir.is_empty() {
264            opts.logging.dir = Path::new(&opts.storage.data_home)
265                .join(DEFAULT_LOGGING_DIR)
266                .to_string_lossy()
267                .to_string();
268        }
269
270        if let Some(http_addr) = &self.http_addr {
271            opts.http.addr.clone_from(http_addr);
272        }
273
274        if let Some(http_timeout) = self.http_timeout {
275            opts.http.timeout = Duration::from_secs(http_timeout)
276        }
277
278        // Disable dashboard in datanode.
279        opts.http.disable_dashboard = true;
280
281        Ok(())
282    }
283}
284
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::io::Write;
    use std::time::Duration;

    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
    use servers::heartbeat_options::HeartbeatOptions;

    use super::*;
    use crate::options::GlobalOptions;

    // Deprecated top-level `rpc_*` settings in the config file take precedence
    // over the corresponding `grpc.*` settings.
    #[test]
    fn test_deprecated_cli_options() {
        common_telemetry::init_default_ut_logging();
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            rpc_addr = "127.0.0.1:4001"
            rpc_hostname = "192.168.0.1"
            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:4001".to_string(), options.grpc.bind_addr);
        assert_eq!("192.168.0.1".to_string(), options.grpc.server_addr);
    }

    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            [grpc]
            addr = "127.0.0.1:3001"
            hostname = "127.0.0.1"
            runtime_size = 8

            [heartbeat]
            interval = "300ms"

            [meta_client]
            metasrv_addrs = ["127.0.0.1:3002"]
            timeout = "3s"
            connect_timeout = "5s"
            ddl_timeout = "10s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            dir = "/other/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            bucket = "foo"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;

        assert_eq!("127.0.0.1:3001".to_string(), options.grpc.bind_addr);
        assert_eq!(Some(42), options.node_id);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
            unreachable!()
        };
        assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
        assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
        assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
        assert_eq!(
            1024 * 1024 * 1024 * 50,
            raft_engine_config.purge_threshold.0
        );
        assert!(!raft_engine_config.sync_write);

        let HeartbeatOptions {
            interval: heart_beat_interval,
            ..
        } = options.heartbeat;

        assert_eq!(300, heart_beat_interval.as_millis());

        let MetaClientOptions {
            metasrv_addrs: metasrv_addr,
            timeout,
            connect_timeout,
            ddl_timeout,
            tcp_nodelay,
            ..
        } = options.meta_client.unwrap();

        assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
        assert_eq!(5000, connect_timeout.as_millis());
        assert_eq!(10000, ddl_timeout.as_millis());
        assert_eq!(3000, timeout.as_millis());
        assert!(tcp_nodelay);
        assert_eq!("./greptimedb_data/", options.storage.data_home);
        assert!(matches!(
            &options.storage.store,
            ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(options.storage.providers.len(), 2);
        assert!(matches!(
            options.storage.providers[0],
            ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        assert!(matches!(
            options.storage.providers[1],
            ObjectStoreConfig::S3(S3Config { .. })
        ));

        assert_eq!("debug", options.logging.level.unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
    }

    #[test]
    fn test_try_from_cmd() {
        assert!(
            (StartCommand {
                metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_err()
        );

        // Providing node_id but leave metasrv_addr absent is ok since metasrv_addr has default value
        assert!(
            (StartCommand {
                node_id: Some(42),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_ok()
        );
    }

    // Command-line flags must override the loaded values, and the dashboard must
    // always be disabled in the datanode.
    #[test]
    fn test_cli_options_override() {
        let cmd = StartCommand {
            node_id: Some(7),
            rpc_bind_addr: Some("127.0.0.1:14001".to_string()),
            metasrv_addrs: Some(vec![
                "127.0.0.1:3002".to_string(),
                "127.0.0.1:3003".to_string(),
            ]),
            data_home: Some("/tmp/greptimedb_cli_test/".to_string()),
            http_addr: Some("127.0.0.1:14000".to_string()),
            http_timeout: Some(42),
            ..Default::default()
        };

        let options = cmd
            .load_options(&GlobalOptions::default())
            .unwrap()
            .component;

        assert_eq!(Some(7), options.node_id);
        assert_eq!("127.0.0.1:14001", options.grpc.bind_addr);
        assert_eq!(
            vec!["127.0.0.1:3002".to_string(), "127.0.0.1:3003".to_string()],
            options.meta_client.unwrap().metasrv_addrs
        );
        assert_eq!("/tmp/greptimedb_cli_test/", options.storage.data_home);
        assert_eq!("127.0.0.1:14000", options.http.addr);
        assert_eq!(Duration::from_secs(42), options.http.timeout);
        assert!(options.http.disable_dashboard);
    }

    #[test]
    fn test_load_log_options_from_cli() {
        let mut cmd = StartCommand::default();

        let result = cmd.load_options(&GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        });
        // Missing node_id.
        assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));

        cmd.node_id = Some(42);

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42
            rpc_addr = "127.0.0.1:3001"
            rpc_runtime_size = 8
            rpc_hostname = "10.103.174.219"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "5m"
            sync_write = false

            [storage]
            type = "File"
            data_home = "./greptimedb_data/"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "DATANODE_UT";
        temp_env::with_vars(
            [
                (
                    // wal.purge_interval = 1m
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "purge_interval".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("1m"),
                ),
                (
                    // wal.read_batch_size = 100
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "read_batch_size".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("100"),
                ),
                (
                    // meta_client.metasrv_addrs = 127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003
                    [
                        env_prefix.to_string(),
                        "meta_client".to_uppercase(),
                        "metasrv_addrs".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    wal_dir: Some("/other/wal/dir".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Should be read from env, env > default values.
                let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
                    unreachable!()
                };
                assert_eq!(raft_engine_config.read_batch_size, 100);
                assert_eq!(
                    opts.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                // Should be read from config file, config file > env > default values.
                assert_eq!(
                    raft_engine_config.purge_interval,
                    Duration::from_secs(60 * 5)
                );

                // Should be read from cli, cli > config file > env > default values.
                assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");

                // Should be default value.
                assert_eq!(
                    opts.http.addr,
                    DatanodeOptions::default().component.http.addr
                );
                assert_eq!(opts.grpc.server_addr, "10.103.174.219");
            },
        );
    }
}