1pub mod builder;
16
17use std::path::Path;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use clap::Parser;
22use common_config::Configurable;
23use common_telemetry::logging::{TracingOptions, DEFAULT_LOGGING_DIR};
24use common_telemetry::{info, warn};
25use common_wal::config::DatanodeWalConfig;
26use datanode::datanode::Datanode;
27use meta_client::MetaClientOptions;
28use snafu::{ensure, ResultExt};
29use tracing_appender::non_blocking::WorkerGuard;
30
31use crate::datanode::builder::InstanceBuilder;
32use crate::error::{
33 LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
34};
35use crate::options::{GlobalOptions, GreptimeOptions};
36use crate::App;
37
38pub const APP_NAME: &str = "greptime-datanode";
39
40type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
41
42pub struct Instance {
43 datanode: Datanode,
44
45 _guard: Vec<WorkerGuard>,
47}
48
49impl Instance {
50 pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
51 Self {
52 datanode,
53 _guard: guard,
54 }
55 }
56
57 pub fn datanode(&self) -> &Datanode {
58 &self.datanode
59 }
60
61 pub fn datanode_mut(&mut self) -> &mut Datanode {
63 &mut self.datanode
64 }
65}
66
67#[async_trait]
68impl App for Instance {
69 fn name(&self) -> &str {
70 APP_NAME
71 }
72
73 async fn start(&mut self) -> Result<()> {
74 plugins::start_datanode_plugins(self.datanode.plugins())
75 .await
76 .context(StartDatanodeSnafu)?;
77
78 self.datanode.start().await.context(StartDatanodeSnafu)
79 }
80
81 async fn stop(&mut self) -> Result<()> {
82 self.datanode
83 .shutdown()
84 .await
85 .context(ShutdownDatanodeSnafu)
86 }
87}
88
89#[derive(Parser)]
90pub struct Command {
91 #[clap(subcommand)]
92 subcmd: SubCommand,
93}
94
95impl Command {
96 pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
97 self.subcmd.build_with(builder).await
98 }
99
100 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
101 match &self.subcmd {
102 SubCommand::Start(cmd) => cmd.load_options(global_options),
103 }
104 }
105}
106
107#[derive(Parser)]
108enum SubCommand {
109 Start(StartCommand),
110}
111
112impl SubCommand {
113 async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
114 match self {
115 SubCommand::Start(cmd) => {
116 info!("Building datanode with {:#?}", cmd);
117 builder.build().await
118 }
119 }
120 }
121}
122
123#[derive(Debug, Parser, Default)]
124struct StartCommand {
125 #[clap(long)]
126 node_id: Option<u64>,
127 #[clap(long, alias = "rpc-addr")]
129 rpc_bind_addr: Option<String>,
130 #[clap(long, alias = "rpc-hostname")]
134 rpc_server_addr: Option<String>,
135 #[clap(long, value_delimiter = ',', num_args = 1..)]
136 metasrv_addrs: Option<Vec<String>>,
137 #[clap(short, long)]
138 config_file: Option<String>,
139 #[clap(long)]
140 data_home: Option<String>,
141 #[clap(long)]
142 wal_dir: Option<String>,
143 #[clap(long)]
144 http_addr: Option<String>,
145 #[clap(long)]
146 http_timeout: Option<u64>,
147 #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
148 env_prefix: String,
149}
150
151impl StartCommand {
152 fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
153 let mut opts = DatanodeOptions::load_layered_options(
154 self.config_file.as_deref(),
155 self.env_prefix.as_ref(),
156 )
157 .context(LoadLayeredConfigSnafu)?;
158
159 self.merge_with_cli_options(global_options, &mut opts)?;
160 opts.component.sanitize();
161
162 Ok(opts)
163 }
164
165 #[allow(deprecated)]
167 fn merge_with_cli_options(
168 &self,
169 global_options: &GlobalOptions,
170 opts: &mut DatanodeOptions,
171 ) -> Result<()> {
172 let opts = &mut opts.component;
173
174 if let Some(dir) = &global_options.log_dir {
175 opts.logging.dir.clone_from(dir);
176 }
177
178 if global_options.log_level.is_some() {
179 opts.logging.level.clone_from(&global_options.log_level);
180 }
181
182 opts.tracing = TracingOptions {
183 #[cfg(feature = "tokio-console")]
184 tokio_console_addr: global_options.tokio_console_addr.clone(),
185 };
186
187 if let Some(addr) = &self.rpc_bind_addr {
188 opts.grpc.bind_addr.clone_from(addr);
189 } else if let Some(addr) = &opts.rpc_addr {
190 warn!("Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead.");
191 opts.grpc.bind_addr.clone_from(addr);
192 }
193
194 if let Some(server_addr) = &self.rpc_server_addr {
195 opts.grpc.server_addr.clone_from(server_addr);
196 } else if let Some(server_addr) = &opts.rpc_hostname {
197 warn!("Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead.");
198 opts.grpc.server_addr.clone_from(server_addr);
199 }
200
201 if let Some(runtime_size) = opts.rpc_runtime_size {
202 warn!("Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead.");
203 opts.grpc.runtime_size = runtime_size;
204 }
205
206 if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
207 warn!("Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead.");
208 opts.grpc.max_recv_message_size = max_recv_message_size;
209 }
210
211 if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
212 warn!("Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead.");
213 opts.grpc.max_send_message_size = max_send_message_size;
214 }
215
216 if let Some(node_id) = self.node_id {
217 opts.node_id = Some(node_id);
218 }
219
220 if let Some(metasrv_addrs) = &self.metasrv_addrs {
221 opts.meta_client
222 .get_or_insert_with(MetaClientOptions::default)
223 .metasrv_addrs
224 .clone_from(metasrv_addrs);
225 }
226
227 ensure!(
228 opts.node_id.is_some(),
229 MissingConfigSnafu {
230 msg: "Missing node id option"
231 }
232 );
233
234 if let Some(data_home) = &self.data_home {
235 opts.storage.data_home.clone_from(data_home);
236 }
237
238 if let Some(wal_dir) = &self.wal_dir
240 && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
241 {
242 if raft_engine_config
243 .dir
244 .as_ref()
245 .is_some_and(|original_dir| original_dir != wal_dir)
246 {
247 info!("The wal dir of raft-engine is altered to {wal_dir}");
248 }
249 raft_engine_config.dir.replace(wal_dir.clone());
250 }
251
252 if opts.logging.dir.is_empty() {
254 opts.logging.dir = Path::new(&opts.storage.data_home)
255 .join(DEFAULT_LOGGING_DIR)
256 .to_string_lossy()
257 .to_string();
258 }
259
260 if let Some(http_addr) = &self.http_addr {
261 opts.http.addr.clone_from(http_addr);
262 }
263
264 if let Some(http_timeout) = self.http_timeout {
265 opts.http.timeout = Duration::from_secs(http_timeout)
266 }
267
268 opts.http.disable_dashboard = true;
270
271 Ok(())
272 }
273}
274
275#[cfg(test)]
276mod tests {
277 use std::assert_matches::assert_matches;
278 use std::io::Write;
279 use std::time::Duration;
280
281 use common_config::ENV_VAR_SEP;
282 use common_test_util::temp_dir::create_named_temp_file;
283 use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
284 use servers::heartbeat_options::HeartbeatOptions;
285
286 use super::*;
287 use crate::options::GlobalOptions;
288
289 #[test]
290 fn test_deprecated_cli_options() {
291 common_telemetry::init_default_ut_logging();
292 let mut file = create_named_temp_file();
293 let toml_str = r#"
294 enable_memory_catalog = false
295 node_id = 42
296
297 rpc_addr = "127.0.0.1:4001"
298 rpc_hostname = "192.168.0.1"
299 [grpc]
300 bind_addr = "127.0.0.1:3001"
301 server_addr = "127.0.0.1"
302 runtime_size = 8
303 "#;
304 write!(file, "{}", toml_str).unwrap();
305
306 let cmd = StartCommand {
307 config_file: Some(file.path().to_str().unwrap().to_string()),
308 ..Default::default()
309 };
310
311 let options = cmd.load_options(&Default::default()).unwrap().component;
312 assert_eq!("127.0.0.1:4001".to_string(), options.grpc.bind_addr);
313 assert_eq!("192.168.0.1".to_string(), options.grpc.server_addr);
314 }
315
316 #[test]
317 fn test_read_from_config_file() {
318 let mut file = create_named_temp_file();
319 let toml_str = r#"
320 enable_memory_catalog = false
321 node_id = 42
322
323 [grpc]
324 addr = "127.0.0.1:3001"
325 hostname = "127.0.0.1"
326 runtime_size = 8
327
328 [heartbeat]
329 interval = "300ms"
330
331 [meta_client]
332 metasrv_addrs = ["127.0.0.1:3002"]
333 timeout = "3s"
334 connect_timeout = "5s"
335 ddl_timeout = "10s"
336 tcp_nodelay = true
337
338 [wal]
339 provider = "raft_engine"
340 dir = "/other/wal"
341 file_size = "1GB"
342 purge_threshold = "50GB"
343 purge_interval = "10m"
344 read_batch_size = 128
345 sync_write = false
346
347 [storage]
348 data_home = "./greptimedb_data/"
349 type = "File"
350
351 [[storage.providers]]
352 type = "Gcs"
353 bucket = "foo"
354 endpoint = "bar"
355
356 [[storage.providers]]
357 type = "S3"
358 bucket = "foo"
359
360 [logging]
361 level = "debug"
362 dir = "./greptimedb_data/test/logs"
363 "#;
364 write!(file, "{}", toml_str).unwrap();
365
366 let cmd = StartCommand {
367 config_file: Some(file.path().to_str().unwrap().to_string()),
368 ..Default::default()
369 };
370
371 let options = cmd.load_options(&Default::default()).unwrap().component;
372
373 assert_eq!("127.0.0.1:3001".to_string(), options.grpc.bind_addr);
374 assert_eq!(Some(42), options.node_id);
375
376 let DatanodeWalConfig::RaftEngine(raft_engine_config) = options.wal else {
377 unreachable!()
378 };
379 assert_eq!("/other/wal", raft_engine_config.dir.unwrap());
380 assert_eq!(Duration::from_secs(600), raft_engine_config.purge_interval);
381 assert_eq!(1024 * 1024 * 1024, raft_engine_config.file_size.0);
382 assert_eq!(
383 1024 * 1024 * 1024 * 50,
384 raft_engine_config.purge_threshold.0
385 );
386 assert!(!raft_engine_config.sync_write);
387
388 let HeartbeatOptions {
389 interval: heart_beat_interval,
390 ..
391 } = options.heartbeat;
392
393 assert_eq!(300, heart_beat_interval.as_millis());
394
395 let MetaClientOptions {
396 metasrv_addrs: metasrv_addr,
397 timeout,
398 connect_timeout,
399 ddl_timeout,
400 tcp_nodelay,
401 ..
402 } = options.meta_client.unwrap();
403
404 assert_eq!(vec!["127.0.0.1:3002".to_string()], metasrv_addr);
405 assert_eq!(5000, connect_timeout.as_millis());
406 assert_eq!(10000, ddl_timeout.as_millis());
407 assert_eq!(3000, timeout.as_millis());
408 assert!(tcp_nodelay);
409 assert_eq!("./greptimedb_data/", options.storage.data_home);
410 assert!(matches!(
411 &options.storage.store,
412 ObjectStoreConfig::File(FileConfig { .. })
413 ));
414 assert_eq!(options.storage.providers.len(), 2);
415 assert!(matches!(
416 options.storage.providers[0],
417 ObjectStoreConfig::Gcs(GcsConfig { .. })
418 ));
419 assert!(matches!(
420 options.storage.providers[1],
421 ObjectStoreConfig::S3(S3Config { .. })
422 ));
423
424 assert_eq!("debug", options.logging.level.unwrap());
425 assert_eq!(
426 "./greptimedb_data/test/logs".to_string(),
427 options.logging.dir
428 );
429 }
430
431 #[test]
432 fn test_try_from_cmd() {
433 assert!((StartCommand {
434 metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
435 ..Default::default()
436 })
437 .load_options(&GlobalOptions::default())
438 .is_err());
439
440 assert!((StartCommand {
442 node_id: Some(42),
443 ..Default::default()
444 })
445 .load_options(&GlobalOptions::default())
446 .is_ok());
447 }
448
449 #[test]
450 fn test_load_log_options_from_cli() {
451 let mut cmd = StartCommand::default();
452
453 let result = cmd.load_options(&GlobalOptions {
454 log_dir: Some("./greptimedb_data/test/logs".to_string()),
455 log_level: Some("debug".to_string()),
456
457 #[cfg(feature = "tokio-console")]
458 tokio_console_addr: None,
459 });
460 assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));
462
463 cmd.node_id = Some(42);
464
465 let options = cmd
466 .load_options(&GlobalOptions {
467 log_dir: Some("./greptimedb_data/test/logs".to_string()),
468 log_level: Some("debug".to_string()),
469
470 #[cfg(feature = "tokio-console")]
471 tokio_console_addr: None,
472 })
473 .unwrap()
474 .component;
475
476 let logging_opt = options.logging;
477 assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
478 assert_eq!("debug", logging_opt.level.as_ref().unwrap());
479 }
480
481 #[test]
482 fn test_config_precedence_order() {
483 let mut file = create_named_temp_file();
484 let toml_str = r#"
485 enable_memory_catalog = false
486 node_id = 42
487 rpc_addr = "127.0.0.1:3001"
488 rpc_runtime_size = 8
489 rpc_hostname = "10.103.174.219"
490
491 [meta_client]
492 timeout = "3s"
493 connect_timeout = "5s"
494 tcp_nodelay = true
495
496 [wal]
497 provider = "raft_engine"
498 file_size = "1GB"
499 purge_threshold = "50GB"
500 purge_interval = "5m"
501 sync_write = false
502
503 [storage]
504 type = "File"
505 data_home = "./greptimedb_data/"
506
507 [logging]
508 level = "debug"
509 dir = "./greptimedb_data/test/logs"
510 "#;
511 write!(file, "{}", toml_str).unwrap();
512
513 let env_prefix = "DATANODE_UT";
514 temp_env::with_vars(
515 [
516 (
517 [
519 env_prefix.to_string(),
520 "wal".to_uppercase(),
521 "purge_interval".to_uppercase(),
522 ]
523 .join(ENV_VAR_SEP),
524 Some("1m"),
525 ),
526 (
527 [
529 env_prefix.to_string(),
530 "wal".to_uppercase(),
531 "read_batch_size".to_uppercase(),
532 ]
533 .join(ENV_VAR_SEP),
534 Some("100"),
535 ),
536 (
537 [
539 env_prefix.to_string(),
540 "meta_client".to_uppercase(),
541 "metasrv_addrs".to_uppercase(),
542 ]
543 .join(ENV_VAR_SEP),
544 Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
545 ),
546 ],
547 || {
548 let command = StartCommand {
549 config_file: Some(file.path().to_str().unwrap().to_string()),
550 wal_dir: Some("/other/wal/dir".to_string()),
551 env_prefix: env_prefix.to_string(),
552 ..Default::default()
553 };
554
555 let opts = command.load_options(&Default::default()).unwrap().component;
556
557 let DatanodeWalConfig::RaftEngine(raft_engine_config) = opts.wal else {
559 unreachable!()
560 };
561 assert_eq!(raft_engine_config.read_batch_size, 100);
562 assert_eq!(
563 opts.meta_client.unwrap().metasrv_addrs,
564 vec![
565 "127.0.0.1:3001".to_string(),
566 "127.0.0.1:3002".to_string(),
567 "127.0.0.1:3003".to_string()
568 ]
569 );
570
571 assert_eq!(
573 raft_engine_config.purge_interval,
574 Duration::from_secs(60 * 5)
575 );
576
577 assert_eq!(raft_engine_config.dir.unwrap(), "/other/wal/dir");
579
580 assert_eq!(
582 opts.http.addr,
583 DatanodeOptions::default().component.http.addr
584 );
585 assert_eq!(opts.grpc.server_addr, "10.103.174.219");
586 },
587 );
588 }
589}