1pub mod builder;
16
17use std::path::Path;
18use std::time::Duration;
19
20use async_trait::async_trait;
21use clap::Parser;
22use common_config::Configurable;
23use common_telemetry::logging::{DEFAULT_LOGGING_DIR, TracingOptions};
24use common_telemetry::{info, warn};
25use common_wal::config::DatanodeWalConfig;
26use datanode::datanode::Datanode;
27use meta_client::MetaClientOptions;
28use snafu::{ResultExt, ensure};
29use tracing_appender::non_blocking::WorkerGuard;
30
31use crate::App;
32use crate::datanode::builder::InstanceBuilder;
33use crate::error::{
34 LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
35};
36use crate::options::{GlobalOptions, GreptimeOptions};
37
/// Application name reported by the [`App::name`] implementation for [`Instance`].
pub const APP_NAME: &str = "greptime-datanode";

/// Options type used throughout this module: [`GreptimeOptions`] whose `component` is the
/// datanode's own configuration struct.
type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
41
42pub struct Instance {
43 datanode: Datanode,
44
45 _guard: Vec<WorkerGuard>,
47}
48
49impl Instance {
50 pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
51 Self {
52 datanode,
53 _guard: guard,
54 }
55 }
56
57 pub fn datanode(&self) -> &Datanode {
58 &self.datanode
59 }
60
61 pub fn datanode_mut(&mut self) -> &mut Datanode {
63 &mut self.datanode
64 }
65}
66
67#[async_trait]
68impl App for Instance {
69 fn name(&self) -> &str {
70 APP_NAME
71 }
72
73 async fn start(&mut self) -> Result<()> {
74 plugins::start_datanode_plugins(self.datanode.plugins())
75 .await
76 .context(StartDatanodeSnafu)?;
77
78 self.datanode.start().await.context(StartDatanodeSnafu)
79 }
80
81 async fn stop(&mut self) -> Result<()> {
82 self.datanode
83 .shutdown()
84 .await
85 .context(ShutdownDatanodeSnafu)
86 }
87}
88
89#[derive(Parser)]
90pub struct Command {
91 #[clap(subcommand)]
92 subcmd: SubCommand,
93}
94
95impl Command {
96 pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
97 self.subcmd.build_with(builder).await
98 }
99
100 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
101 match &self.subcmd {
102 SubCommand::Start(cmd) => cmd.load_options(global_options),
103 }
104 }
105}
106
107#[derive(Parser)]
108enum SubCommand {
109 Start(StartCommand),
110}
111
112impl SubCommand {
113 async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
114 match self {
115 SubCommand::Start(cmd) => {
116 info!("Building datanode with {:#?}", cmd);
117 builder.build().await
118 }
119 }
120 }
121}
122
123#[derive(Debug, Parser, Default)]
124struct StartCommand {
125 #[clap(long)]
126 node_id: Option<u64>,
127 #[clap(long, alias = "rpc-addr")]
129 rpc_bind_addr: Option<String>,
130 #[clap(long, alias = "rpc-hostname")]
134 rpc_server_addr: Option<String>,
135 #[clap(long, value_delimiter = ',', num_args = 1..)]
136 metasrv_addrs: Option<Vec<String>>,
137 #[clap(short, long)]
138 config_file: Option<String>,
139 #[clap(long)]
140 data_home: Option<String>,
141 #[clap(long)]
142 wal_dir: Option<String>,
143 #[clap(long)]
144 http_addr: Option<String>,
145 #[clap(long)]
146 http_timeout: Option<u64>,
147 #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
148 env_prefix: String,
149}
150
151impl StartCommand {
152 fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
153 let mut opts = DatanodeOptions::load_layered_options(
154 self.config_file.as_deref(),
155 self.env_prefix.as_ref(),
156 )
157 .context(LoadLayeredConfigSnafu)?;
158
159 self.merge_with_cli_options(global_options, &mut opts)?;
160 opts.component.sanitize();
161
162 Ok(opts)
163 }
164
165 #[allow(deprecated)]
167 fn merge_with_cli_options(
168 &self,
169 global_options: &GlobalOptions,
170 opts: &mut DatanodeOptions,
171 ) -> Result<()> {
172 let opts = &mut opts.component;
173
174 if let Some(dir) = &global_options.log_dir {
175 opts.logging.dir.clone_from(dir);
176 }
177
178 if global_options.log_level.is_some() {
179 opts.logging.level.clone_from(&global_options.log_level);
180 }
181
182 opts.tracing = TracingOptions {
183 #[cfg(feature = "tokio-console")]
184 tokio_console_addr: global_options.tokio_console_addr.clone(),
185 };
186
187 if let Some(addr) = &self.rpc_bind_addr {
188 opts.grpc.bind_addr.clone_from(addr);
189 } else if let Some(addr) = &opts.rpc_addr {
190 warn!(
191 "Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead."
192 );
193 opts.grpc.bind_addr.clone_from(addr);
194 }
195
196 if let Some(server_addr) = &self.rpc_server_addr {
197 opts.grpc.server_addr.clone_from(server_addr);
198 } else if let Some(server_addr) = &opts.rpc_hostname {
199 warn!(
200 "Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead."
201 );
202 opts.grpc.server_addr.clone_from(server_addr);
203 }
204
205 if let Some(runtime_size) = opts.rpc_runtime_size {
206 warn!(
207 "Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead."
208 );
209 opts.grpc.runtime_size = runtime_size;
210 }
211
212 if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
213 warn!(
214 "Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead."
215 );
216 opts.grpc.max_recv_message_size = max_recv_message_size;
217 }
218
219 if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
220 warn!(
221 "Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead."
222 );
223 opts.grpc.max_send_message_size = max_send_message_size;
224 }
225
226 if let Some(node_id) = self.node_id {
227 opts.node_id = Some(node_id);
228 }
229
230 if let Some(metasrv_addrs) = &self.metasrv_addrs {
231 opts.meta_client
232 .get_or_insert_with(MetaClientOptions::default)
233 .metasrv_addrs
234 .clone_from(metasrv_addrs);
235 }
236
237 ensure!(
238 opts.node_id.is_some(),
239 MissingConfigSnafu {
240 msg: "Missing node id option"
241 }
242 );
243
244 if let Some(data_home) = &self.data_home {
245 opts.storage.data_home.clone_from(data_home);
246 }
247
248 if let Some(wal_dir) = &self.wal_dir
250 && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
251 {
252 if raft_engine_config
253 .dir
254 .as_ref()
255 .is_some_and(|original_dir| original_dir != wal_dir)
256 {
257 info!("The wal dir of raft-engine is altered to {wal_dir}");
258 }
259 raft_engine_config.dir.replace(wal_dir.clone());
260 }
261
262 if opts.logging.dir.is_empty() {
264 opts.logging.dir = Path::new(&opts.storage.data_home)
265 .join(DEFAULT_LOGGING_DIR)
266 .to_string_lossy()
267 .to_string();
268 }
269
270 if let Some(http_addr) = &self.http_addr {
271 opts.http.addr.clone_from(http_addr);
272 }
273
274 if let Some(http_timeout) = self.http_timeout {
275 opts.http.timeout = Duration::from_secs(http_timeout)
276 }
277
278 opts.http.disable_dashboard = true;
280
281 Ok(())
282 }
283}
284
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::io::Write;
    use std::time::Duration;

    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use object_store::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
    use servers::heartbeat_options::HeartbeatOptions;

    use super::*;
    use crate::options::GlobalOptions;

    /// Global options that set the log directory and log level used by the logging test.
    fn debug_log_options() -> GlobalOptions {
        GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        }
    }

    /// Joins `prefix`, the upper-cased `section` and the upper-cased `key` with `ENV_VAR_SEP`.
    fn env_key(prefix: &str, section: &str, key: &str) -> String {
        [
            prefix.to_string(),
            section.to_uppercase(),
            key.to_uppercase(),
        ]
        .join(ENV_VAR_SEP)
    }

    /// The deprecated top-level `rpc_addr` / `rpc_hostname` win over the `[grpc]` section.
    #[test]
    fn test_deprecated_cli_options() {
        common_telemetry::init_default_ut_logging();

        let config = r#"
            enable_memory_catalog = false
            node_id = 42

            rpc_addr = "127.0.0.1:4001"
            rpc_hostname = "192.168.0.1"
            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8
        "#;
        let mut config_file = create_named_temp_file();
        config_file.write_all(config.as_bytes()).unwrap();

        let start = StartCommand {
            config_file: Some(config_file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };
        let loaded = start.load_options(&Default::default()).unwrap().component;

        assert_eq!("127.0.0.1:4001".to_string(), loaded.grpc.bind_addr);
        assert_eq!("192.168.0.1".to_string(), loaded.grpc.server_addr);
    }

    /// Values written in the config file show up in the loaded options.
    #[test]
    fn test_read_from_config_file() {
        let config = r#"
            enable_memory_catalog = false
            node_id = 42

            [grpc]
            addr = "127.0.0.1:3001"
            hostname = "127.0.0.1"
            runtime_size = 8

            [heartbeat]
            interval = "300ms"

            [meta_client]
            metasrv_addrs = ["127.0.0.1:3002"]
            timeout = "3s"
            connect_timeout = "5s"
            ddl_timeout = "10s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            dir = "/other/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            bucket = "foo"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        let mut config_file = create_named_temp_file();
        config_file.write_all(config.as_bytes()).unwrap();

        let start = StartCommand {
            config_file: Some(config_file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };
        let loaded = start.load_options(&Default::default()).unwrap().component;

        assert_eq!("127.0.0.1:3001".to_string(), loaded.grpc.bind_addr);
        assert_eq!(Some(42), loaded.node_id);

        // WAL section.
        let DatanodeWalConfig::RaftEngine(wal) = loaded.wal else {
            unreachable!()
        };
        assert_eq!("/other/wal", wal.dir.unwrap());
        assert_eq!(Duration::from_secs(600), wal.purge_interval);
        assert_eq!(1024 * 1024 * 1024, wal.file_size.0);
        assert_eq!(1024 * 1024 * 1024 * 50, wal.purge_threshold.0);
        assert!(!wal.sync_write);

        // Heartbeat section.
        let HeartbeatOptions {
            interval: heartbeat_every,
            ..
        } = loaded.heartbeat;
        assert_eq!(300, heartbeat_every.as_millis());

        // Meta client section.
        let meta_client = loaded.meta_client.unwrap();
        assert_eq!(
            vec!["127.0.0.1:3002".to_string()],
            meta_client.metasrv_addrs
        );
        assert_eq!(5000, meta_client.connect_timeout.as_millis());
        assert_eq!(10000, meta_client.ddl_timeout.as_millis());
        assert_eq!(3000, meta_client.timeout.as_millis());
        assert!(meta_client.tcp_nodelay);

        // Storage section.
        assert_eq!("./greptimedb_data/", loaded.storage.data_home);
        assert!(matches!(
            &loaded.storage.store,
            ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(loaded.storage.providers.len(), 2);
        assert!(matches!(
            loaded.storage.providers[0],
            ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        assert!(matches!(
            loaded.storage.providers[1],
            ObjectStoreConfig::S3(S3Config { .. })
        ));

        // Logging section.
        assert_eq!("debug", loaded.logging.level.unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            loaded.logging.dir
        );
    }

    /// Loading fails without a node id and succeeds once one is given.
    #[test]
    fn test_try_from_cmd() {
        let without_node_id = StartCommand {
            metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
            ..Default::default()
        };
        assert!(
            without_node_id
                .load_options(&GlobalOptions::default())
                .is_err()
        );

        let with_node_id = StartCommand {
            node_id: Some(42),
            ..Default::default()
        };
        assert!(
            with_node_id
                .load_options(&GlobalOptions::default())
                .is_ok()
        );
    }

    /// The global log dir/level end up in the loaded logging options.
    #[test]
    fn test_load_log_options_from_cli() {
        // Without a node id the load is rejected as a missing configuration.
        let without_node_id = StartCommand::default();
        let result = without_node_id.load_options(&debug_log_options());
        assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));

        let with_node_id = StartCommand {
            node_id: Some(42),
            ..Default::default()
        };
        let loaded = with_node_id
            .load_options(&debug_log_options())
            .unwrap()
            .component;

        let logging = loaded.logging;
        assert_eq!("./greptimedb_data/test/logs", logging.dir);
        assert_eq!("debug", logging.level.as_ref().unwrap());
    }

    /// Exercises how CLI flags, the config file, environment variables and defaults combine.
    #[test]
    fn test_config_precedence_order() {
        let config = r#"
            enable_memory_catalog = false
            node_id = 42
            rpc_addr = "127.0.0.1:3001"
            rpc_runtime_size = 8
            rpc_hostname = "10.103.174.219"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "5m"
            sync_write = false

            [storage]
            type = "File"
            data_home = "./greptimedb_data/"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        let mut config_file = create_named_temp_file();
        config_file.write_all(config.as_bytes()).unwrap();

        let env_prefix = "DATANODE_UT";
        let env_vars = [
            // Also set in the config file ("5m").
            (env_key(env_prefix, "wal", "purge_interval"), Some("1m")),
            // Only set through the environment.
            (env_key(env_prefix, "wal", "read_batch_size"), Some("100")),
            // Only set through the environment.
            (
                env_key(env_prefix, "meta_client", "metasrv_addrs"),
                Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
            ),
        ];

        temp_env::with_vars(env_vars, || {
            let start = StartCommand {
                config_file: Some(config_file.path().to_str().unwrap().to_string()),
                wal_dir: Some("/other/wal/dir".to_string()),
                env_prefix: env_prefix.to_string(),
                ..Default::default()
            };
            let loaded = start.load_options(&Default::default()).unwrap().component;

            let DatanodeWalConfig::RaftEngine(wal) = loaded.wal else {
                unreachable!()
            };

            // Present only in the environment: the environment value is used.
            assert_eq!(wal.read_batch_size, 100);
            assert_eq!(
                loaded.meta_client.unwrap().metasrv_addrs,
                vec![
                    "127.0.0.1:3001".to_string(),
                    "127.0.0.1:3002".to_string(),
                    "127.0.0.1:3003".to_string()
                ]
            );

            // Present in both file and environment: the file value ("5m") is kept.
            assert_eq!(wal.purge_interval, Duration::from_secs(60 * 5));

            // Given on the command line: the CLI value is used.
            assert_eq!(wal.dir.unwrap(), "/other/wal/dir");

            // Set nowhere: the default is kept.
            assert_eq!(
                loaded.http.addr,
                DatanodeOptions::default().component.http.addr
            );

            // Taken from the deprecated `rpc_hostname` in the config file.
            assert_eq!(loaded.grpc.server_addr, "10.103.174.219");
        });
    }
}