use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal_options_allocator::prepare_wal_options;
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::{error, info, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{default_index_file, GlobalIndexCollector};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::ServerHandlers;
use servers::Mode;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
use store_api::region_engine::{RegionEngineRef, RegionRole};
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
    self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
    MissingCacheSnafu, MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
    ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
    new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
    RegionServerEventReceiver,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store::{self, new_object_store_without_cache};

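/// The datanode instance: it owns the [RegionServer] and the background tasks
/// (heartbeat, telemetry, metrics export) that run alongside it.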
pub struct Datanode {
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
    region_event_receiver: Option<RegionServerEventReceiver>,
    region_server: RegionServer,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leases_notifier: Option<Arc<Notify>>,
    plugins: Plugins,
    export_metrics_task: Option<ExportMetricsTask>,
}

impl Datanode {
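    /// Starts the datanode: heartbeat and coordination first, then telemetry,
    /// metrics export, and finally all servers.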
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting datanode instance...");

        self.start_heartbeat().await?;
        self.wait_coordinated().await;

        self.start_telemetry();

        if let Some(t) = self.export_metrics_task.as_ref() {
            t.start(None).context(StartServerSnafu)?
        }

        self.services.start_all().await.context(StartServerSnafu)
    }

    pub fn server_handlers(&self) -> &ServerHandlers {
        &self.services
    }

    pub fn start_telemetry(&self) {
        if let Err(e) = self.greptimedb_telemetry_task.start() {
            warn!(e; "Failed to start telemetry task!");
        }
    }

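    /// Starts the heartbeat task, handing it the region server event receiver.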
    pub async fn start_heartbeat(&mut self) -> Result<()> {
        if let Some(task) = &self.heartbeat_task {
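            // Safety: the receiver is always created together with the heartbeat task.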
            let receiver = self.region_event_receiver.take().unwrap();

            task.start(receiver, self.leases_notifier.clone()).await?;
        }
        Ok(())
    }

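    /// Waits until the heartbeat task is notified that region leases have been
    /// coordinated with metasrv; returns immediately if no notifier was set up.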
    pub async fn wait_coordinated(&mut self) {
        if let Some(notifier) = self.leases_notifier.take() {
            notifier.notified().await;
        }
    }

    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }

    pub async fn shutdown(&self) -> Result<()> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        let _ = self.greptimedb_telemetry_task.stop().await;
        if let Some(heartbeat_task) = &self.heartbeat_task {
            heartbeat_task
                .close()
                .map_err(BoxedError::new)
                .context(ShutdownInstanceSnafu)?;
        }
        self.region_server.stop().await?;
        Ok(())
    }

    pub fn region_server(&self) -> RegionServer {
        self.region_server.clone()
    }

    pub fn plugins(&self) -> Plugins {
        self.plugins.clone()
    }
}

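/// Builder for [Datanode].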
pub struct DatanodeBuilder {
    opts: DatanodeOptions,
    mode: Mode,
    plugins: Plugins,
    meta_client: Option<MetaClientRef>,
    kv_backend: Option<KvBackendRef>,
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
}

impl DatanodeBuilder {
    pub fn new(opts: DatanodeOptions, plugins: Plugins, mode: Mode) -> Self {
        Self {
            opts,
            mode,
            plugins,
            meta_client: None,
            kv_backend: None,
            cache_registry: None,
        }
    }

    pub fn with_meta_client(self, meta_client: MetaClientRef) -> Self {
        Self {
            meta_client: Some(meta_client),
            ..self
        }
    }

    pub fn with_cache_registry(self, cache_registry: Arc<LayeredCacheRegistry>) -> Self {
        Self {
            cache_registry: Some(cache_registry),
            ..self
        }
    }

    pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self {
        Self {
            kv_backend: Some(kv_backend),
            ..self
        }
    }

    pub async fn build(mut self) -> Result<Datanode> {
        let mode = &self.mode;
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let meta_client = self.meta_client.take();

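        // A meta client is only provided when the datanode is managed by metasrv,
        // in which case region server events are reported through the heartbeat.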
        let controlled_by_metasrv = meta_client.is_some();

        let kv_backend = self.kv_backend.take().context(MissingKvBackendSnafu)?;

        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
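            // Forward region server events to the heartbeat task.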
            let (tx, rx) = new_region_server_event_channel();
            (Box::new(tx) as _, Some(rx))
        } else {
            (Box::new(NoopRegionServerEventListener) as _, None)
        };

        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
        let table_id_schema_cache: TableSchemaCacheRef =
            cache_registry.get().context(MissingCacheSnafu)?;

        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
            table_id_schema_cache,
            schema_cache,
        ));
        let region_server = self
            .new_region_server(schema_metadata_manager, region_event_listener)
            .await?;

        let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
        let table_values = datanode_table_manager
            .tables(node_id)
            .try_collect::<Vec<_>>()
            .await
            .context(GetMetadataSnafu)?;

        let open_all_regions = open_all_regions(
            region_server.clone(),
            table_values,
            !controlled_by_metasrv,
            self.opts.init_regions_parallelism,
        );

        if self.opts.init_regions_in_background {
            common_runtime::spawn_global(async move {
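                // Regions are opened in the background: failures are logged
                // instead of aborting the startup.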
                if let Err(err) = open_all_regions.await {
                    error!(err; "Failed to open regions during startup.");
                }
            });
        } else {
            open_all_regions.await?;
        }

        let heartbeat_task = if let Some(meta_client) = meta_client {
            Some(
                HeartbeatTask::try_new(
                    &self.opts,
                    region_server.clone(),
                    meta_client,
                    cache_registry,
                    self.plugins.clone(),
                )
                .await?,
            )
        } else {
            None
        };

        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
            Some(self.opts.storage.data_home.clone()),
            mode,
            self.opts.enable_telemetry,
        )
        .await;

        let leases_notifier =
            if self.opts.require_lease_before_startup && matches!(mode, Mode::Distributed) {
                Some(Arc::new(Notify::new()))
            } else {
                None
            };

        let export_metrics_task =
            ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
                .context(StartServerSnafu)?;

        Ok(Datanode {
            services: ServerHandlers::default(),
            heartbeat_task,
            region_server,
            greptimedb_telemetry_task,
            region_event_receiver,
            leases_notifier,
            plugins: self.plugins.clone(),
            export_metrics_task,
        })
    }

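    /// Builds an [ObjectStoreManager] from [StorageConfig]: the default object
    /// store plus one store per extra configured provider.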
    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
        let default_name = cfg.store.config_name();
        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
        for store in &cfg.providers {
            object_store_manager.add(
                store.config_name(),
                store::new_object_store(store.clone(), &cfg.data_home).await?,
            );
        }
        Ok(Arc::new(object_store_manager))
    }

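    /// Test helper that opens all regions assigned to this datanode's `node_id`
    /// on the given region server.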
    #[cfg(test)]
    async fn initialize_region_server(
        &self,
        region_server: &RegionServer,
        kv_backend: KvBackendRef,
        open_with_writable: bool,
    ) -> Result<()> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
        let table_values = datanode_table_manager
            .tables(node_id)
            .try_collect::<Vec<_>>()
            .await
            .context(GetMetadataSnafu)?;

        open_all_regions(
            region_server.clone(),
            table_values,
            open_with_writable,
            self.opts.init_regions_parallelism,
        )
        .await
    }

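    /// Creates the [RegionServer], wiring up the query engine, the event
    /// listener, and every region engine built from the options.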
    async fn new_region_server(
        &self,
        schema_metadata_manager: SchemaMetadataManagerRef,
        event_listener: RegionServerEventListenerRef,
    ) -> Result<RegionServer> {
        let opts: &DatanodeOptions = &self.opts;

        let query_engine_factory = QueryEngineFactory::new_with_plugins(
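            // The datanode's query engine executes plans whose table sources are
            // already resolved, so a default in-memory catalog manager suffices.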
            MemoryCatalogManager::with_default_setup(),
            None,
            None,
            None,
            None,
            false,
            self.plugins.clone(),
            opts.query.clone(),
        );
        let query_engine = query_engine_factory.query_engine();

        let table_provider_factory = Arc::new(DummyTableProviderFactory);
        let mut region_server = RegionServer::with_table_provider(
            query_engine,
            common_runtime::global_runtime(),
            event_listener,
            table_provider_factory,
            opts.max_concurrent_queries,
            Duration::from_millis(100),
        );

        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
        let engines = Self::build_store_engines(
            opts,
            object_store_manager,
            schema_metadata_manager,
            self.plugins.clone(),
        )
        .await?;
        for engine in engines {
            region_server.register_engine(engine);
        }

        Ok(region_server)
    }

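    /// Builds all region engines from the options: mito, metric (layered on top
    /// of mito), and the file engine.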
    async fn build_store_engines(
        opts: &DatanodeOptions,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<Vec<RegionEngineRef>> {
        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
        let mut mito_engine_config = MitoConfig::default();
        let mut file_engine_config = file_engine::config::EngineConfig::default();

        for engine in &opts.region_engine {
            match engine {
                RegionEngineConfig::Mito(config) => {
                    mito_engine_config = config.clone();
                }
                RegionEngineConfig::File(config) => {
                    file_engine_config = config.clone();
                }
                RegionEngineConfig::Metric(metric_config) => {
                    metric_engine_config = metric_config.clone();
                }
            }
        }

        let mito_engine = Self::build_mito_engine(
            opts,
            object_store_manager.clone(),
            mito_engine_config,
            schema_metadata_manager.clone(),
            plugins.clone(),
        )
        .await?;

        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
            .context(BuildMetricEngineSnafu)?;

        let file_engine = FileRegionEngine::new(
            file_engine_config,
            object_store_manager.default_object_store().clone(),
        );

        Ok(vec![
            Arc::new(mito_engine) as _,
            Arc::new(metric_engine) as _,
            Arc::new(file_engine) as _,
        ])
    }

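    /// Builds the [MitoEngine] on top of the WAL implementation selected in the
    /// options (raft-engine or Kafka).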
    async fn build_mito_engine(
        opts: &DatanodeOptions,
        object_store_manager: ObjectStoreManagerRef,
        mut config: MitoConfig,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        if opts.storage.is_object_storage() {
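            // Data resides on remote object storage, so enable the local write cache.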
            config.enable_write_cache = true;
            info!("Configured 'enable_write_cache=true' for mito engine.");
        }

        let mito_engine = match &opts.wal {
            DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
                &opts.storage.data_home,
                config,
                Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                    .await?,
                object_store_manager,
                schema_metadata_manager,
                plugins,
            )
            .await
            .context(BuildMitoEngineSnafu)?,
            DatanodeWalConfig::Kafka(kafka_config) => {
                if kafka_config.create_index && opts.node_id.is_none() {
                    warn!("WAL index creation is only available in distributed mode.")
                }
                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
                {
                    let operator = new_object_store_without_cache(
                        &opts.storage.store,
                        &opts.storage.data_home,
                    )
                    .await?;
                    let path = default_index_file(opts.node_id.unwrap());
                    Some(Self::build_global_index_collector(
                        kafka_config.dump_index_interval,
                        operator,
                        path,
                    ))
                } else {
                    None
                };

                MitoEngine::new(
                    &opts.storage.data_home,
                    config,
                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
                    object_store_manager,
                    schema_metadata_manager,
                    plugins,
                )
                .await
                .context(BuildMitoEngineSnafu)?
            }
        };
        Ok(mito_engine)
    }

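    /// Builds a [RaftEngineLogStore] rooted under `data_home`, unless an explicit
    /// WAL directory is configured.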
    async fn build_raft_engine_log_store(
        data_home: &str,
        config: &RaftEngineConfig,
    ) -> Result<Arc<RaftEngineLogStore>> {
        let data_home = normalize_dir(data_home);
        let wal_dir = match &config.dir {
            Some(dir) => dir.clone(),
            None => format!("{}{WAL_DIR}", data_home),
        };

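        // Ensure the WAL directory exists before opening the log store.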
        fs::create_dir_all(Path::new(&wal_dir))
            .await
            .context(CreateDirSnafu { dir: &wal_dir })?;
        info!(
            "Creating raft-engine logstore with config: {:?} and storage path: {}",
            config, &wal_dir
        );
        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)?;

        Ok(Arc::new(logstore))
    }

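    /// Builds a [KafkaLogStore], optionally equipped with a global WAL index collector.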
    async fn build_kafka_log_store(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Arc<KafkaLogStore>> {
        KafkaLogStore::try_new(config, global_index_collector)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)
            .map(Arc::new)
    }

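    /// Builds a [GlobalIndexCollector] that dumps the WAL index to `path` at
    /// every `dump_index_interval`.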
    fn build_global_index_collector(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> GlobalIndexCollector {
        GlobalIndexCollector::new(dump_index_interval, operator, path)
    }
}

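/// Opens every region listed in `table_values` on the region server, issuing at
/// most `init_regions_parallelism` open requests at a time, and optionally
/// converts the opened regions to leaders.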
async fn open_all_regions(
    region_server: RegionServer,
    table_values: Vec<DatanodeTableValue>,
    open_with_writable: bool,
    init_regions_parallelism: usize,
) -> Result<()> {
    let mut regions = vec![];
    for table_value in table_values {
        for region_number in table_value.regions {
            let mut region_options = table_value.region_info.region_options.clone();
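            // Merge the region's WAL options into its region options.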
            prepare_wal_options(
                &mut region_options,
                RegionId::new(table_value.table_id, region_number),
                &table_value.region_info.region_wal_options,
            );

            regions.push((
                RegionId::new(table_value.table_id, region_number),
                table_value.region_info.engine.clone(),
                table_value.region_info.region_storage_path.clone(),
                region_options,
            ));
        }
    }
    let num_regions = regions.len();
    info!("Going to open {} region(s)", num_regions);

    let mut region_requests = Vec::with_capacity(regions.len());
    for (region_id, engine, store_path, options) in regions {
        let region_dir = region_dir(&store_path, region_id);
        region_requests.push((
            region_id,
            RegionOpenRequest {
                engine,
                region_dir,
                options,
                skip_wal_replay: false,
            },
        ));
    }

    let open_regions = region_server
        .handle_batch_open_requests(init_regions_parallelism, region_requests)
        .await?;
    ensure!(
        open_regions.len() == num_regions,
        error::UnexpectedSnafu {
            violated: format!(
                "Expected to open {} regions, but only {} were opened",
                num_regions,
                open_regions.len()
            )
        }
    );

    for region_id in open_regions {
        if open_with_writable {
            if let Err(e) = region_server.set_region_role(region_id, RegionRole::Leader) {
                error!(
                    e; "Failed to convert region {region_id} to leader"
                );
            }
        }
    }
    info!("All regions are opened");

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::KvBackendRef;
    use mito2::engine::MITO_ENGINE_NAME;
    use servers::Mode;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{mock_region_server, MockRegionEngine};

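    /// Registers a table (id 1028) whose regions 0, 1 and 2 are assigned to datanode 0.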
    async fn setup_table_datanode(kv: &KvBackendRef) {
        let mgr = DatanodeTableManager::new(kv.clone());
        let txn = mgr
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                HashMap::from([("foo".to_string(), "bar".to_string())]),
                HashMap::default(),
                BTreeMap::from([(0, vec![0, 1, 2])]),
            )
            .unwrap();

        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
    }

    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend))
                .build(),
        );

        let builder = DatanodeBuilder::new(
            DatanodeOptions {
                node_id: Some(0),
                ..Default::default()
            },
            Plugins::default(),
            Mode::Standalone,
        )
        .with_cache_registry(layered_cache_registry);

        let kv = Arc::new(MemoryKvBackend::default()) as _;
        setup_table_datanode(&kv).await;

        builder
            .initialize_region_server(&mock_region_server, kv.clone(), false)
            .await
            .unwrap();

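        // Expect one open request per region (0, 1, 2), each carrying the region options.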
        for i in 0..3 {
            let (region_id, req) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));
            if let RegionRequest::Open(req) = req {
                assert_eq!(
                    req.options,
                    HashMap::from([("foo".to_string(), "bar".to_string())])
                )
            } else {
                unreachable!()
            }
        }

        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}