1pub mod builder;
16#[allow(clippy::print_stdout)]
17mod objbench;
18#[allow(clippy::print_stdout)]
19mod scanbench;
20
21use std::path::Path;
22use std::time::Duration;
23
24use async_trait::async_trait;
25use clap::Parser;
26use common_config::Configurable;
27use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
28use common_telemetry::{info, warn};
29use common_wal::config::DatanodeWalConfig;
30use datanode::config::RegionEngineConfig;
31use datanode::datanode::Datanode;
32use meta_client::MetaClientOptions;
33use serde::{Deserialize, Serialize};
34use snafu::{ResultExt, ensure};
35use tracing_appender::non_blocking::WorkerGuard;
36
37use crate::App;
38use crate::datanode::builder::InstanceBuilder;
39use crate::datanode::objbench::ObjbenchCommand;
40use crate::datanode::scanbench::ScanbenchCommand;
41use crate::error::{
42 LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
43};
44use crate::options::{GlobalOptions, GreptimeOptions};
45
/// Application name that [`Instance`] reports through [`App::name`].
pub const APP_NAME: &str = "greptime-datanode";
47
/// Datanode flavour of [`GreptimeOptions`]: the datanode configuration lives in its
/// `component` field, next to the `runtime` and `plugins` sections.
type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
49
/// A runnable datanode application: the [`Datanode`] itself plus the logging worker
/// guards that were handed to [`Instance::new`].
pub struct Instance {
    /// The datanode that the [`App`] implementation starts and shuts down.
    datanode: Datanode,

    // Never read after construction. Presumably held so the non-blocking log writers
    // stay alive for as long as the instance does — TODO confirm against the
    // `tracing_appender` documentation.
    _guard: Vec<WorkerGuard>,
}
56
57impl Instance {
58 pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
59 Self {
60 datanode,
61 _guard: guard,
62 }
63 }
64
65 pub fn datanode(&self) -> &Datanode {
66 &self.datanode
67 }
68
69 pub fn datanode_mut(&mut self) -> &mut Datanode {
71 &mut self.datanode
72 }
73}
74
75#[async_trait]
76impl App for Instance {
77 fn name(&self) -> &str {
78 APP_NAME
79 }
80
81 async fn start(&mut self) -> Result<()> {
82 plugins::start_datanode_plugins(self.datanode.plugins())
83 .await
84 .context(StartDatanodeSnafu)?;
85
86 self.datanode.start().await.context(StartDatanodeSnafu)
87 }
88
89 async fn stop(&mut self) -> Result<()> {
90 self.datanode
91 .shutdown()
92 .await
93 .context(ShutdownDatanodeSnafu)
94 }
95}
96
// NOTE: plain `//` comments are used on purpose. clap turns `///` doc comments on a
// `#[derive(Parser)]` item into help text, so adding them here would change the CLI
// output.
//
// Command-line entry point of the datanode; everything is delegated to `subcmd`.
#[derive(Parser)]
pub struct Command {
    // The subcommand selected on the command line.
    #[clap(subcommand)]
    pub subcmd: SubCommand,
}
102
103impl Command {
104 pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
105 self.subcmd.build_with(builder).await
106 }
107
108 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
109 match &self.subcmd {
110 SubCommand::Start(cmd) => cmd.load_options(global_options),
111 SubCommand::Objbench(_) | SubCommand::Scanbench(_) => {
112 let mut opts = datanode::config::DatanodeOptions::default();
115 opts.sanitize();
116 Ok(DatanodeOptions {
117 runtime: Default::default(),
118 plugins: Default::default(),
119 component: opts,
120 })
121 }
122 }
123 }
124}
125
// NOTE: plain `//` comments are used on purpose. clap turns `///` doc comments on a
// `#[derive(Parser)]` enum and its variants into help text, so adding them here would
// change the CLI output.
//
// Subcommands of the datanode command.
#[derive(Parser)]
pub enum SubCommand {
    // Builds a datanode instance through the `InstanceBuilder`.
    Start(StartCommand),
    // Runs `ObjbenchCommand`; on success the process exits with status 0.
    Objbench(ObjbenchCommand),
    // Runs `ScanbenchCommand`; on success the process exits with status 0.
    Scanbench(ScanbenchCommand),
}
134
135impl SubCommand {
136 async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
137 match self {
138 SubCommand::Start(cmd) => {
139 info!("Building datanode with {:#?}", cmd);
140 builder.build().await
141 }
142 SubCommand::Objbench(cmd) => {
143 cmd.run().await?;
144 std::process::exit(0);
145 }
146 SubCommand::Scanbench(cmd) => {
147 cmd.run().await?;
148 std::process::exit(0);
149 }
150 }
151 }
152}
153
/// Storage section of the configuration: the data home plus the object store
/// settings.
///
/// Because of `#[serde(default)]`, fields missing from the input fall back to the
/// values of `StorageConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfig {
    /// Data home, kept as a plain string.
    pub data_home: String,
    /// Object store settings. Flattened, so its keys sit in the same table as
    /// `data_home` rather than in a nested one.
    #[serde(flatten)]
    pub store: object_store::config::ObjectStoreConfig,
}
163
/// View of a configuration document that only picks up the `storage`,
/// `region_engine` and `wal` sections; every section is optional thanks to
/// `#[serde(default)]`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
struct StorageConfigWrapper {
    /// The `storage` section.
    storage: StorageConfig,
    /// The `region_engine` entries.
    region_engine: Vec<RegionEngineConfig>,
    /// The `wal` section, read through `deserialize_wal_config` so that a section
    /// without a `provider` key is still accepted.
    #[serde(default, deserialize_with = "deserialize_wal_config")]
    wal: DatanodeWalConfig,
}
172
173fn deserialize_wal_config<'de, D>(
178 deserializer: D,
179) -> std::result::Result<DatanodeWalConfig, D::Error>
180where
181 D: serde::Deserializer<'de>,
182{
183 use serde::de::Error as _;
184
185 let mut table = <toml::value::Table as serde::Deserialize>::deserialize(deserializer)?;
186 if !table.contains_key("provider") {
187 table.insert(
188 "provider".to_string(),
189 toml::Value::String("raft_engine".to_string()),
190 );
191 }
192 DatanodeWalConfig::deserialize(toml::Value::Table(table)).map_err(D::Error::custom)
193}
194
// NOTE: fields are documented with plain `//` comments on purpose. clap turns `///`
// doc comments on a `#[derive(Parser)]` struct into `--help` text, so using them here
// would change the CLI output.
//
// Arguments of the `start` subcommand. Flags that are present are merged over the
// layered configuration by `StartCommand::merge_with_cli_options`.
#[derive(Debug, Parser, Default)]
pub struct StartCommand {
    // Node id of this datanode. If neither this flag nor the loaded configuration
    // provides one, loading the options fails with `MissingConfig`.
    #[clap(long)]
    node_id: Option<u64>,
    // Overrides `grpc.bind_addr`. Also accepted as `--rpc-bind-addr` / `--rpc-addr`;
    // the tests assert those aliases do not appear in the help output.
    #[clap(long = "grpc-bind-addr", alias = "rpc-bind-addr", alias = "rpc-addr")]
    grpc_bind_addr: Option<String>,
    // Overrides `grpc.server_addr`. Also accepted as `--rpc-server-addr` /
    // `--rpc-hostname`; the tests assert those aliases do not appear in the help output.
    #[clap(
        long = "grpc-server-addr",
        alias = "rpc-server-addr",
        alias = "rpc-hostname"
    )]
    grpc_server_addr: Option<String>,
    // Metasrv addresses; one or more values, which may be comma-separated.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // Path of the configuration file passed to the layered options loader.
    #[clap(short, long)]
    config_file: Option<String>,
    // Overrides `storage.data_home`.
    #[clap(long)]
    data_home: Option<String>,
    // Overrides the WAL directory; only applied when the WAL is the raft-engine variant.
    #[clap(long)]
    wal_dir: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`; interpreted as a number of seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Prefix passed to the layered options loader for environment variables. Note that
    // the clap default only applies when parsing; a value built through `Default` has
    // an empty prefix.
    #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
    env_prefix: String,
}
226
227impl StartCommand {
228 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
229 let mut opts = DatanodeOptions::load_layered_options(
230 self.config_file.as_deref(),
231 self.env_prefix.as_ref(),
232 )
233 .context(LoadLayeredConfigSnafu)?;
234
235 self.merge_with_cli_options(global_options, &mut opts)?;
236 opts.component.sanitize();
237
238 Ok(opts)
239 }
240
241 #[allow(deprecated)]
243 fn merge_with_cli_options(
244 &self,
245 global_options: &GlobalOptions,
246 opts: &mut DatanodeOptions,
247 ) -> Result<()> {
248 let opts = &mut opts.component;
249
250 if let Some(dir) = &global_options.log_dir {
251 opts.logging.dir.clone_from(dir);
252 }
253
254 if global_options.log_level.is_some() {
255 opts.logging.level.clone_from(&global_options.log_level);
256 }
257
258 opts.tracing = TracingOptions {
259 #[cfg(feature = "tokio-console")]
260 tokio_console_addr: global_options.tokio_console_addr.clone(),
261 };
262
263 if let Some(addr) = &self.grpc_bind_addr {
264 opts.grpc.bind_addr.clone_from(addr);
265 } else if let Some(addr) = &opts.rpc_addr {
266 warn!(
267 "Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.bind_addr` instead."
268 );
269 opts.grpc.bind_addr.clone_from(addr);
270 }
271
272 if let Some(server_addr) = &self.grpc_server_addr {
273 opts.grpc.server_addr.clone_from(server_addr);
274 } else if let Some(server_addr) = &opts.rpc_hostname {
275 warn!(
276 "Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.server_addr` instead."
277 );
278 opts.grpc.server_addr.clone_from(server_addr);
279 }
280
281 if let Some(runtime_size) = opts.rpc_runtime_size {
282 warn!(
283 "Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead."
284 );
285 opts.grpc.runtime_size = runtime_size;
286 }
287
288 if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
289 warn!(
290 "Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead."
291 );
292 opts.grpc.max_recv_message_size = max_recv_message_size;
293 }
294
295 if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
296 warn!(
297 "Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead."
298 );
299 opts.grpc.max_send_message_size = max_send_message_size;
300 }
301
302 if let Some(node_id) = self.node_id {
303 opts.node_id = Some(node_id);
304 }
305
306 if let Some(metasrv_addrs) = &self.metasrv_addrs {
307 opts.meta_client
308 .get_or_insert_with(MetaClientOptions::default)
309 .metasrv_addrs
310 .clone_from(metasrv_addrs);
311 }
312
313 ensure!(
314 opts.node_id.is_some(),
315 MissingConfigSnafu {
316 msg: "Missing node id option"
317 }
318 );
319
320 if let Some(data_home) = &self.data_home {
321 opts.storage.data_home.clone_from(data_home);
322 }
323
324 if let Some(wal_dir) = &self.wal_dir
326 && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
327 {
328 if raft_engine_config
329 .dir
330 .as_ref()
331 .is_some_and(|original_dir| original_dir != wal_dir)
332 {
333 info!("The wal dir of raft-engine is altered to {wal_dir}");
334 }
335 raft_engine_config.dir.replace(wal_dir.clone());
336 }
337
338 if opts.logging.dir.is_empty() {
340 opts.logging.dir = Path::new(&opts.storage.data_home)
341 .join(DEFAULT_LOGGING_DIR)
342 .to_string_lossy()
343 .to_string();
344 }
345
346 if let Some(http_addr) = &self.http_addr {
347 opts.http.addr.clone_from(http_addr);
348 }
349
350 if let Some(http_timeout) = self.http_timeout {
351 opts.http.timeout = Duration::from_secs(http_timeout)
352 }
353
354 opts.http.disable_dashboard = true;
356
357 Ok(())
358 }
359}
360
361#[cfg(test)]
362mod tests {
363 use std::assert_matches;
364 use std::io::Write;
365 use std::time::Duration;
366
367 use clap::{CommandFactory, Parser};
368 use common_config::ENV_VAR_SEP;
369 use common_test_util::temp_dir::create_named_temp_file;
370 use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
371
372 use super::*;
373 use crate::options::GlobalOptions;
374
375 #[test]
376 fn test_deprecated_cli_options() {
377 common_telemetry::init_default_ut_logging();
378 let mut file = create_named_temp_file();
379 let toml_str = r#"
380 enable_memory_catalog = false
381 node_id = 42
382
383 rpc_addr = "127.0.0.1:4001"
384 rpc_hostname = "192.168.0.1"
385 [grpc]
386 bind_addr = "127.0.0.1:3001"
387 server_addr = "127.0.0.1"
388 runtime_size = 8
389 "#;
390 write!(file, "{}", toml_str).unwrap();
391
392 let cmd = StartCommand {
393 config_file: Some(file.path().to_str().unwrap().to_string()),
394 ..Default::default()
395 };
396
397 let options = cmd.load_options(&Default::default()).unwrap().component;
398 assert_eq!("127.0.0.1:4001".to_string(), options.grpc.bind_addr);
399 assert_eq!("192.168.0.1".to_string(), options.grpc.server_addr);
400 }
401
    /// Loads a complete configuration file with no CLI overrides and checks that the
    /// gRPC, node id, WAL, meta client, storage and logging values all surface
    /// unchanged in the resulting options.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8

            [meta_client]
            metasrv_addrs = ["127.0.0.1:3002"]
            timeout = "3s"
            connect_timeout = "5s"
            ddl_timeout = "10s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            dir = "/other/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            bucket = "foo"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;

        // No deprecated `rpc_*` attributes are present, so `[grpc]` is used as written.
        assert_eq!("127.0.0.1:3001".to_string(), options.grpc.bind_addr);
        assert_eq!("127.0.0.1".to_string(), options.grpc.server_addr);
        assert_eq!(Some(42), options.node_id);

        // `options.wal` is moved out here; the other fields of `options` stay usable.
        let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
            unreachable!()
        };
        assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
        // "10m" -> 600 seconds.
        assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
        // "1GB" and "50GB" are checked as binary multiples of bytes.
        assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
        assert_eq!(
            1024 * 1024 * 1024 * 50,
            raft_engine_config.purge_threshold.0
        );
        assert!(!raft_engine_config.sync_write);

        let MetaClientOptions {
            metasrv_addrs: metasrv_addr,
            timeout,
            connect_timeout,
            ddl_timeout,
            tcp_nodelay,
            ..
        } = options.meta_client.unwrap();

        assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
        assert_eq!(5000, connect_timeout.as_millis());
        assert_eq!(10000, ddl_timeout.as_millis());
        assert_eq!(3000, timeout.as_millis());
        assert!(tcp_nodelay);
        assert_eq!("./greptimedb_data/", options.storage.data_home);
        // The default store is the `File` one; the two extra providers keep file order.
        assert!(matches!(
            &options.storage.store,
            ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(options.storage.providers.len(), 2);
        assert!(matches!(
            options.storage.providers[0],
            ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        assert!(matches!(
            options.storage.providers[1],
            ObjectStoreConfig::S3(S3Config { .. })
        ));

        assert_eq!("debug", options.logging.level.unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
    }
507
508 #[test]
509 fn test_try_from_cmd() {
510 assert!(
511 (StartCommand {
512 metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
513 ..Default::default()
514 })
515 .load_options(&GlobalOptions::default())
516 .is_err()
517 );
518
519 assert!(
521 (StartCommand {
522 node_id: Some(42),
523 ..Default::default()
524 })
525 .load_options(&GlobalOptions::default())
526 .is_ok()
527 );
528 }
529
530 #[test]
531 fn test_load_log_options_from_cli() {
532 let mut cmd = StartCommand::default();
533
534 let result = cmd.load_options(&GlobalOptions {
535 log_dir: Some("./greptimedb_data/test/logs".to_string()),
536 log_level: Some("debug".to_string()),
537
538 #[cfg(feature = "tokio-console")]
539 tokio_console_addr: None,
540 });
541 assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));
543
544 cmd.node_id = Some(42);
545
546 let options = cmd
547 .load_options(&GlobalOptions {
548 log_dir: Some("./greptimedb_data/test/logs".to_string()),
549 log_level: Some("debug".to_string()),
550
551 #[cfg(feature = "tokio-console")]
552 tokio_console_addr: None,
553 })
554 .unwrap()
555 .component;
556
557 let logging_opt = options.logging;
558 assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
559 assert_eq!("debug", logging_opt.level.as_ref().unwrap());
560 }
561
    /// Exercises the precedence between CLI flags, the config file, environment
    /// variables and default values, as pinned by the assertions below.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42
            rpc_addr = "127.0.0.1:3001"
            rpc_runtime_size = 8
            rpc_hostname = "10.103.174.219"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "5m"
            sync_write = false

            [storage]
            type = "File"
            data_home = "./greptimedb_data/"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "DATANODE_UT";
        // Each variable name is `<prefix>`, `<SECTION>` and `<KEY>` joined with
        // `ENV_VAR_SEP`.
        temp_env::with_vars(
            [
                (
                    // wal.purge_interval = 1m (also present in the config file as 5m)
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "purge_interval".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("1m"),
                ),
                (
                    // wal.read_batch_size = 100 (absent from the config file)
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "read_batch_size".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("100"),
                ),
                (
                    // meta_client.metasrv_addrs = three addresses (absent from the config file)
                    [
                        env_prefix.to_string(),
                        "meta_client".to_uppercase(),
                        "metasrv_addrs".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    wal_dir: Some("/other/wal/dir".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Only set through the environment: env > default values.
                let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
                    unreachable!()
                };
                assert_eq!(raft_engine_config.read_batch_size, 100);
                assert_eq!(
                    opts.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                // Set in both the config file (5m) and the environment (1m): the
                // config file value is expected, i.e. config file > env.
                assert_eq!(
                    raft_engine_config.purge_interval,
                    Duration::from_secs(60 * 5)
                );

                // Only set through the `wal_dir` CLI option.
                assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");

                // Not set anywhere: stays at the default value.
                assert_eq!(
                    opts.http.addr,
                    DatanodeOptions::default().component.http.addr
                );
                // Taken from the deprecated `rpc_hostname` attribute in the config file.
                assert_eq!(opts.grpc.server_addr, "10.103.174.219");
            },
        );
    }
670
671 #[test]
672 fn test_parse_grpc_cli_aliases() {
673 let command = StartCommand::try_parse_from([
674 "datanode",
675 "--grpc-bind-addr",
676 "127.0.0.1:13001",
677 "--grpc-server-addr",
678 "10.0.0.1:13001",
679 ])
680 .unwrap();
681 assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:13001"));
682 assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.1:13001"));
683
684 let command = StartCommand::try_parse_from([
685 "datanode",
686 "--rpc-bind-addr",
687 "127.0.0.1:23001",
688 "--rpc-server-addr",
689 "10.0.0.2:23001",
690 ])
691 .unwrap();
692 assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:23001"));
693 assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.2:23001"));
694
695 let command = StartCommand::try_parse_from([
696 "datanode",
697 "--rpc-addr",
698 "127.0.0.1:33001",
699 "--rpc-hostname",
700 "10.0.0.3:33001",
701 ])
702 .unwrap();
703 assert_eq!(command.grpc_bind_addr.as_deref(), Some("127.0.0.1:33001"));
704 assert_eq!(command.grpc_server_addr.as_deref(), Some("10.0.0.3:33001"));
705 }
706
707 #[test]
708 fn test_help_uses_grpc_option_names() {
709 let mut cmd = StartCommand::command();
710 let mut help = Vec::new();
711 cmd.write_long_help(&mut help).unwrap();
712 let help = String::from_utf8(help).unwrap();
713
714 assert!(help.contains("--grpc-bind-addr"));
715 assert!(help.contains("--grpc-server-addr"));
716 assert!(!help.contains("--rpc-bind-addr"));
717 assert!(!help.contains("--rpc-server-addr"));
718 assert!(!help.contains("--rpc-addr"));
719 assert!(!help.contains("--rpc-hostname"));
720 }
721}