// Provides the `InstanceBuilder` that `build_with` uses to construct the datanode `Instance`.
pub mod builder;
// The benchmark modules opt out of `clippy::print_stdout`.
// NOTE(review): presumably because the benchmark commands report their results on stdout —
// confirm inside each module.
#[allow(clippy::print_stdout)]
pub(crate) mod objbench;
// Only compiled when the `dev-tools` feature is enabled.
#[cfg(feature = "dev-tools")]
#[allow(clippy::print_stdout)]
mod parquetbench;
#[allow(clippy::print_stdout)]
mod scanbench;
23
24use std::path::Path;
25use std::time::Duration;
26
27use async_trait::async_trait;
28use clap::Parser;
29use common_config::Configurable;
30use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
31use common_telemetry::{info, warn};
32use common_wal::config::DatanodeWalConfig;
33use datanode::config::RegionEngineConfig;
34use datanode::datanode::Datanode;
35use meta_client::MetaClientOptions;
36use serde::{Deserialize, Serialize};
37use snafu::{ResultExt, ensure};
38use tracing_appender::non_blocking::WorkerGuard;
39
40use crate::App;
41use crate::datanode::builder::InstanceBuilder;
42use crate::datanode::objbench::ObjbenchCommand;
43#[cfg(feature = "dev-tools")]
44use crate::datanode::parquetbench::ParquetbenchCommand;
45use crate::datanode::scanbench::ScanbenchCommand;
46use crate::error::{
47 LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
48};
49use crate::options::{GlobalOptions, GreptimeOptions};
50
/// Application name of the datanode; this is what `App::name` returns for `Instance`.
pub const APP_NAME: &str = "greptime-datanode";
52
/// The datanode's own options (stored in the `component` field) wrapped in `GreptimeOptions`,
/// which additionally carries the `runtime` and `plugins` settings.
type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
54
/// A runnable datanode application: the [`Datanode`] plus the logging worker guards it was
/// created with.
pub struct Instance {
    /// The datanode that is started and shut down through the `App` implementation.
    datanode: Datanode,

    // Never read; stored only so the guards are owned by (and dropped with) the instance.
    // NOTE(review): presumably this keeps the non-blocking log writers alive until the
    // instance goes away — confirm against the `tracing_appender` documentation.
    _guard: Vec<WorkerGuard>,
}
61
62impl Instance {
63 pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
64 Self {
65 datanode,
66 _guard: guard,
67 }
68 }
69
70 pub fn datanode(&self) -> &Datanode {
71 &self.datanode
72 }
73
74 pub fn datanode_mut(&mut self) -> &mut Datanode {
76 &mut self.datanode
77 }
78}
79
80#[async_trait]
81impl App for Instance {
82 fn name(&self) -> &str {
83 APP_NAME
84 }
85
86 async fn start(&mut self) -> Result<()> {
87 plugins::start_datanode_plugins(&self.datanode)
88 .await
89 .context(StartDatanodeSnafu)?;
90
91 self.datanode.start().await.context(StartDatanodeSnafu)
92 }
93
94 async fn stop(&mut self) -> Result<()> {
95 self.datanode
96 .shutdown()
97 .await
98 .context(ShutdownDatanodeSnafu)
99 }
100}
101
// Command-line entry point of the datanode: it only selects a subcommand, which carries all
// of the behavior.
// Plain `//` comments are used on purpose: clap turns `///` doc comments into help text.
#[derive(Parser)]
pub struct Command {
    // The selected subcommand (`start` or one of the benchmark commands).
    #[clap(subcommand)]
    pub subcmd: SubCommand,
}
107
108impl Command {
109 pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
110 self.subcmd.build_with(builder).await
111 }
112
113 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
114 match &self.subcmd {
115 SubCommand::Start(cmd) => cmd.load_options(global_options),
116 SubCommand::Objbench(_) | SubCommand::Scanbench(_) => Self::default_bench_options(),
118 #[cfg(feature = "dev-tools")]
119 SubCommand::Parquetbench(_) => Self::default_bench_options(),
120 }
121 }
122
123 fn default_bench_options() -> Result<DatanodeOptions> {
126 let mut opts = datanode::config::DatanodeOptions::default();
127 opts.sanitize();
128 Ok(DatanodeOptions {
129 runtime: Default::default(),
130 plugins: Default::default(),
131 component: opts,
132 })
133 }
134}
135
// Subcommands of the datanode binary.
// Plain `//` comments are used on purpose: clap would surface `///` doc comments in the
// generated help output.
#[derive(Parser)]
pub enum SubCommand {
    // Builds a datanode instance from the loaded options.
    Start(StartCommand),
    // One-shot benchmark commands: `build_with` runs them and then exits the process.
    Objbench(ObjbenchCommand),
    Scanbench(ScanbenchCommand),
    // Only available when built with the `dev-tools` feature.
    #[cfg(feature = "dev-tools")]
    Parquetbench(ParquetbenchCommand),
}
147
148impl SubCommand {
149 async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
150 match self {
151 SubCommand::Start(cmd) => {
152 info!("Building datanode with {:#?}", cmd);
153 builder.build().await
154 }
155 SubCommand::Objbench(cmd) => {
156 cmd.run().await?;
157 std::process::exit(0);
158 }
159 SubCommand::Scanbench(cmd) => {
160 cmd.run().await?;
161 std::process::exit(0);
162 }
163 #[cfg(feature = "dev-tools")]
164 SubCommand::Parquetbench(cmd) => {
165 cmd.run().await?;
166 std::process::exit(0);
167 }
168 }
169 }
170}
171
/// Storage section of the configuration: a `data_home` string plus the object store settings.
///
/// `#[serde(default)]` makes every missing field fall back to its `Default` value.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfig {
    /// NOTE(review): presumably the root directory for locally stored data — confirm with the
    /// code that consumes this struct.
    pub data_home: String,
    /// Flattened: the object store keys are read from the same table as `data_home` rather
    /// than from a nested one.
    #[serde(flatten)]
    pub store: object_store::config::ObjectStoreConfig,
}
181
/// Deserialization helper that captures only the `storage`, `region_engine` and `wal`
/// sections of a configuration.
///
/// NOTE(review): no use of this type is visible in this part of the file — confirm where it
/// is consumed before changing it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
struct StorageConfigWrapper {
    storage: StorageConfig,
    region_engine: Vec<RegionEngineConfig>,
    // `default` covers a missing `wal` section; a present one goes through
    // `deserialize_wal_config`, which fills in `provider` when the key is absent.
    #[serde(default, deserialize_with = "deserialize_wal_config")]
    wal: DatanodeWalConfig,
}
190
191fn deserialize_wal_config<'de, D>(
196 deserializer: D,
197) -> std::result::Result<DatanodeWalConfig, D::Error>
198where
199 D: serde::Deserializer<'de>,
200{
201 use serde::de::Error as _;
202
203 let mut table = <toml::value::Table as serde::Deserialize>::deserialize(deserializer)?;
204 if !table.contains_key("provider") {
205 table.insert(
206 "provider".to_string(),
207 toml::Value::String("raft_engine".to_string()),
208 );
209 }
210 DatanodeWalConfig::deserialize(toml::Value::Table(table)).map_err(D::Error::custom)
211}
212
// Options of the `start` subcommand. Each optional flag, when given, overrides the value that
// was loaded from the config file / environment (see `merge_with_cli_options`).
// Plain `//` comments are used on purpose: clap turns `///` doc comments into help text, and
// `test_help_uses_grpc_option_names` asserts on that text.
#[derive(Debug, Parser, Default)]
pub struct StartCommand {
    // Overrides `node_id`. Loading fails with a `MissingConfig` error when neither this flag
    // nor the loaded options provide a node id.
    #[clap(long)]
    node_id: Option<u64>,
    // Overrides `grpc.bind_addr`. `--rpc-bind-addr` and `--rpc-addr` are accepted as aliases.
    #[clap(long = "grpc-bind-addr", alias = "rpc-bind-addr", alias = "rpc-addr")]
    grpc_bind_addr: Option<String>,
    // Overrides `grpc.server_addr`. `--rpc-server-addr` and `--rpc-hostname` are accepted as
    // aliases.
    #[clap(
        long = "grpc-server-addr",
        alias = "rpc-server-addr",
        alias = "rpc-hostname"
    )]
    grpc_server_addr: Option<String>,
    // Replaces `meta_client.metasrv_addrs`. Takes one or more values; a single value may hold
    // several comma-separated addresses.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // Optional path of the config file passed to `load_layered_options`.
    #[clap(short, long)]
    config_file: Option<String>,
    // Overrides `storage.data_home`.
    #[clap(long)]
    data_home: Option<String>,
    // Overrides the raft-engine WAL directory; has no effect unless the WAL config is the
    // `RaftEngine` variant.
    #[clap(long)]
    wal_dir: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`; the value is interpreted as whole seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Prefix handed to `load_layered_options` for environment-variable overrides.
    #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
    env_prefix: String,
}
244
245impl StartCommand {
246 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
247 let mut opts = DatanodeOptions::load_layered_options(
248 self.config_file.as_deref(),
249 self.env_prefix.as_ref(),
250 )
251 .context(LoadLayeredConfigSnafu)?;
252
253 self.merge_with_cli_options(global_options, &mut opts)?;
254 opts.component.sanitize();
255
256 Ok(opts)
257 }
258
259 #[allow(deprecated)]
261 fn merge_with_cli_options(
262 &self,
263 global_options: &GlobalOptions,
264 opts: &mut DatanodeOptions,
265 ) -> Result<()> {
266 let opts = &mut opts.component;
267
268 if let Some(dir) = &global_options.log_dir {
269 opts.logging.dir.clone_from(dir);
270 }
271
272 if global_options.log_level.is_some() {
273 opts.logging.level.clone_from(&global_options.log_level);
274 }
275
276 opts.tracing = TracingOptions {
277 #[cfg(feature = "tokio-console")]
278 tokio_console_addr: global_options.tokio_console_addr.clone(),
279 };
280
281 if let Some(addr) = &self.grpc_bind_addr {
282 opts.grpc.bind_addr.clone_from(addr);
283 } else if let Some(addr) = &opts.rpc_addr {
284 warn!(
285 "Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.bind_addr` instead."
286 );
287 opts.grpc.bind_addr.clone_from(addr);
288 }
289
290 if let Some(server_addr) = &self.grpc_server_addr {
291 opts.grpc.server_addr.clone_from(server_addr);
292 } else if let Some(server_addr) = &opts.rpc_hostname {
293 warn!(
294 "Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.server_addr` instead."
295 );
296 opts.grpc.server_addr.clone_from(server_addr);
297 }
298
299 if let Some(runtime_size) = opts.rpc_runtime_size {
300 warn!(
301 "Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead."
302 );
303 opts.grpc.runtime_size = runtime_size;
304 }
305
306 if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
307 warn!(
308 "Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead."
309 );
310 opts.grpc.max_recv_message_size = max_recv_message_size;
311 }
312
313 if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
314 warn!(
315 "Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead."
316 );
317 opts.grpc.max_send_message_size = max_send_message_size;
318 }
319
320 if let Some(node_id) = self.node_id {
321 opts.node_id = Some(node_id);
322 }
323
324 if let Some(metasrv_addrs) = &self.metasrv_addrs {
325 opts.meta_client
326 .get_or_insert_with(MetaClientOptions::default)
327 .metasrv_addrs
328 .clone_from(metasrv_addrs);
329 }
330
331 ensure!(
332 opts.node_id.is_some(),
333 MissingConfigSnafu {
334 msg: "Missing node id option"
335 }
336 );
337
338 if let Some(data_home) = &self.data_home {
339 opts.storage.data_home.clone_from(data_home);
340 }
341
342 if let Some(wal_dir) = &self.wal_dir
344 && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
345 {
346 if raft_engine_config
347 .dir
348 .as_ref()
349 .is_some_and(|original_dir| original_dir != wal_dir)
350 {
351 info!("The wal dir of raft-engine is altered to {wal_dir}");
352 }
353 raft_engine_config.dir.replace(wal_dir.clone());
354 }
355
356 if opts.logging.dir.is_empty() {
358 opts.logging.dir = Path::new(&opts.storage.data_home)
359 .join(DEFAULT_LOGGING_DIR)
360 .to_string_lossy()
361 .to_string();
362 }
363
364 if let Some(http_addr) = &self.http_addr {
365 opts.http.addr.clone_from(http_addr);
366 }
367
368 if let Some(http_timeout) = self.http_timeout {
369 opts.http.timeout = Duration::from_secs(http_timeout)
370 }
371
372 opts.http.disable_dashboard = true;
374
375 Ok(())
376 }
377}
378
// Unit tests for option loading and CLI parsing of the `start` subcommand.
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::io::Write;
    use std::time::Duration;

    use clap::{CommandFactory, Parser};
    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};

    use super::*;
    use crate::options::GlobalOptions;

    // The deprecated top-level `rpc_addr` / `rpc_hostname` attributes override the values
    // given in the `[grpc]` section of the same file.
    #[test]
    fn test_deprecated_cli_options() {
        common_telemetry::init_default_ut_logging();
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            rpc_addr = "127.0.0.1:4001"
            rpc_hostname = "192.168.0.1"
            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:4001".to_string(), options.grpc.bind_addr);
        assert_eq!("192.168.0.1".to_string(), options.grpc.server_addr);
    }

    // With no CLI overrides, every asserted option comes straight from the config file.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8

            [meta_client]
            metasrv_addrs = ["127.0.0.1:3002"]
            timeout = "3s"
            connect_timeout = "5s"
            ddl_timeout = "10s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            dir = "/other/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            bucket = "foo"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;

        assert_eq!("127.0.0.1:3001".to_string(), options.grpc.bind_addr);
        assert_eq!("127.0.0.1".to_string(), options.grpc.server_addr);
        assert_eq!(Some(42), options.node_id);

        // The file selects the raft-engine provider, so any other variant is a test bug.
        let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
            unreachable!()
        };
        assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
        assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
        assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
        assert_eq!(
            1024 * 1024 * 1024 * 50,
            raft_engine_config.purge_threshold.0
        );
        assert!(!raft_engine_config.sync_write);

        let MetaClientOptions {
            metasrv_addrs: metasrv_addr,
            timeout,
            connect_timeout,
            ddl_timeout,
            tcp_nodelay,
            ..
        } = options.meta_client.unwrap();

        assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
        assert_eq!(5000, connect_timeout.as_millis());
        assert_eq!(10000, ddl_timeout.as_millis());
        assert_eq!(3000, timeout.as_millis());
        assert!(tcp_nodelay);
        assert_eq!("./greptimedb_data/", options.storage.data_home);
        assert!(matches!(
            &options.storage.store,
            ObjectStoreConfig::File(FileConfig { .. })
        ));
        // The two `[[storage.providers]]` entries keep their file order.
        assert_eq!(options.storage.providers.len(), 2);
        assert!(matches!(
            options.storage.providers[0],
            ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        assert!(matches!(
            options.storage.providers[1],
            ObjectStoreConfig::S3(S3Config { .. })
        ));

        assert_eq!("debug", options.logging.level.unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
    }

    // Loading fails without a node id and succeeds once one is given on the command line.
    #[test]
    fn test_try_from_cmd() {
        assert!(
            (StartCommand {
                metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_err()
        );

        assert!(
            (StartCommand {
                node_id: Some(42),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_ok()
        );
    }

    // The global `log_dir` / `log_level` options end up in the logging options.
    #[test]
    fn test_load_log_options_from_cli() {
        let mut cmd = StartCommand::default();

        // No node id yet, so loading must report a missing config.
        let result = cmd.load_options(&GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        });
        assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));

        cmd.node_id = Some(42);

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    // Checks which source wins when the same option is available from the CLI, the config
    // file, environment variables and the defaults.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42
            rpc_addr = "127.0.0.1:3001"
            rpc_runtime_size = 8
            rpc_hostname = "10.103.174.219"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "5m"
            sync_write = false

            [storage]
            type = "File"
            data_home = "./greptimedb_data/"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "DATANODE_UT";
        temp_env::with_vars(
            [
                (
                    // wal.purge_interval: also set (to a different value) in the config file.
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "purge_interval".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("1m"),
                ),
                (
                    // wal.read_batch_size: only set through the environment.
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "read_batch_size".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("100"),
                ),
                (
                    // meta_client.metasrv_addrs: only set through the environment.
                    [
                        env_prefix.to_string(),
                        "meta_client".to_uppercase(),
                        "metasrv_addrs".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    wal_dir: Some("/other/wal/dir".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Values that only the environment provides are picked up from there.
                let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
                    unreachable!()
                };
                assert_eq!(raft_engine_config.read_batch_size, 100);
                assert_eq!(
                    opts.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                // The config file value (5m) wins over the environment value (1m).
                assert_eq!(
                    raft_engine_config.purge_interval,
                    Duration::from_secs(60 * 5)
                );

                // The WAL dir comes from the command line.
                assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");

                // Nothing sets the HTTP address, so it keeps its default value.
                assert_eq!(
                    opts.http.addr,
                    DatanodeOptions::default().component.http.addr
                );
                // Taken from the deprecated `rpc_hostname` attribute in the config file.
                assert_eq!(opts.grpc.server_addr, "10.103.174.219");
            },
        );
    }

    // The current `--grpc-*` flags and both generations of `--rpc-*` aliases all parse into
    // the same fields.
    #[test]
    fn test_parse_grpc_cli_aliases() {
        let command = StartCommand::try_parse_from([
            "datanode",
            "--grpc-bind-addr",
            "127.0.0.1:13001",
            "--grpc-server-addr",
            "10.0.0.1:13001",
        ])
        .unwrap();
        assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:13001"));
        assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.1:13001"));

        let command = StartCommand::try_parse_from([
            "datanode",
            "--rpc-bind-addr",
            "127.0.0.1:23001",
            "--rpc-server-addr",
            "10.0.0.2:23001",
        ])
        .unwrap();
        assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:23001"));
        assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.2:23001"));

        let command = StartCommand::try_parse_from([
            "datanode",
            "--rpc-addr",
            "127.0.0.1:33001",
            "--rpc-hostname",
            "10.0.0.3:33001",
        ])
        .unwrap();
        assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:33001"));
        assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.3:33001"));
    }

    // The long help lists only the `--grpc-*` names; the `--rpc-*` aliases stay hidden.
    #[test]
    fn test_help_uses_grpc_option_names() {
        let mut cmd = StartCommand::command();
        let mut help = Vec::new();
        cmd.write_long_help(&mut help).unwrap();
        let help = String::from_utf8(help).unwrap();

        assert!(help.contains("--grpc-bind-addr"));
        assert!(help.contains("--grpc-server-addr"));
        assert!(!help.contains("--rpc-bind-addr"));
        assert!(!help.contains("--rpc-server-addr"));
        assert!(!help.contains("--rpc-addr"));
        assert!(!help.contains("--rpc-hostname"));
    }
}