1pub mod builder;
16
17use std::time::Duration;
18
19use async_trait::async_trait;
20use clap::Parser;
21use common_config::Configurable;
22use common_telemetry::logging::TracingOptions;
23use common_telemetry::{info, warn};
24use common_wal::config::DatanodeWalConfig;
25use datanode::datanode::Datanode;
26use meta_client::MetaClientOptions;
27use snafu::{ensure, ResultExt};
28use tracing_appender::non_blocking::WorkerGuard;
29
30use crate::datanode::builder::InstanceBuilder;
31use crate::error::{
32 LoadLayeredConfigSnafu, MissingConfigSnafu, Result, ShutdownDatanodeSnafu, StartDatanodeSnafu,
33};
34use crate::options::{GlobalOptions, GreptimeOptions};
35use crate::App;
36
/// Application name of the datanode; this is the value reported by `Instance::name`.
pub const APP_NAME: &str = "greptime-datanode";
38
39type DatanodeOptions = GreptimeOptions<datanode::config::DatanodeOptions>;
40
41pub struct Instance {
42 datanode: Datanode,
43
44 _guard: Vec<WorkerGuard>,
46}
47
48impl Instance {
49 pub fn new(datanode: Datanode, guard: Vec<WorkerGuard>) -> Self {
50 Self {
51 datanode,
52 _guard: guard,
53 }
54 }
55
56 pub fn datanode(&self) -> &Datanode {
57 &self.datanode
58 }
59
60 pub fn datanode_mut(&mut self) -> &mut Datanode {
62 &mut self.datanode
63 }
64}
65
66#[async_trait]
67impl App for Instance {
68 fn name(&self) -> &str {
69 APP_NAME
70 }
71
72 async fn start(&mut self) -> Result<()> {
73 plugins::start_datanode_plugins(self.datanode.plugins())
74 .await
75 .context(StartDatanodeSnafu)?;
76
77 self.datanode.start().await.context(StartDatanodeSnafu)
78 }
79
80 async fn stop(&mut self) -> Result<()> {
81 self.datanode
82 .shutdown()
83 .await
84 .context(ShutdownDatanodeSnafu)
85 }
86}
87
88#[derive(Parser)]
89pub struct Command {
90 #[clap(subcommand)]
91 subcmd: SubCommand,
92}
93
94impl Command {
95 pub async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
96 self.subcmd.build_with(builder).await
97 }
98
99 pub fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
100 match &self.subcmd {
101 SubCommand::Start(cmd) => cmd.load_options(global_options),
102 }
103 }
104}
105
106#[derive(Parser)]
107enum SubCommand {
108 Start(StartCommand),
109}
110
111impl SubCommand {
112 async fn build_with(&self, builder: InstanceBuilder) -> Result<Instance> {
113 match self {
114 SubCommand::Start(cmd) => {
115 info!("Building datanode with {:#?}", cmd);
116 builder.build().await
117 }
118 }
119 }
120}
121
122#[derive(Debug, Parser, Default)]
123struct StartCommand {
124 #[clap(long)]
125 node_id: Option<u64>,
126 #[clap(long, alias = "rpc-addr")]
128 rpc_bind_addr: Option<String>,
129 #[clap(long, alias = "rpc-hostname")]
133 rpc_server_addr: Option<String>,
134 #[clap(long, value_delimiter = ',', num_args = 1..)]
135 metasrv_addrs: Option<Vec<String>>,
136 #[clap(short, long)]
137 config_file: Option<String>,
138 #[clap(long)]
139 data_home: Option<String>,
140 #[clap(long)]
141 wal_dir: Option<String>,
142 #[clap(long)]
143 http_addr: Option<String>,
144 #[clap(long)]
145 http_timeout: Option<u64>,
146 #[clap(long, default_value = "GREPTIMEDB_DATANODE")]
147 env_prefix: String,
148}
149
150impl StartCommand {
151 fn load_options(&self, global_options: &GlobalOptions) -> Result<DatanodeOptions> {
152 let mut opts = DatanodeOptions::load_layered_options(
153 self.config_file.as_deref(),
154 self.env_prefix.as_ref(),
155 )
156 .context(LoadLayeredConfigSnafu)?;
157
158 self.merge_with_cli_options(global_options, &mut opts)?;
159 opts.component.sanitize();
160
161 Ok(opts)
162 }
163
164 #[allow(deprecated)]
166 fn merge_with_cli_options(
167 &self,
168 global_options: &GlobalOptions,
169 opts: &mut DatanodeOptions,
170 ) -> Result<()> {
171 let opts = &mut opts.component;
172
173 if let Some(dir) = &global_options.log_dir {
174 opts.logging.dir.clone_from(dir);
175 }
176
177 if global_options.log_level.is_some() {
178 opts.logging.level.clone_from(&global_options.log_level);
179 }
180
181 opts.tracing = TracingOptions {
182 #[cfg(feature = "tokio-console")]
183 tokio_console_addr: global_options.tokio_console_addr.clone(),
184 };
185
186 if let Some(addr) = &self.rpc_bind_addr {
187 opts.grpc.bind_addr.clone_from(addr);
188 } else if let Some(addr) = &opts.rpc_addr {
189 warn!("Use the deprecated attribute `DatanodeOptions.rpc_addr`, please use `grpc.addr` instead.");
190 opts.grpc.bind_addr.clone_from(addr);
191 }
192
193 if let Some(server_addr) = &self.rpc_server_addr {
194 opts.grpc.server_addr.clone_from(server_addr);
195 } else if let Some(server_addr) = &opts.rpc_hostname {
196 warn!("Use the deprecated attribute `DatanodeOptions.rpc_hostname`, please use `grpc.hostname` instead.");
197 opts.grpc.server_addr.clone_from(server_addr);
198 }
199
200 if let Some(runtime_size) = opts.rpc_runtime_size {
201 warn!("Use the deprecated attribute `DatanodeOptions.rpc_runtime_size`, please use `grpc.runtime_size` instead.");
202 opts.grpc.runtime_size = runtime_size;
203 }
204
205 if let Some(max_recv_message_size) = opts.rpc_max_recv_message_size {
206 warn!("Use the deprecated attribute `DatanodeOptions.rpc_max_recv_message_size`, please use `grpc.max_recv_message_size` instead.");
207 opts.grpc.max_recv_message_size = max_recv_message_size;
208 }
209
210 if let Some(max_send_message_size) = opts.rpc_max_send_message_size {
211 warn!("Use the deprecated attribute `DatanodeOptions.rpc_max_send_message_size`, please use `grpc.max_send_message_size` instead.");
212 opts.grpc.max_send_message_size = max_send_message_size;
213 }
214
215 if let Some(node_id) = self.node_id {
216 opts.node_id = Some(node_id);
217 }
218
219 if let Some(metasrv_addrs) = &self.metasrv_addrs {
220 opts.meta_client
221 .get_or_insert_with(MetaClientOptions::default)
222 .metasrv_addrs
223 .clone_from(metasrv_addrs);
224 }
225
226 ensure!(
227 opts.node_id.is_some(),
228 MissingConfigSnafu {
229 msg: "Missing node id option"
230 }
231 );
232
233 if let Some(data_home) = &self.data_home {
234 opts.storage.data_home.clone_from(data_home);
235 }
236
237 if let Some(wal_dir) = &self.wal_dir
239 && let DatanodeWalConfig::RaftEngine(raft_engine_config) = &mut opts.wal
240 {
241 if raft_engine_config
242 .dir
243 .as_ref()
244 .is_some_and(|original_dir| original_dir != wal_dir)
245 {
246 info!("The wal dir of raft-engine is altered to {wal_dir}");
247 }
248 raft_engine_config.dir.replace(wal_dir.clone());
249 }
250
251 if let Some(http_addr) = &self.http_addr {
252 opts.http.addr.clone_from(http_addr);
253 }
254
255 if let Some(http_timeout) = self.http_timeout {
256 opts.http.timeout = Duration::from_secs(http_timeout)
257 }
258
259 opts.http.disable_dashboard = true;
261
262 Ok(())
263 }
264}
265
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::io::Write;
    use std::time::Duration;

    use common_config::ENV_VAR_SEP;
    use common_test_util::temp_dir::create_named_temp_file;
    use datanode::config::{FileConfig, GcsConfig, ObjectStoreConfig, S3Config};
    use servers::heartbeat_options::HeartbeatOptions;

    use super::*;
    use crate::options::GlobalOptions;

    /// Global options that set the log directory and the log level used by the logging test.
    fn debug_log_global_options() -> GlobalOptions {
        GlobalOptions {
            log_dir: Some("./greptimedb_data/test/logs".to_string()),
            log_level: Some("debug".to_string()),

            #[cfg(feature = "tokio-console")]
            tokio_console_addr: None,
        }
    }

    /// Builds an environment variable name from the prefix, the upper-cased section and the
    /// upper-cased key, joined with `ENV_VAR_SEP`.
    fn env_var_name(prefix: &str, section: &str, key: &str) -> String {
        [
            prefix.to_string(),
            section.to_uppercase(),
            key.to_uppercase(),
        ]
        .join(ENV_VAR_SEP)
    }

    /// The deprecated top-level `rpc_addr` / `rpc_hostname` keys are expected to win over the
    /// `[grpc]` section of the same file.
    #[test]
    fn test_deprecated_cli_options() {
        common_telemetry::init_default_ut_logging();

        let mut config_file = create_named_temp_file();
        let config = r#"
            enable_memory_catalog = false
            node_id = 42

            rpc_addr = "127.0.0.1:4001"
            rpc_hostname = "192.168.0.1"
            [grpc]
            bind_addr = "127.0.0.1:3001"
            server_addr = "127.0.0.1"
            runtime_size = 8
        "#;
        write!(config_file, "{}", config).unwrap();

        let start = StartCommand {
            config_file: Some(config_file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };
        let datanode_opts = start.load_options(&Default::default()).unwrap().component;

        assert_eq!("127.0.0.1:4001".to_string(), datanode_opts.grpc.bind_addr);
        assert_eq!("192.168.0.1".to_string(), datanode_opts.grpc.server_addr);
    }

    /// Values written in the config file show up in the loaded options.
    #[test]
    fn test_read_from_config_file() {
        let mut config_file = create_named_temp_file();
        let config = r#"
            enable_memory_catalog = false
            node_id = 42

            [grpc]
            addr = "127.0.0.1:3001"
            hostname = "127.0.0.1"
            runtime_size = 8

            [heartbeat]
            interval = "300ms"

            [meta_client]
            metasrv_addrs = ["127.0.0.1:3002"]
            timeout = "3s"
            connect_timeout = "5s"
            ddl_timeout = "10s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            dir = "/other/wal"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "10m"
            read_batch_size = 128
            sync_write = false

            [storage]
            data_home = "./greptimedb_data/"
            type = "File"

            [[storage.providers]]
            type = "Gcs"
            bucket = "foo"
            endpoint = "bar"

            [[storage.providers]]
            type = "S3"
            bucket = "foo"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(config_file, "{}", config).unwrap();

        let start = StartCommand {
            config_file: Some(config_file.path().to_str().unwrap().to_string()),
            ..Default::default()
        };
        let datanode_opts = start.load_options(&Default::default()).unwrap().component;

        assert_eq!("127.0.0.1:3001".to_string(), datanode_opts.grpc.bind_addr);
        assert_eq!(Some(42), datanode_opts.node_id);

        // WAL section.
        let DatanodeWalConfig::RaftEngine(raft_engine) = datanode_opts.wal else {
            unreachable!()
        };
        assert_eq!("/other/wal", raft_engine.dir.unwrap());
        assert_eq!(Duration::from_secs(600), raft_engine.purge_interval);
        assert_eq!(1024 * 1024 * 1024, raft_engine.file_size.0);
        assert_eq!(1024 * 1024 * 1024 * 50, raft_engine.purge_threshold.0);
        assert!(!raft_engine.sync_write);

        // Heartbeat section.
        let HeartbeatOptions {
            interval: heartbeat_interval,
            ..
        } = datanode_opts.heartbeat;
        assert_eq!(300, heartbeat_interval.as_millis());

        // Meta client section.
        let meta_client = datanode_opts.meta_client.unwrap();
        assert_eq!(
            vec!["127.0.0.1:3002".to_string()],
            meta_client.metasrv_addrs
        );
        assert_eq!(5000, meta_client.connect_timeout.as_millis());
        assert_eq!(10000, meta_client.ddl_timeout.as_millis());
        assert_eq!(3000, meta_client.timeout.as_millis());
        assert!(meta_client.tcp_nodelay);

        // Storage section, including the two extra providers in file order.
        let storage = &datanode_opts.storage;
        assert_eq!("./greptimedb_data/", storage.data_home);
        assert!(matches!(
            &storage.store,
            ObjectStoreConfig::File(FileConfig { .. })
        ));
        assert_eq!(storage.providers.len(), 2);
        assert!(matches!(
            storage.providers[0],
            ObjectStoreConfig::Gcs(GcsConfig { .. })
        ));
        assert!(matches!(
            storage.providers[1],
            ObjectStoreConfig::S3(S3Config { .. })
        ));

        // Logging section.
        assert_eq!("debug", datanode_opts.logging.level.unwrap());
        assert_eq!(
            "./greptimedb_data/test/logs".to_string(),
            datanode_opts.logging.dir
        );
    }

    /// Loading fails without a node id and succeeds once one is given on the command line.
    #[test]
    fn test_try_from_cmd() {
        let without_node_id = StartCommand {
            metasrv_addrs: Some(vec!["127.0.0.1:3002".to_string()]),
            ..Default::default()
        };
        assert!(without_node_id
            .load_options(&GlobalOptions::default())
            .is_err());

        let with_node_id = StartCommand {
            node_id: Some(42),
            ..Default::default()
        };
        assert!(with_node_id
            .load_options(&GlobalOptions::default())
            .is_ok());
    }

    /// The log directory and level from the global options end up in the logging options.
    #[test]
    fn test_load_log_options_from_cli() {
        let mut start = StartCommand::default();

        // No node id yet, so loading must fail with `MissingConfig`.
        let result = start.load_options(&debug_log_global_options());
        assert_matches!(result, Err(crate::error::Error::MissingConfig { .. }));

        start.node_id = Some(42);

        let logging_opt = start
            .load_options(&debug_log_global_options())
            .unwrap()
            .component
            .logging;
        assert_eq!("./greptimedb_data/test/logs", logging_opt.dir);
        assert_eq!("debug", logging_opt.level.as_ref().unwrap());
    }

    /// Checks which source wins when the same option is given in several places.
    #[test]
    fn test_config_precedence_order() {
        let mut config_file = create_named_temp_file();
        let config = r#"
            enable_memory_catalog = false
            node_id = 42
            rpc_addr = "127.0.0.1:3001"
            rpc_runtime_size = 8
            rpc_hostname = "10.103.174.219"

            [meta_client]
            timeout = "3s"
            connect_timeout = "5s"
            tcp_nodelay = true

            [wal]
            provider = "raft_engine"
            file_size = "1GB"
            purge_threshold = "50GB"
            purge_interval = "5m"
            sync_write = false

            [storage]
            type = "File"
            data_home = "./greptimedb_data/"

            [logging]
            level = "debug"
            dir = "./greptimedb_data/test/logs"
        "#;
        write!(config_file, "{}", config).unwrap();

        let env_prefix = "DATANODE_UT";
        temp_env::with_vars(
            [
                (
                    env_var_name(env_prefix, "wal", "purge_interval"),
                    Some("1m"),
                ),
                (
                    env_var_name(env_prefix, "wal", "read_batch_size"),
                    Some("100"),
                ),
                (
                    env_var_name(env_prefix, "meta_client", "metasrv_addrs"),
                    Some("127.0.0.1:3001,127.0.0.1:3002,127.0.0.1:3003"),
                ),
            ],
            || {
                let start = StartCommand {
                    config_file: Some(config_file.path().to_str().unwrap().to_string()),
                    wal_dir: Some("/other/wal/dir".to_string()),
                    env_prefix: env_prefix.to_string(),
                    ..Default::default()
                };

                let datanode_opts = start.load_options(&Default::default()).unwrap().component;

                // Set only through the environment.
                let DatanodeWalConfig::RaftEngine(raft_engine) = datanode_opts.wal else {
                    unreachable!()
                };
                assert_eq!(raft_engine.read_batch_size, 100);
                assert_eq!(
                    datanode_opts.meta_client.unwrap().metasrv_addrs,
                    vec![
                        "127.0.0.1:3001".to_string(),
                        "127.0.0.1:3002".to_string(),
                        "127.0.0.1:3003".to_string()
                    ]
                );

                // Set in both the file (5m) and the environment (1m): the file value is
                // expected.
                assert_eq!(raft_engine.purge_interval, Duration::from_secs(60 * 5));

                // Set through the `wal_dir` CLI flag.
                assert_eq!(raft_engine.dir.unwrap(), "/other/wal/dir");

                // Not set anywhere, so the default is expected.
                assert_eq!(
                    datanode_opts.http.addr,
                    DatanodeOptions::default().component.http.addr
                );
                // Comes from the deprecated `rpc_hostname` key of the config file.
                assert_eq!(datanode_opts.grpc.server_addr, "10.103.174.219");
            },
        );
    }
}