use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal_options_allocator::prepare_wal_options;
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::{error, info, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{default_index_file, GlobalIndexCollector};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::dummy_catalog::TableProviderFactoryRef;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::ServerHandlers;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
use store_api::region_engine::{RegionEngineRef, RegionRole};
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
    self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
    MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
    ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
    new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
    RegionServerEventReceiver,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store::{self, new_object_store_without_cache};

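/// The datanode: serves regions through its [RegionServer] and, when deployed
/// under metasrv, keeps a heartbeat task to report region status.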
pub struct Datanode {
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
    region_event_receiver: Option<RegionServerEventReceiver>,
    region_server: RegionServer,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leases_notifier: Option<Arc<Notify>>,
    plugins: Plugins,
    export_metrics_task: Option<ExportMetricsTask>,
}

impl Datanode {
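    /// Starts the datanode: the heartbeat task, telemetry, the metrics export
    /// task, and finally all registered services.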
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting datanode instance...");

        self.start_heartbeat().await?;
        self.wait_coordinated().await;

        self.start_telemetry();

        if let Some(t) = self.export_metrics_task.as_ref() {
            t.start(None).context(StartServerSnafu)?
        }

        self.services.start_all().await.context(StartServerSnafu)
    }

    pub fn server_handlers(&self) -> &ServerHandlers {
        &self.services
    }

    pub fn start_telemetry(&self) {
        if let Err(e) = self.greptimedb_telemetry_task.start() {
            warn!(e; "Failed to start telemetry task!");
        }
    }

    pub async fn start_heartbeat(&mut self) -> Result<()> {
        if let Some(task) = &self.heartbeat_task {
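            // Safety: the receiver is created together with the heartbeat task
            // in `DatanodeBuilder::build`, so it must be present here.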
            let receiver = self.region_event_receiver.take().unwrap();

            task.start(receiver, self.leases_notifier.clone()).await?;
        }
        Ok(())
    }

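    /// Waits until region leases have been coordinated with metasrv; a no-op
    /// when no lease notifier was configured (e.g. in standalone mode).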
    pub async fn wait_coordinated(&mut self) {
        if let Some(notifier) = self.leases_notifier.take() {
            notifier.notified().await;
        }
    }

    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }

    pub async fn shutdown(&mut self) -> Result<()> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        let _ = self.greptimedb_telemetry_task.stop().await;
        if let Some(heartbeat_task) = &self.heartbeat_task {
            heartbeat_task
                .close()
                .map_err(BoxedError::new)
                .context(ShutdownInstanceSnafu)?;
        }
        self.region_server.stop().await?;
        Ok(())
    }

    pub fn region_server(&self) -> RegionServer {
        self.region_server.clone()
    }

    pub fn plugins(&self) -> Plugins {
        self.plugins.clone()
    }
}

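/// Builder for a [Datanode].
///
/// A minimal usage sketch (assumes the caller has already built a kv backend
/// and a layered cache registry, e.g. via `build_datanode_cache_registry`, as
/// the tests below do):
///
/// ```ignore
/// let mut builder = DatanodeBuilder::new(opts, Plugins::default(), kv_backend);
/// builder.with_cache_registry(cache_registry);
/// let mut datanode = builder.build().await?;
/// datanode.start().await?;
/// ```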
pub struct DatanodeBuilder {
    opts: DatanodeOptions,
    table_provider_factory: Option<TableProviderFactoryRef>,
    plugins: Plugins,
    meta_client: Option<MetaClientRef>,
    kv_backend: KvBackendRef,
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
}

impl DatanodeBuilder {
    pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
        Self {
            opts,
            table_provider_factory: None,
            plugins,
            meta_client: None,
            kv_backend,
            cache_registry: None,
        }
    }

    pub fn options(&self) -> &DatanodeOptions {
        &self.opts
    }

    pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
        self.meta_client = Some(client);
        self
    }

    pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
        self.cache_registry = Some(registry);
        self
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
        self.table_provider_factory = Some(factory);
        self
    }

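    /// Consumes the builder and constructs a [Datanode], opening every region
    /// assigned to this node along the way.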
    pub async fn build(mut self) -> Result<Datanode> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let meta_client = self.meta_client.take();

        let controlled_by_metasrv = meta_client.is_some();

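        // When metasrv controls this datanode, wire up a channel so region
        // server events can be forwarded to the heartbeat task; otherwise
        // install a no-op listener.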
        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
            let (tx, rx) = new_region_server_event_channel();
            (Box::new(tx) as _, Some(rx))
        } else {
            (Box::new(NoopRegionServerEventListener) as _, None)
        };

        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
        let table_id_schema_cache: TableSchemaCacheRef =
            cache_registry.get().context(MissingCacheSnafu)?;

        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
            table_id_schema_cache,
            schema_cache,
        ));
        let region_server = self
            .new_region_server(schema_metadata_manager, region_event_listener)
            .await?;

        let datanode_table_manager = DatanodeTableManager::new(self.kv_backend.clone());
        let table_values = datanode_table_manager
            .tables(node_id)
            .try_collect::<Vec<_>>()
            .await
            .context(GetMetadataSnafu)?;

        let open_all_regions = open_all_regions(
            region_server.clone(),
            table_values,
            !controlled_by_metasrv,
            self.opts.init_regions_parallelism,
        );

        if self.opts.init_regions_in_background {
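            // Open regions asynchronously so startup is not blocked on it.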
            common_runtime::spawn_global(async move {
                if let Err(err) = open_all_regions.await {
                    error!(err; "Failed to open regions during the startup.");
                }
            });
        } else {
            open_all_regions.await?;
        }

        let heartbeat_task = if let Some(meta_client) = meta_client {
            Some(
                HeartbeatTask::try_new(
                    &self.opts,
                    region_server.clone(),
                    meta_client,
                    cache_registry,
                    self.plugins.clone(),
                )
                .await?,
            )
        } else {
            None
        };

        let is_standalone = heartbeat_task.is_none();
        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
            Some(self.opts.storage.data_home.clone()),
            is_standalone && self.opts.enable_telemetry,
        )
        .await;

        let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
            Some(Arc::new(Notify::new()))
        } else {
            None
        };

        let export_metrics_task =
            ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
                .context(StartServerSnafu)?;

        Ok(Datanode {
            services: ServerHandlers::default(),
            heartbeat_task,
            region_server,
            greptimedb_telemetry_task,
            region_event_receiver,
            leases_notifier,
            plugins: self.plugins.clone(),
            export_metrics_task,
        })
    }

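    /// Builds an [ObjectStoreManager] holding the default object store plus
    /// one store per configured provider.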
    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
        let default_name = cfg.store.config_name();
        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
        for store in &cfg.providers {
            object_store_manager.add(
                store.config_name(),
                store::new_object_store(store.clone(), &cfg.data_home).await?,
            );
        }
        Ok(Arc::new(object_store_manager))
    }

    #[cfg(test)]
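    /// Test helper: opens all regions assigned to this datanode against the
    /// given region server.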
    async fn initialize_region_server(
        &self,
        region_server: &RegionServer,
        kv_backend: KvBackendRef,
        open_with_writable: bool,
    ) -> Result<()> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
        let table_values = datanode_table_manager
            .tables(node_id)
            .try_collect::<Vec<_>>()
            .await
            .context(GetMetadataSnafu)?;

        open_all_regions(
            region_server.clone(),
            table_values,
            open_with_writable,
            self.opts.init_regions_parallelism,
        )
        .await
    }

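    /// Creates the [RegionServer] and registers all region engines with it.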
    async fn new_region_server(
        &self,
        schema_metadata_manager: SchemaMetadataManagerRef,
        event_listener: RegionServerEventListenerRef,
    ) -> Result<RegionServer> {
        let opts: &DatanodeOptions = &self.opts;

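        // The query engine on the datanode only executes plans whose table
        // sources are already resolved, so a bare in-memory catalog suffices.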
        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            MemoryCatalogManager::with_default_setup(),
            None,
            None,
            None,
            None,
            false,
            self.plugins.clone(),
            opts.query.clone(),
        );
        let query_engine = query_engine_factory.query_engine();

        let table_provider_factory = self
            .table_provider_factory
            .clone()
            .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));

        let mut region_server = RegionServer::with_table_provider(
            query_engine,
            common_runtime::global_runtime(),
            event_listener,
            table_provider_factory,
            opts.max_concurrent_queries,
            Duration::from_millis(100),
        );

        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
        let engines = Self::build_store_engines(
            opts,
            object_store_manager,
            schema_metadata_manager,
            self.plugins.clone(),
        )
        .await?;
        for engine in engines {
            region_server.register_engine(engine);
        }

        Ok(region_server)
    }

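    /// Builds all region engines from the `region_engine` configs: mito,
    /// the metric engine layered on top of mito, and the file engine.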
    async fn build_store_engines(
        opts: &DatanodeOptions,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<Vec<RegionEngineRef>> {
        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
        let mut mito_engine_config = MitoConfig::default();
        let mut file_engine_config = file_engine::config::EngineConfig::default();

        for engine in &opts.region_engine {
            match engine {
                RegionEngineConfig::Mito(config) => {
                    mito_engine_config = config.clone();
                }
                RegionEngineConfig::File(config) => {
                    file_engine_config = config.clone();
                }
                RegionEngineConfig::Metric(metric_config) => {
                    metric_engine_config = metric_config.clone();
                }
            }
        }

        let mito_engine = Self::build_mito_engine(
            opts,
            object_store_manager.clone(),
            mito_engine_config,
            schema_metadata_manager.clone(),
            plugins.clone(),
        )
        .await?;

        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
            .context(BuildMetricEngineSnafu)?;

        let file_engine = FileRegionEngine::new(
            file_engine_config,
            object_store_manager.default_object_store().clone(),
        );

        Ok(vec![
            Arc::new(mito_engine) as _,
            Arc::new(metric_engine) as _,
            Arc::new(file_engine) as _,
        ])
    }

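    /// Builds a [MitoEngine], choosing the WAL backend (raft-engine or Kafka)
    /// according to `opts.wal`.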
    async fn build_mito_engine(
        opts: &DatanodeOptions,
        object_store_manager: ObjectStoreManagerRef,
        mut config: MitoConfig,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        if opts.storage.is_object_storage() {
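            // A local write cache pays off when the data home is on object
            // storage, so enable it.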
            config.enable_write_cache = true;
            info!("Configured 'enable_write_cache=true' for mito engine.");
        }

        let mito_engine = match &opts.wal {
            DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
                &opts.storage.data_home,
                config,
                Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                    .await?,
                object_store_manager,
                schema_metadata_manager,
                plugins,
            )
            .await
            .context(BuildMitoEngineSnafu)?,
            DatanodeWalConfig::Kafka(kafka_config) => {
                if kafka_config.create_index && opts.node_id.is_none() {
                    warn!("WAL index creation is only available in distributed mode.")
                }
                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
                {
                    let operator = new_object_store_without_cache(
                        &opts.storage.store,
                        &opts.storage.data_home,
                    )
                    .await?;
                    let path = default_index_file(opts.node_id.unwrap());
                    Some(Self::build_global_index_collector(
                        kafka_config.dump_index_interval,
                        operator,
                        path,
                    ))
                } else {
                    None
                };

                MitoEngine::new(
                    &opts.storage.data_home,
                    config,
                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
                    object_store_manager,
                    schema_metadata_manager,
                    plugins,
                )
                .await
                .context(BuildMitoEngineSnafu)?
            }
        };
        Ok(mito_engine)
    }

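    /// Builds a [RaftEngineLogStore], creating the WAL directory if needed.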
    async fn build_raft_engine_log_store(
        data_home: &str,
        config: &RaftEngineConfig,
    ) -> Result<Arc<RaftEngineLogStore>> {
        let data_home = normalize_dir(data_home);
        let wal_dir = match &config.dir {
            Some(dir) => dir.clone(),
            None => format!("{}{WAL_DIR}", data_home),
        };

        fs::create_dir_all(Path::new(&wal_dir))
            .await
            .context(CreateDirSnafu { dir: &wal_dir })?;
        info!(
            "Creating raft-engine logstore with config: {:?} and storage path: {}",
            config, &wal_dir
        );
        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)?;

        Ok(Arc::new(logstore))
    }

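    /// Builds a [KafkaLogStore], optionally with a global WAL index collector.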
    async fn build_kafka_log_store(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Arc<KafkaLogStore>> {
        KafkaLogStore::try_new(config, global_index_collector)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)
            .map(Arc::new)
    }

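    /// Builds a [GlobalIndexCollector] that periodically dumps the WAL index
    /// to `path` on the given object store.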
    fn build_global_index_collector(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> GlobalIndexCollector {
        GlobalIndexCollector::new(dump_index_interval, operator, path)
    }
}

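/// Opens all regions in `table_values` on the given region server, with at
/// most `init_regions_parallelism` open requests in flight at once.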
async fn open_all_regions(
    region_server: RegionServer,
    table_values: Vec<DatanodeTableValue>,
    open_with_writable: bool,
    init_regions_parallelism: usize,
) -> Result<()> {
    let mut regions = vec![];
    for table_value in table_values {
        for region_number in table_value.regions {
            let mut region_options = table_value.region_info.region_options.clone();
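            // Merge the WAL options allocated for this region (e.g. its Kafka
            // topic) into the region options before opening.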
            prepare_wal_options(
                &mut region_options,
                RegionId::new(table_value.table_id, region_number),
                &table_value.region_info.region_wal_options,
            );

            regions.push((
                RegionId::new(table_value.table_id, region_number),
                table_value.region_info.engine.clone(),
                table_value.region_info.region_storage_path.clone(),
                region_options,
            ));
        }
    }
    let num_regions = regions.len();
    info!("going to open {} region(s)", num_regions);

    let mut region_requests = Vec::with_capacity(regions.len());
    for (region_id, engine, store_path, options) in regions {
        let region_dir = region_dir(&store_path, region_id);
        region_requests.push((
            region_id,
            RegionOpenRequest {
                engine,
                region_dir,
                options,
                skip_wal_replay: false,
            },
        ));
    }

    let open_regions = region_server
        .handle_batch_open_requests(init_regions_parallelism, region_requests)
        .await?;
    ensure!(
        open_regions.len() == num_regions,
        error::UnexpectedSnafu {
            violated: format!(
                "Expected to open {} regions, but only {} were opened",
                num_regions,
                open_regions.len()
            )
        }
    );

    for region_id in open_regions {
        if open_with_writable {
            if let Err(e) = region_server.set_region_role(region_id, RegionRole::Leader) {
                error!(
                    e; "failed to convert region {region_id} to leader"
                );
            }
        }
    }
    info!("all regions are opened");

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::KvBackendRef;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{mock_region_server, MockRegionEngine};

    async fn setup_table_datanode(kv: &KvBackendRef) {
        let mgr = DatanodeTableManager::new(kv.clone());
        let txn = mgr
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                HashMap::from([("foo".to_string(), "bar".to_string())]),
                HashMap::default(),
                BTreeMap::from([(0, vec![0, 1, 2])]),
            )
            .unwrap();

        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
    }

    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let mut builder = DatanodeBuilder::new(
            DatanodeOptions {
                node_id: Some(0),
                ..Default::default()
            },
            Plugins::default(),
            kv_backend,
        );
        builder.with_cache_registry(layered_cache_registry);

        let kv = Arc::new(MemoryKvBackend::default()) as _;
        setup_table_datanode(&kv).await;

        builder
            .initialize_region_server(&mock_region_server, kv.clone(), false)
            .await
            .unwrap();

        for i in 0..3 {
            let (region_id, req) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));
            if let RegionRequest::Open(req) = req {
                assert_eq!(
                    req.options,
                    HashMap::from([("foo".to_string(), "bar".to_string())])
                )
            } else {
                unreachable!()
            }
        }

        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}