1pub mod builder;
16#[allow(clippy::print_stdout)]
17mod objbench;
18
19use std::path::Path;
20use std::time::Duration;
21
22use async_trait::async_trait;
23use clap::Parser;
24use common_config::Configurable;
25use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
26use common_telemetry::{info, warn};
27use common_wal::config::DatanodeWalConfig;
28use datanode::config::RegionEngineConfig;
29use datanode::datanode::Datanode;
30use meta_client::MetaClientOptions;
31use serde::{Deserialize, Serialize};
32use snafu::{ResultExt, ensure};
33use tracing_appender::non_blocking::WorkerGuard;
34
35use crate::App;
36use crate::datanode::builder::InstanceBuilder;
37use crate::datanode::objbench::ObjbenchCommand;
38use crate::error::{
39 LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
40};
41use crate::options::{GlobalOptions, GreptimeOptions};
42
/// Application name of the datanode, returned by `App::name` for [`Instance`].
pub const APP_NAME: &str = "greptime-datanode";

/// Datanode options wrapped in the shared `GreptimeOptions` envelope, which carries the
/// `runtime` and `plugins` sections next to the datanode `component` options.
type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
46
/// A runnable datanode application: the datanode itself plus the tracing guards that must
/// stay alive as long as it runs.
pub struct Instance {
    /// The wrapped datanode server.
    datanode: Datanode,

    // Guards for `tracing_appender` non-blocking writers. They are never read; they are held
    // only so they live as long as the instance. Fields drop in declaration order, so these
    // are dropped after `datanode`.
    _guard: Vec<WorkerGuard>,
}
53
54impl Instance {
55 pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
56 Self {
57 datanode,
58 _guard: guard,
59 }
60 }
61
62 pub fn datanode(&self) -> &Datanode {
63 &self.datanode
64 }
65
66 pub fn datanode_mut(&mut self) -> &mut Datanode {
68 &mut self.datanode
69 }
70}
71
72#[async_trait]
73impl App for Instance {
74 fn name(&self) -> &str {
75 APP_NAME
76 }
77
78 async fn start(&mut self) -> Result<()> {
79 plugins::start_datanode_plugins(self.datanode.plugins())
80 .await
81 .context(StartDatanodeSnafu)?;
82
83 self.datanode.start().await.context(StartDatanodeSnafu)
84 }
85
86 async fn stop(&mut self) -> Result<()> {
87 self.datanode
88 .shutdown()
89 .await
90 .context(ShutdownDatanodeSnafu)
91 }
92}
93
// Top-level `datanode` command. It only dispatches to one of its subcommands.
// (Plain `//` comments are used here on purpose: `///` doc comments on clap-derived items
// become `--help` text.)
#[derive(Parser)]
pub struct Command {
    // The selected subcommand (`start` or `objbench`).
    #[clap(subcommand)]
    pub subcmd: SubCommand,
}
99
100impl Command {
101 pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
102 self.subcmd.build_with(builder).await
103 }
104
105 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
106 match &self.subcmd {
107 SubCommand::Start(cmd) => cmd.load_options(global_options),
108 SubCommand::Objbench(_) => {
109 let mut opts = datanode::config::DatanodeOptions::default();
112 opts.sanitize();
113 Ok(DatanodeOptions {
114 runtime: Default::default(),
115 plugins: Default::default(),
116 component: opts,
117 })
118 }
119 }
120 }
121}
122
// Subcommands of `datanode`. Plain `//` comments are used so clap help text is unchanged.
#[derive(Parser)]
pub enum SubCommand {
    // Loads options and builds a long-running datanode instance.
    Start(StartCommand),
    // Runs `ObjbenchCommand::run` and then exits the process (see `SubCommand::build_with`).
    // The name suggests an object-store benchmark; confirm against the `objbench` module.
    Objbench(ObjbenchCommand),
}
129
130impl SubCommand {
131 async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
132 match self {
133 SubCommand::Start(cmd) => {
134 info!("Building datanode with {:#?}", cmd);
135 builder.build().await
136 }
137 SubCommand::Objbench(cmd) => {
138 cmd.run().await?;
139 std::process::exit(0);
140 }
141 }
142 }
143}
144
/// Storage settings: a data home directory plus object-store configuration.
///
/// `#[serde(default)]` makes every missing field fall back to its `Default` value.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
pub struct StorageConfig {
    /// Data home directory (presumably the root data directory; confirm with callers).
    pub data_home: String,
    /// Object-store backend settings. They are flattened, so their keys sit at the same
    /// level as `data_home` in the serialized form.
    #[serde(flatten)]
    pub store: object_store::config::ObjectStoreConfig,
}
154
/// Serde helper holding a `storage` section and a list of `region_engine` entries; missing
/// sections default to empty values.
// NOTE(review): not referenced in the visible part of this file; confirm it is still used.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(default)]
struct StorageConfigWrapper {
    storage: StorageConfig,
    region_engine: Vec<RegionEngineConfig>,
}
161
// CLI arguments of the `start` subcommand. Apart from `env_prefix`, every argument is
// optional; any value that is given overrides the value loaded from the config file and
// environment (see `StartCommand::merge_with_cli_options`).
//
// NOTE: `Default` is derived, so `StartCommand::default()` has an empty `env_prefix`. That
// differs from the clap default "GREPTIMEDB_DATANODE" used when parsing real arguments.
// Plain `//` comments are used so clap help text is unchanged.
#[derive(Debug, Parser, Default)]
pub struct StartCommand {
    // Datanode id. `load_options` fails if no source provides one.
    #[clap(long)]
    node_id: Option<u64>,
    // Overrides `grpc.bind_addr` (also accepted as `--rpc-addr`).
    #[clap(long, alias = "rpc-addr")]
    rpc_bind_addr: Option<String>,
    // Overrides `grpc.server_addr` (also accepted as `--rpc-hostname`).
    #[clap(long, alias = "rpc-hostname")]
    rpc_server_addr: Option<String>,
    // Metasrv addresses, comma-separated; replaces `meta_client.metasrv_addrs`.
    #[clap(long, value_delimiter = ',', num_args = 1..)]
    metasrv_addrs: Option<Vec<String>>,
    // Path to the config file (the tests use TOML).
    #[clap(short, long)]
    config_file: Option<String>,
    // Overrides `storage.data_home`; also the base of the default log directory.
    #[clap(long)]
    data_home: Option<String>,
    // Overrides the WAL dir, applied only when the WAL provider is raft-engine.
    #[clap(long)]
    wal_dir: Option<String>,
    // Overrides `http.addr`.
    #[clap(long)]
    http_addr: Option<String>,
    // Overrides `http.timeout`, in seconds.
    #[clap(long)]
    http_timeout: Option<u64>,
    // Prefix of the environment variables read as a configuration layer.
    #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
    env_prefix: String,
}
189
190impl StartCommand {
191 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
192 let mut opts = DatanodeOptions::load_layered_options(
193 self.config_file.as_deref(),
194 self.env_prefix.as_ref(),
195 )
196 .context(LoadLayeredConfigSnafu)?;
197
198 self.merge_with_cli_options(global_options, &mut opts)?;
199 opts.component.sanitize();
200
201 Ok(opts)
202 }
203
204 #[allow(deprecated)]
206 fn merge_with_cli_options(
207 &self,
208 global_options: &GlobalOptions,
209 opts: &mut DatanodeOptions,
210 ) -> Result<()> {
211 let opts = &mut opts.component;
212
213 if let Some(dir) = &global_options.log_dir {
214 opts.logging.dir.clone_from(dir);
215 }
216
217 if global_options.log_level.is_some() {
218 opts.logging.level.clone_from(&global_options.log_level);
219 }
220
221 opts.tracing = TracingOptions {
222 #[cfg(feature = "tokio-console")]
223 tokio_console_addr: global_options.tokio_console_addr.clone(),
224 };
225
226 if let Some(addr) = &self.rpc_bind_addr {
227 opts.grpc.bind_addr.clone_from(addr);
228 } else if let Some(addr) = &opts.rpc_addr {
229 warn!(
230 "Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead."
231 );
232 opts.grpc.bind_addr.clone_from(addr);
233 }
234
235 if let Some(server_addr) = &self.rpc_server_addr {
236 opts.grpc.server_addr.clone_from(server_addr);
237 } else if let Some(server_addr) = &opts.rpc_hostname {
238 warn!(
239 "Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead."
240 );
241 opts.grpc.server_addr.clone_from(server_addr);
242 }
243
244 if let Some(runtime_size) = opts.rpc_runtime_size {
245 warn!(
246 "Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead."
247 );
248 opts.grpc.runtime_size = runtime_size;
249 }
250
251 if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
252 warn!(
253 "Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead."
254 );
255 opts.grpc.max_recv_message_size = max_recv_message_size;
256 }
257
258 if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
259 warn!(
260 "Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead."
261 );
262 opts.grpc.max_send_message_size = max_send_message_size;
263 }
264
265 if let Some(node_id) = self.node_id {
266 opts.node_id = Some(node_id);
267 }
268
269 if let Some(metasrv_addrs) = &self.metasrv_addrs {
270 opts.meta_client
271 .get_or_insert_with(MetaClientOptions::default)
272 .metasrv_addrs
273 .clone_from(metasrv_addrs);
274 }
275
276 ensure!(
277 opts.node_id.is_some(),
278 MissingConfigSnafu {
279 msg: "Missing node id option"
280 }
281 );
282
283 if let Some(data_home) = &self.data_home {
284 opts.storage.data_home.clone_from(data_home);
285 }
286
287 if let Some(wal_dir) = &self.wal_dir
289 && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
290 {
291 if raft_engine_config
292 .dir
293 .as_ref()
294 .is_some_and(|original_dir| original_dir != wal_dir)
295 {
296 info!("The wal dir of raft-engine is altered to {wal_dir}");
297 }
298 raft_engine_config.dir.replace(wal_dir.clone());
299 }
300
301 if opts.logging.dir.is_empty() {
303 opts.logging.dir = Path::new(&opts.storage.data_home)
304 .join(DEFAULT_LOGGING_DIR)
305 .to_string_lossy()
306 .to_string();
307 }
308
309 if let Some(http_addr) = &self.http_addr {
310 opts.http.addr.clone_from(http_addr);
311 }
312
313 if let Some(http_timeout) = self.http_timeout {
314 opts.http.timeout = Duration::from_secs(http_timeout)
315 }
316
317 opts.http.disable_dashboard = true;
319
320 Ok(())
321 }
322}
323
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::io::Write;
    use std::time::Duration;

    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
    use servers::heartbeat_options::HeartbeatOptions;

    use super::*;
    use crate::options::GlobalOptions;

    /// With no CLI overrides, the deprecated top-level `rpc_addr` / `rpc_hostname` from the
    /// config file take precedence over `grpc.bind_addr` / `grpc.server_addr`.
    #[test]
    fn test_deprecated_cli_options() {
        common_telemetry::init_default_ut_logging();
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            rpc_addr = "127.0.0.1:4001"
            rpc_hostname = "192.168.0.1"
            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;
        assert_eq!("127.0.0.1:4001".to_string(), options.grpc.bind_addr);
        assert_eq!("192.168.0.1".to_string(), options.grpc.server_addr);
    }

    /// Loads a full config file and checks that each section (grpc, wal, heartbeat,
    /// meta_client, storage with extra providers, logging) is parsed as written.
    #[test]
    fn test_read_from_config_file() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42

            [grpc]
            addr = "127.0.0.1:3001"
            hostname = "127.0.0.1"
            runtime_size = 8

            [heartbeat]
            interval = "300ms"

            [meta_client]
            metasrv_addrs = ["127.0.0.1:3002"]
            timeout = "3s"
            connect_timeout = "5s"
            ddl_timeout = "10s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            dir = "/other/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            bucket = "foo"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let cmd = StartCommand {
            config_file: Some(file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };

        let options = cmd.load_options(&Default::default()).unwrap().component;

        // `[grpc] addr` in the file ends up in `grpc.bind_addr`.
        assert_eq!("127.0.0.1:3001".to_string(), options.grpc.bind_addr);
        assert_eq!(Some(42), options.node_id);

        let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
            unreachable!()
        };
        assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
        assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
        assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
        assert_eq!(
            1024 * 1024 * 1024 * 50,
            raft_engine_config.purge_threshold.0
        );
        assert!(!raft_engine_config.sync_write);

        let HeartbeatOptions {
            interval: heart_beat_interval,
            ..
        } = options.heartbeat;

        assert_eq!(300, heart_beat_interval.as_millis());

        let MetaClientOptions {
            metasrv_addrs: metasrv_addr,
            timeout,
            connect_timeout,
            ddl_timeout,
            tcp_nodelay,
            ..
        } = options.meta_client.unwrap();

        assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
        assert_eq!(5000, connect_timeout.as_millis());
        assert_eq!(10000, ddl_timeout.as_millis());
        assert_eq!(3000, timeout.as_millis());
        assert!(tcp_nodelay);
        assert_eq!("./greptimedb_data/", options.storage.data_home);
        assert!(matches!(
            &options.storage.store,
            ObjectStoreConfig::File(FileConfig { .. })
        ));
        // The extra providers keep the order in which they appear in the file.
        assert_eq!(options.storage.providers.len(), 2);
        assert!(matches!(
            options.storage.providers[0],
            ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        assert!(matches!(
            options.storage.providers[1],
            ObjectStoreConfig::S3(S3Config { .. })
        ));

        assert_eq!("debug", options.logging.level.unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            options.logging.dir
        );
    }

    /// A node id is mandatory: options without one fail to load, and a node id alone is
    /// enough to succeed.
    #[test]
    fn test_try_from_cmd() {
        assert!(
            (StartCommand {
                metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_err()
        );

        assert!(
            // Only the node id is set here.
            (StartCommand {
                node_id: Some(42),
                ..Default::default()
            })
            .load_options(&GlobalOptions::default())
            .is_ok()
        );
    }

    /// Global `log_dir` / `log_level` CLI options are applied once a node id is present.
    #[test]
    fn test_load_log_options_from_cli() {
        let mut cmd = StartCommand::default();

        let result = cmd.load_options(&GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        });
        // Without a node id, loading fails even though logging options are valid.
        assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));

        cmd.node_id = Some(42);

        let options = cmd
            .load_options(&GlobalOptions {
                log_dir: Some("./greptimedb_data/test/logs".to_string()),
                log_level: Some("debug".to_string()),

                #[cfg(feature = "tokio-console")]
                tokio_console_addr: None,
            })
            .unwrap()
            .component;

        let logging_opt = options.logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    /// Checks how config file, environment variables (prefix `DATANODE_UT`) and CLI options
    /// combine, as observed through the assertions below.
    #[test]
    fn test_config_precedence_order() {
        let mut file = create_named_temp_file();
        let toml_str = r#"
            enable_memory_catalog = false
            node_id = 42
            rpc_addr = "127.0.0.1:3001"
            rpc_runtime_size = 8
            rpc_hostname = "10.103.174.219"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "5m"
            sync_write = false

            [storage]
            type = "File"
            data_home = "./greptimedb_data/"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(file, "{}", toml_str).unwrap();

        let env_prefix = "DATANODE_UT";
        temp_env::with_vars(
            [
                (
                    // Env var for `wal.purge_interval`; the file also sets this key.
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "purge_interval".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("1m"),
                ),
                (
                    // Env var for `wal.read_batch_size`; the file does not set this key.
                    [
                        env_prefix.to_string(),
                        "wal".to_uppercase(),
                        "read_batch_size".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("100"),
                ),
                (
                    // Env var for `meta_client.metasrv_addrs`, given as a comma-separated list.
                    [
                        env_prefix.to_string(),
                        "meta_client".to_uppercase(),
                        "metasrv_addrs".to_uppercase(),
                    ]
                    .join(ENV_VAR_SEP),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let command = StartCommand {
                    config_file: Some(file.path().to_str().unwrap().to_string()),
                    wal_dir: Some("/other/wal/dir".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let opts = command.load_options(&Default::default()).unwrap().component;

                // The env-only keys below are taken from the environment.
                let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
                    unreachable!()
                };
                assert_eq!(raft_engine_config.read_batch_size, 100);
                assert_eq!(
                    opts.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                assert_eq!(
                    // The env var says 1m, the file says 5m: the file value is expected.
                    raft_engine_config.purge_interval,
                    Duration::from_secs(60 * 5)
                );

                // Set only via the CLI `wal_dir` argument.
                assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");

                assert_eq!(
                    // Not set by any layer, so the default is kept.
                    opts.http.addr,
                    DatanodeOptions::default().component.http.addr
                );
                // Migrated from the deprecated `rpc_hostname` in the file.
                assert_eq!(opts.grpc.server_addr, "10.103.174.219");
            },
        );
    }
}