1pub mod builder;
16#[allow(clippy::print_stdout)]
17mod objbench;
18#[allow(clippy::print_stdout)]
19mod scanbench;
20
21use std::path::Path;
22use std::time::Duration;
23
24use async_trait::async_trait;
25use clap::Parser;
26use common_config::Configurable;
27use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
28use common_telemetry::{info, warn};
29use common_wal::config::DatanodeWalConfig;
30use datanode::config::RegionEngineConfig;
31use datanode::datanode::Datanode;
32use meta_client::MetaClientOptions;
33use serde::{Deserialize, Serialize};
34use snafu::{ResultExt, ensure};
35use tracing_appender::non_blocking::WorkerGuard;
36
37use crate::App;
38use crate::datanode::builder::InstanceBuilder;
39use crate::datanode::objbench::ObjbenchCommand;
40use crate::datanode::scanbench::ScanbenchCommand;
41use crate::error::{
42 LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
43};
44use crate::options::{GlobalOptions, GreptimeOptions};
45
/// Application name reported by the datanode [`App`] implementation.
pub const APP_NAME: &str = "greptime-datanode";

/// Datanode component options wrapped in the shared [`GreptimeOptions`]
/// envelope (which also carries the `runtime` and `plugins` options).
type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
49
/// A built datanode together with the log writer guards that must live as long as it does.
pub struct Instance {
    datanode: Datanode,

    // Guards of the `tracing_appender` non-blocking writers. They are never read;
    // holding them here ties their lifetime to the instance. Field order matters:
    // struct fields drop in declaration order, so the datanode is dropped before
    // these guards.
    _guard: Vec<WorkerGuard>,
}
56
57impl Instance {
58 pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
59 Self {
60 datanode,
61 _guard: guard,
62 }
63 }
64
65 pub fn datanode(&self) -> &Datanode {
66 &self.datanode
67 }
68
69 pub fn datanode_mut(&mut self) -> &mut Datanode {
71 &mut self.datanode
72 }
73}
74
75#[async_trait]
76impl App for Instance {
77 fn name(&self) -> &str {
78 APP_NAME
79 }
80
81 async fn start(&mut self) -> Result<()> {
82 plugins::start_datanode_plugins(self.datanode.plugins())
83 .await
84 .context(StartDatanodeSnafu)?;
85
86 self.datanode.start().await.context(StartDatanodeSnafu)
87 }
88
89 async fn stop(&mut self) -> Result<()> {
90 self.datanode
91 .shutdown()
92 .await
93 .context(ShutdownDatanodeSnafu)
94 }
95}
96
// Top-level `datanode` command; all behavior is delegated to the chosen subcommand.
// NOTE: plain `//` comments are used on purpose: clap turns `///` doc comments
// on derive types into CLI help text.
#[derive(Parser)]
pub struct Command {
    #[clap(subcommand)]
    pub subcmd: SubCommand,
}
102
103impl Command {
104 pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
105 self.subcmd.build_with(builder).await
106 }
107
108 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
109 match &self.subcmd {
110 SubCommand::Start(cmd) => cmd.load_options(global_options),
111 SubCommand::Objbench(_) | SubCommand::Scanbench(_) => {
112 let mut opts = datanode::config::DatanodeOptions::default();
115 opts.sanitize();
116 Ok(DatanodeOptions {
117 runtime: Default::default(),
118 plugins: Default::default(),
119 component: opts,
120 })
121 }
122 }
123 }
124}
125
// Subcommands of `datanode`.
// NOTE: `//` rather than `///` on purpose: clap derive turns doc comments into help text.
#[derive(Parser)]
pub enum SubCommand {
    // Start a datanode server.
    Start(StartCommand),
    // Object-store benchmark; runs to completion and exits the process (see `build_with`).
    Objbench(ObjbenchCommand),
    // Scan benchmark; runs to completion and exits the process (see `build_with`).
    Scanbench(ScanbenchCommand),
}
134
135impl SubCommand {
136 async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
137 match self {
138 SubCommand::Start(cmd) => {
139 info!("Building datanode with {:#?}", cmd);
140 builder.build().await
141 }
142 SubCommand::Objbench(cmd) => {
143 cmd.run().await?;
144 std::process::exit(0);
145 }
146 SubCommand::Scanbench(cmd) => {
147 cmd.run().await?;
148 std::process::exit(0);
149 }
150 }
151 }
152}
153
/// Storage settings: a data home directory plus an object store backend.
/// Missing keys fall back to `Default` values (`#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfig {
    /// Data home directory.
    pub data_home: String,
    /// Object store backend. It is flattened, so its keys appear directly in
    /// this table rather than under a nested key.
    #[serde(flatten)]
    pub store: object_store::config::ObjectStoreConfig,
}
163
/// Partial view of a datanode config file holding only the `storage`,
/// `region_engine` and `wal` sections; any missing section falls back to its default.
/// NOTE(review): its consumer is not in this file's visible part (presumably
/// the benchmark tools) — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
struct StorageConfigWrapper {
    storage: StorageConfig,
    region_engine: Vec<RegionEngineConfig>,
    // A `[wal]` table without `provider` is treated as raft-engine (see `deserialize_wal_config`).
    #[serde(default, deserialize_with = "deserialize_wal_config")]
    wal: DatanodeWalConfig,
}
172
173fn deserialize_wal_config<'de, D>(
178 deserializer: D,
179) -> std::result::Result<DatanodeWalConfig, D::Error>
180where
181 D: serde::Deserializer<'de>,
182{
183 use serde::de::Error as _;
184
185 let mut table = <toml::value::Table as serde::Deserialize>::deserialize(deserializer)?;
186 if !table.contains_key("provider") {
187 table.insert(
188 "provider".to_string(),
189 toml::Value::String("raft_engine".to_string()),
190 );
191 }
192 DatanodeWalConfig::deserialize(toml::Value::Table(table)).map_err(D::Error::custom)
193}
194
// Arguments of `datanode start`. Every field except `env_prefix` is optional.
// A value given here overrides the one loaded from the config file or the environment.
// NOTE: `//` rather than `///` on purpose: clap derive turns doc comments into help text.
// NOTE: `Default::default()` leaves `env_prefix` empty. The clap `default_value`
// applies only when arguments are parsed from the command line.
#[derive(Debug, Parser, Default)]
pub struct StartCommand {
    // Node id; required, either here or in the config file.
    #[clap(long)]
    node_id: Option<u64>,
    // gRPC bind address (legacy flag name `--rpc-addr`).
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    // gRPC server address (legacy flag name `--rpc-hostname`).
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    // Comma-separated metasrv addresses.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // Path to the TOML config file.
    #[clap(short, long)]
    config_file: Option<String>,
    // Overrides `storage.data_home`.
    #[clap(long)]
    data_home: Option<String>,
    // Overrides the raft-engine WAL directory.
    #[clap(long)]
    wal_dir: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // HTTP timeout in seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Prefix of environment variables read as a config layer.
    #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
    env_prefix: String,
}
222
223impl StartCommand {
224 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
225 let mut opts = DatanodeOptions::load_layered_options(
226 self.config_file.as_deref(),
227 self.env_prefix.as_ref(),
228 )
229 .context(LoadLayeredConfigSnafu)?;
230
231 self.merge_with_cli_options(global_options, &mut opts)?;
232 opts.component.sanitize();
233
234 Ok(opts)
235 }
236
237 #[allow(deprecated)]
239 fn merge_with_cli_options(
240 &self,
241 global_options: &GlobalOptions,
242 opts: &mut DatanodeOptions,
243 ) -> Result<()> {
244 let opts = &mut opts.component;
245
246 if let Some(dir) = &global_options.log_dir {
247 opts.logging.dir.clone_from(dir);
248 }
249
250 if global_options.log_level.is_some() {
251 opts.logging.level.clone_from(&global_options.log_level);
252 }
253
254 opts.tracing = TracingOptions {
255 #[cfg(feature = "tokio-console")]
256 tokio_console_addr: global_options.tokio_console_addr.clone(),
257 };
258
259 if let Some(addr) = &self.rpc_bind_addr {
260 opts.grpc.bind_addr.clone_from(addr);
261 } else if let Some(addr) = &opts.rpc_addr {
262 warn!(
263 "Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead."
264 );
265 opts.grpc.bind_addr.clone_from(addr);
266 }
267
268 if let Some(server_addr) = &self.rpc_server_addr {
269 opts.grpc.server_addr.clone_from(server_addr);
270 } else if let Some(server_addr) = &opts.rpc_hostname {
271 warn!(
272 "Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead."
273 );
274 opts.grpc.server_addr.clone_from(server_addr);
275 }
276
277 if let Some(runtime_size) = opts.rpc_runtime_size {
278 warn!(
279 "Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead."
280 );
281 opts.grpc.runtime_size = runtime_size;
282 }
283
284 if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
285 warn!(
286 "Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead."
287 );
288 opts.grpc.max_recv_message_size = max_recv_message_size;
289 }
290
291 if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
292 warn!(
293 "Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead."
294 );
295 opts.grpc.max_send_message_size = max_send_message_size;
296 }
297
298 if let Some(node_id) = self.node_id {
299 opts.node_id = Some(node_id);
300 }
301
302 if let Some(metasrv_addrs) = &self.metasrv_addrs {
303 opts.meta_client
304 .get_or_insert_with(MetaClientOptions::default)
305 .metasrv_addrs
306 .clone_from(metasrv_addrs);
307 }
308
309 ensure!(
310 opts.node_id.is_some(),
311 MissingConfigSnafu {
312 msg: "Missing node id option"
313 }
314 );
315
316 if let Some(data_home) = &self.data_home {
317 opts.storage.data_home.clone_from(data_home);
318 }
319
320 if let Some(wal_dir) = &self.wal_dir
322 && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
323 {
324 if raft_engine_config
325 .dir
326 .as_ref()
327 .is_some_and(|original_dir| original_dir != wal_dir)
328 {
329 info!("The wal dir of raft-engine is altered to {wal_dir}");
330 }
331 raft_engine_config.dir.replace(wal_dir.clone());
332 }
333
334 if opts.logging.dir.is_empty() {
336 opts.logging.dir = Path::new(&opts.storage.data_home)
337 .join(DEFAULT_LOGGING_DIR)
338 .to_string_lossy()
339 .to_string();
340 }
341
342 if let Some(http_addr) = &self.http_addr {
343 opts.http.addr.clone_from(http_addr);
344 }
345
346 if let Some(http_timeout) = self.http_timeout {
347 opts.http.timeout = Duration::from_secs(http_timeout)
348 }
349
350 opts.http.disable_dashboard = true;
352
353 Ok(())
354 }
355}
356
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::io::Write;
    use std::time::Duration;

    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};

    use super::*;
    use crate::options::GlobalOptions;

    /// The deprecated top-level `rpc_addr` / `rpc_hostname` keys override the
    /// values given in the `[grpc]` section.
    #[test]
    fn test_deprecated_cli_options() {
        common_telemetry::init_default_ut_logging();
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            rpc_addr = "127.0.0.1:4001"
            rpc_hostname = "192.168.0.1"
            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:4001".to_string(), options.grpc.bind_addr);
        assert_eq!("192.168.0.1".to_string(), options.grpc.server_addr);
    }

    /// Every section of a full config file is read: grpc (via the `addr` key),
    /// meta client, raft-engine WAL, storage with extra providers, and logging.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            [grpc]
            addr = "127.0.0.1:3001"
            hostname = "127.0.0.1"
            runtime_size = 8

            [meta_client]
            metasrv_addrs = ["127.0.0.1:3002"]
            timeout = "3s"
            connect_timeout = "5s"
            ddl_timeout = "10s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            dir = "/other/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            bucket = "foo"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;

        // `[grpc] addr` ends up in `grpc.bind_addr`.
        assert_eq!("127.0.0.1:3001".to_string(), options.grpc.bind_addr);
        assert_eq!(Some(42), options.node_id);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
            unreachable!()
        };
        assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
        assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
        assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
        assert_eq!(
            1024 * 1024 * 1024 * 50,
            raft_engine_config.purge_threshold.0
        );
        assert!(!raft_engine_config.sync_write);

        let MetaClientOptions {
            metasrv_addrs: metasrv_addr,
            timeout,
            connect_timeout,
            ddl_timeout,
            tcp_nodelay,
            ..
        } = options.meta_client.unwrap();

        assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
        assert_eq!(5000, connect_timeout.as_millis());
        assert_eq!(10000, ddl_timeout.as_millis());
        assert_eq!(3000, timeout.as_millis());
        assert!(tcp_nodelay);
        assert_eq!("./greptimedb_data/", options.storage.data_home);
        assert!(matches!(
            &options.storage.store,
            ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(options.storage.providers.len(), 2);
        assert!(matches!(
            options.storage.providers[0],
            ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        assert!(matches!(
            options.storage.providers[1],
            ObjectStoreConfig::S3(S3Config { .. })
        ));

        assert_eq!("debug", options.logging.level.unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
    }

    /// Loading options without a node id fails; with a node id it succeeds.
    #[test]
    fn test_try_from_cmd() {
        assert!(
            (StartCommand {
                metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_err()
        );

        assert!(
            (StartCommand {
                node_id: Some(42),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_ok()
        );
    }

    /// The global `log_dir` / `log_level` options are applied, once the missing node id is supplied.
    #[test]
    fn test_load_log_options_from_cli() {
        let mut cmd = StartCommand::default();

        let result = cmd.load_options(&GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        });
        // No node id anywhere yet, so loading must fail.
        assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));

        cmd.node_id = Some(42);

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    /// Checks how the config file, environment variables and CLI arguments combine.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42
            rpc_addr = "127.0.0.1:3001"
            rpc_runtime_size = 8
            rpc_hostname = "10.103.174.219"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "5m"
            sync_write = false

            [storage]
            type = "File"
            data_home = "./greptimedb_data/"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "DATANODE_UT";
        temp_env::with_vars(
            [
                (
                    // wal.purge_interval = 1m (also set to 5m in the config file)
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "purge_interval".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("1m"),
                ),
                (
                    // wal.read_batch_size = 100 (absent from the config file)
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "read_batch_size".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("100"),
                ),
                (
                    // meta_client.metasrv_addrs given as a comma-separated list
                    [
                        env_prefix.to_string(),
                        "meta_client".to_uppercase(),
                        "metasrv_addrs".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    wal_dir: Some("/other/wal/dir".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // Values only present in environment variables are picked up.
                let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
                    unreachable!()
                };
                assert_eq!(raft_engine_config.read_batch_size, 100);
                assert_eq!(
                    opts.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                // The config file value (5m) wins over the environment variable (1m).
                assert_eq!(
                    raft_engine_config.purge_interval,
                    Duration::from_secs(60 * 5)
                );

                // The CLI `--wal-dir` is applied on top of everything else.
                assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");

                // Not set anywhere, so the default is kept.
                assert_eq!(
                    opts.http.addr,
                    DatanodeOptions::default().component.http.addr
                );
                // Migrated from the deprecated top-level `rpc_hostname`.
                assert_eq!(opts.grpc.server_addr, "10.103.174.219");
            },
        );
    }
}