1use std::path::Path;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::Plugins;
22use common_error::ext::BoxedError;
23use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
24use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
25use common_meta::datanode::TopicStatsReporter;
26use common_meta::key::runtime_switch::RuntimeSwitchManager;
27use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
28use common_meta::kv_backend::KvBackendRef;
29pub use common_procedure::options::ProcedureConfig;
30use common_telemetry::{error, info, warn};
31use common_wal::config::DatanodeWalConfig;
32use common_wal::config::kafka::DatanodeKafkaConfig;
33use common_wal::config::raft_engine::RaftEngineConfig;
34use file_engine::engine::FileRegionEngine;
35use log_store::kafka::log_store::KafkaLogStore;
36use log_store::kafka::{GlobalIndexCollector, default_index_file};
37use log_store::raft_engine::log_store::RaftEngineLogStore;
38use meta_client::MetaClientRef;
39use metric_engine::engine::MetricEngine;
40use mito2::config::MitoConfig;
41use mito2::engine::{MitoEngine, MitoEngineBuilder};
42use mito2::region::opener::PartitionExprFetcherRef;
43use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
44use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
45use object_store::util::normalize_dir;
46use query::QueryEngineFactory;
47use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
48use servers::export_metrics::ExportMetricsTask;
49use servers::server::ServerHandlers;
50use snafu::{OptionExt, ResultExt, ensure};
51use store_api::path_utils::WAL_DIR;
52use store_api::region_engine::{
53 RegionEngineRef, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
54};
55use tokio::fs;
56use tokio::sync::Notify;
57
58use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
59use crate::error::{
60 self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
61 MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
62 ShutdownServerSnafu, StartServerSnafu,
63};
64use crate::event_listener::{
65 NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
66 new_region_server_event_channel,
67};
68use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
69use crate::heartbeat::HeartbeatTask;
70use crate::partition_expr_fetcher::MetaPartitionExprFetcher;
71use crate::region_server::{DummyTableProviderFactory, RegionServer};
72use crate::store::{self, new_object_store_without_cache};
73use crate::utils::{RegionOpenRequests, build_region_open_requests};
74
75pub struct Datanode {
77 services: ServerHandlers,
78 heartbeat_task: Option<HeartbeatTask>,
79 region_event_receiver: Option<RegionServerEventReceiver>,
80 region_server: RegionServer,
81 greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
82 leases_notifier: Option<Arc<Notify>>,
83 plugins: Plugins,
84 export_metrics_task: Option<ExportMetricsTask>,
85}
86
87impl Datanode {
88 pub async fn start(&mut self) -> Result<()> {
89 info!("Starting datanode instance...");
90
91 self.start_heartbeat().await?;
92 self.wait_coordinated().await;
93
94 self.start_telemetry();
95
96 if let Some(t) = self.export_metrics_task.as_ref() {
97 t.start(None).context(StartServerSnafu)?
98 }
99
100 self.services.start_all().await.context(StartServerSnafu)
101 }
102
103 pub fn server_handlers(&self) -> &ServerHandlers {
104 &self.services
105 }
106
107 pub fn start_telemetry(&self) {
108 if let Err(e) = self.greptimedb_telemetry_task.start() {
109 warn!(e; "Failed to start telemetry task!");
110 }
111 }
112
113 pub async fn start_heartbeat(&mut self) -> Result<()> {
114 if let Some(task) = &self.heartbeat_task {
115 let receiver = self.region_event_receiver.take().unwrap();
117
118 task.start(receiver, self.leases_notifier.clone()).await?;
119 }
120 Ok(())
121 }
122
123 pub async fn wait_coordinated(&mut self) {
125 if let Some(notifier) = self.leases_notifier.take() {
126 notifier.notified().await;
127 }
128 }
129
130 pub fn setup_services(&mut self, services: ServerHandlers) {
131 self.services = services;
132 }
133
134 pub async fn shutdown(&mut self) -> Result<()> {
135 self.services
136 .shutdown_all()
137 .await
138 .context(ShutdownServerSnafu)?;
139
140 let _ = self.greptimedb_telemetry_task.stop().await;
141 if let Some(heartbeat_task) = &self.heartbeat_task {
142 heartbeat_task
143 .close()
144 .map_err(BoxedError::new)
145 .context(ShutdownInstanceSnafu)?;
146 }
147 self.region_server.stop().await?;
148 Ok(())
149 }
150
151 pub fn region_server(&self) -> RegionServer {
152 self.region_server.clone()
153 }
154
155 pub fn plugins(&self) -> Plugins {
156 self.plugins.clone()
157 }
158}
159
160pub struct DatanodeBuilder {
161 opts: DatanodeOptions,
162 table_provider_factory: Option<TableProviderFactoryRef>,
163 plugins: Plugins,
164 meta_client: Option<MetaClientRef>,
165 kv_backend: KvBackendRef,
166 cache_registry: Option<Arc<LayeredCacheRegistry>>,
167 topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
168 #[cfg(feature = "enterprise")]
169 extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
170}
171
172impl DatanodeBuilder {
173 pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
174 Self {
175 opts,
176 table_provider_factory: None,
177 plugins,
178 meta_client: None,
179 kv_backend,
180 cache_registry: None,
181 #[cfg(feature = "enterprise")]
182 extension_range_provider_factory: None,
183 topic_stats_reporter: None,
184 }
185 }
186
187 pub fn options(&self) -> &DatanodeOptions {
188 &self.opts
189 }
190
191 pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
192 self.meta_client = Some(client);
193 self
194 }
195
196 pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
197 self.cache_registry = Some(registry);
198 self
199 }
200
201 pub fn kv_backend(&self) -> &KvBackendRef {
202 &self.kv_backend
203 }
204
205 pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
206 self.table_provider_factory = Some(factory);
207 self
208 }
209
210 #[cfg(feature = "enterprise")]
211 pub fn with_extension_range_provider(
212 &mut self,
213 extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
214 ) -> &mut Self {
215 self.extension_range_provider_factory = Some(extension_range_provider_factory);
216 self
217 }
218
219 pub async fn build(mut self) -> Result<Datanode> {
220 let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
221
222 let meta_client = self.meta_client.take();
223
224 let controlled_by_metasrv = meta_client.is_some();
228
229 let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
231 let (tx, rx) = new_region_server_event_channel();
232 (Box::new(tx) as _, Some(rx))
233 } else {
234 (Box::new(NoopRegionServerEventListener) as _, None)
235 };
236
237 let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
238 let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
239 let table_id_schema_cache: TableSchemaCacheRef =
240 cache_registry.get().context(MissingCacheSnafu)?;
241
242 let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
243 table_id_schema_cache,
244 schema_cache,
245 ));
246 let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id)));
247 let region_server = self
248 .new_region_server(
249 schema_metadata_manager,
250 region_event_listener,
251 file_ref_manager,
252 )
253 .await?;
254
255 let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
257 let is_recovery_mode = runtime_switch_manager
258 .recovery_mode()
259 .await
260 .context(GetMetadataSnafu)?;
261
262 let region_open_requests =
263 build_region_open_requests(node_id, self.kv_backend.clone()).await?;
264 let open_all_regions = open_all_regions(
265 region_server.clone(),
266 region_open_requests,
267 !controlled_by_metasrv,
268 self.opts.init_regions_parallelism,
269 is_recovery_mode,
271 );
272
273 if self.opts.init_regions_in_background {
274 common_runtime::spawn_global(async move {
276 if let Err(err) = open_all_regions.await {
277 error!(err; "Failed to open regions during the startup.");
278 }
279 });
280 } else {
281 open_all_regions.await?;
282 }
283
284 let heartbeat_task = if let Some(meta_client) = meta_client {
285 Some(
286 HeartbeatTask::try_new(
287 &self.opts,
288 region_server.clone(),
289 meta_client,
290 cache_registry,
291 self.plugins.clone(),
292 )
293 .await?,
294 )
295 } else {
296 None
297 };
298
299 let is_standalone = heartbeat_task.is_none();
300 let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
301 Some(self.opts.storage.data_home.clone()),
302 is_standalone && self.opts.enable_telemetry,
303 )
304 .await;
305
306 let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
307 Some(Arc::new(Notify::new()))
308 } else {
309 None
310 };
311
312 let export_metrics_task =
313 ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
314 .context(StartServerSnafu)?;
315
316 Ok(Datanode {
317 services: ServerHandlers::default(),
318 heartbeat_task,
319 region_server,
320 greptimedb_telemetry_task,
321 region_event_receiver,
322 leases_notifier,
323 plugins: self.plugins.clone(),
324 export_metrics_task,
325 })
326 }
327
328 pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
330 let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
331 let default_name = cfg.store.config_name();
332 let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
333 for store in &cfg.providers {
334 object_store_manager.add(
335 store.config_name(),
336 store::new_object_store(store.clone(), &cfg.data_home).await?,
337 );
338 }
339 Ok(Arc::new(object_store_manager))
340 }
341
342 #[cfg(test)]
343 async fn initialize_region_server(
345 &self,
346 region_server: &RegionServer,
347 open_with_writable: bool,
348 ) -> Result<()> {
349 let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
350
351 let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
353 let is_recovery_mode = runtime_switch_manager
354 .recovery_mode()
355 .await
356 .context(GetMetadataSnafu)?;
357 let region_open_requests =
358 build_region_open_requests(node_id, self.kv_backend.clone()).await?;
359
360 open_all_regions(
361 region_server.clone(),
362 region_open_requests,
363 open_with_writable,
364 self.opts.init_regions_parallelism,
365 is_recovery_mode,
366 )
367 .await
368 }
369
370 async fn new_region_server(
371 &mut self,
372 schema_metadata_manager: SchemaMetadataManagerRef,
373 event_listener: RegionServerEventListenerRef,
374 file_ref_manager: FileReferenceManagerRef,
375 ) -> Result<RegionServer> {
376 let opts: &DatanodeOptions = &self.opts;
377
378 let query_engine_factory = QueryEngineFactory::new_with_plugins(
379 DummyCatalogManager::arc(),
381 None,
382 None,
383 None,
384 None,
385 None,
386 false,
387 self.plugins.clone(),
388 opts.query.clone(),
389 );
390 let query_engine = query_engine_factory.query_engine();
391
392 let table_provider_factory = self
393 .table_provider_factory
394 .clone()
395 .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));
396
397 let mut region_server = RegionServer::with_table_provider(
398 query_engine,
399 common_runtime::global_runtime(),
400 event_listener,
401 table_provider_factory,
402 opts.max_concurrent_queries,
403 Duration::from_millis(100),
405 opts.grpc.flight_compression,
406 );
407
408 let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
409 let engines = self
410 .build_store_engines(
411 object_store_manager,
412 schema_metadata_manager,
413 file_ref_manager,
414 self.plugins.clone(),
415 )
416 .await?;
417 for engine in engines {
418 region_server.register_engine(engine);
419 }
420 if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
421 region_server.set_topic_stats_reporter(topic_stats_reporter);
422 }
423
424 Ok(region_server)
425 }
426
427 async fn build_store_engines(
431 &mut self,
432 object_store_manager: ObjectStoreManagerRef,
433 schema_metadata_manager: SchemaMetadataManagerRef,
434 file_ref_manager: FileReferenceManagerRef,
435 plugins: Plugins,
436 ) -> Result<Vec<RegionEngineRef>> {
437 let mut metric_engine_config = metric_engine::config::EngineConfig::default();
438 let mut mito_engine_config = MitoConfig::default();
439 let mut file_engine_config = file_engine::config::EngineConfig::default();
440
441 for engine in &self.opts.region_engine {
442 match engine {
443 RegionEngineConfig::Mito(config) => {
444 mito_engine_config = config.clone();
445 }
446 RegionEngineConfig::File(config) => {
447 file_engine_config = config.clone();
448 }
449 RegionEngineConfig::Metric(metric_config) => {
450 metric_engine_config = metric_config.clone();
451 }
452 }
453 }
454
455 let fetcher = Arc::new(MetaPartitionExprFetcher::new(self.kv_backend.clone()));
457 let mito_engine = self
458 .build_mito_engine(
459 object_store_manager.clone(),
460 mito_engine_config,
461 schema_metadata_manager.clone(),
462 file_ref_manager.clone(),
463 fetcher.clone(),
464 plugins.clone(),
465 )
466 .await?;
467
468 let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
469 .context(BuildMetricEngineSnafu)?;
470
471 let file_engine = FileRegionEngine::new(
472 file_engine_config,
473 object_store_manager.default_object_store().clone(), );
475
476 Ok(vec![
477 Arc::new(mito_engine) as _,
478 Arc::new(metric_engine) as _,
479 Arc::new(file_engine) as _,
480 ])
481 }
482
483 async fn build_mito_engine(
485 &mut self,
486 object_store_manager: ObjectStoreManagerRef,
487 mut config: MitoConfig,
488 schema_metadata_manager: SchemaMetadataManagerRef,
489 file_ref_manager: FileReferenceManagerRef,
490 partition_expr_fetcher: PartitionExprFetcherRef,
491 plugins: Plugins,
492 ) -> Result<MitoEngine> {
493 let opts = &self.opts;
494 if opts.storage.is_object_storage() {
495 config.enable_write_cache = true;
497 info!("Configured 'enable_write_cache=true' for mito engine.");
498 }
499
500 let mito_engine = match &opts.wal {
501 DatanodeWalConfig::RaftEngine(raft_engine_config) => {
502 let log_store =
503 Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
504 .await?;
505
506 let builder = MitoEngineBuilder::new(
507 &opts.storage.data_home,
508 config,
509 log_store,
510 object_store_manager,
511 schema_metadata_manager,
512 file_ref_manager,
513 partition_expr_fetcher.clone(),
514 plugins,
515 );
516
517 #[cfg(feature = "enterprise")]
518 let builder = builder.with_extension_range_provider_factory(
519 self.extension_range_provider_factory.take(),
520 );
521
522 builder.try_build().await.context(BuildMitoEngineSnafu)?
523 }
524 DatanodeWalConfig::Kafka(kafka_config) => {
525 if kafka_config.create_index && opts.node_id.is_none() {
526 warn!("The WAL index creation only available in distributed mode.")
527 }
528 let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
529 {
530 let operator = new_object_store_without_cache(
531 &opts.storage.store,
532 &opts.storage.data_home,
533 )
534 .await?;
535 let path = default_index_file(opts.node_id.unwrap());
536 Some(Self::build_global_index_collector(
537 kafka_config.dump_index_interval,
538 operator,
539 path,
540 ))
541 } else {
542 None
543 };
544
545 let log_store =
546 Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
547 self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
548 let builder = MitoEngineBuilder::new(
549 &opts.storage.data_home,
550 config,
551 log_store,
552 object_store_manager,
553 schema_metadata_manager,
554 file_ref_manager,
555 partition_expr_fetcher,
556 plugins,
557 );
558
559 #[cfg(feature = "enterprise")]
560 let builder = builder.with_extension_range_provider_factory(
561 self.extension_range_provider_factory.take(),
562 );
563
564 builder.try_build().await.context(BuildMitoEngineSnafu)?
565 }
566 };
567 Ok(mito_engine)
568 }
569
570 async fn build_raft_engine_log_store(
572 data_home: &str,
573 config: &RaftEngineConfig,
574 ) -> Result<Arc<RaftEngineLogStore>> {
575 let data_home = normalize_dir(data_home);
576 let wal_dir = match &config.dir {
577 Some(dir) => dir.clone(),
578 None => format!("{}{WAL_DIR}", data_home),
579 };
580
581 fs::create_dir_all(Path::new(&wal_dir))
583 .await
584 .context(CreateDirSnafu { dir: &wal_dir })?;
585 info!(
586 "Creating raft-engine logstore with config: {:?} and storage path: {}",
587 config, &wal_dir
588 );
589 let logstore = RaftEngineLogStore::try_new(wal_dir, config)
590 .await
591 .map_err(Box::new)
592 .context(OpenLogStoreSnafu)?;
593
594 Ok(Arc::new(logstore))
595 }
596
597 async fn build_kafka_log_store(
599 config: &DatanodeKafkaConfig,
600 global_index_collector: Option<GlobalIndexCollector>,
601 ) -> Result<Arc<KafkaLogStore>> {
602 KafkaLogStore::try_new(config, global_index_collector)
603 .await
604 .map_err(Box::new)
605 .context(OpenLogStoreSnafu)
606 .map(Arc::new)
607 }
608
609 fn build_global_index_collector(
611 dump_index_interval: Duration,
612 operator: object_store::ObjectStore,
613 path: String,
614 ) -> GlobalIndexCollector {
615 GlobalIndexCollector::new(dump_index_interval, operator, path)
616 }
617}
618
619async fn open_all_regions(
621 region_server: RegionServer,
622 region_open_requests: RegionOpenRequests,
623 open_with_writable: bool,
624 init_regions_parallelism: usize,
625 ignore_nonexistent_region: bool,
626) -> Result<()> {
627 let RegionOpenRequests {
628 leader_regions,
629 #[cfg(feature = "enterprise")]
630 follower_regions,
631 } = region_open_requests;
632
633 let leader_region_num = leader_regions.len();
634 info!("going to open {} region(s)", leader_region_num);
635 let now = Instant::now();
636 let open_regions = region_server
637 .handle_batch_open_requests(
638 init_regions_parallelism,
639 leader_regions,
640 ignore_nonexistent_region,
641 )
642 .await?;
643 info!(
644 "Opened {} regions in {:?}",
645 open_regions.len(),
646 now.elapsed()
647 );
648 if !ignore_nonexistent_region {
649 ensure!(
650 open_regions.len() == leader_region_num,
651 error::UnexpectedSnafu {
652 violated: format!(
653 "Expected to open {} of regions, only {} of regions has opened",
654 leader_region_num,
655 open_regions.len()
656 )
657 }
658 );
659 } else if open_regions.len() != leader_region_num {
660 warn!(
661 "ignore nonexistent region, expected to open {} of regions, only {} of regions has opened",
662 leader_region_num,
663 open_regions.len()
664 );
665 }
666
667 for region_id in open_regions {
668 if open_with_writable {
669 let res = region_server.set_region_role(region_id, RegionRole::Leader);
670 match res {
671 Ok(_) => {
672 if let SetRegionRoleStateResponse::InvalidTransition(err) = region_server
674 .set_region_role_state_gracefully(
675 region_id,
676 SettableRegionRoleState::Leader,
677 )
678 .await?
679 {
680 error!(err; "failed to convert region {region_id} to leader");
681 }
682 }
683 Err(e) => {
684 error!(e; "failed to convert region {region_id} to leader");
685 }
686 }
687 }
688 }
689
690 #[cfg(feature = "enterprise")]
691 if !follower_regions.is_empty() {
692 use tokio::time::Instant;
693
694 let follower_region_num = follower_regions.len();
695 info!("going to open {} follower region(s)", follower_region_num);
696
697 let now = Instant::now();
698 let open_regions = region_server
699 .handle_batch_open_requests(
700 init_regions_parallelism,
701 follower_regions,
702 ignore_nonexistent_region,
703 )
704 .await?;
705 info!(
706 "Opened {} follower regions in {:?}",
707 open_regions.len(),
708 now.elapsed()
709 );
710
711 if !ignore_nonexistent_region {
712 ensure!(
713 open_regions.len() == follower_region_num,
714 error::UnexpectedSnafu {
715 violated: format!(
716 "Expected to open {} of follower regions, only {} of regions has opened",
717 follower_region_num,
718 open_regions.len()
719 )
720 }
721 );
722 } else if open_regions.len() != follower_region_num {
723 warn!(
724 "ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened",
725 follower_region_num,
726 open_regions.len()
727 );
728 }
729 }
730
731 info!("all regions are opened");
732
733 Ok(())
734}
735
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::RegionRoleSet;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Writes a datanode-table entry for table 1028 (mito engine, three
    /// regions, option `foo=bar`) into `kv` and asserts that the transaction
    /// succeeded.
    async fn setup_table_datanode(kv: &KvBackendRef) {
        let region_options = HashMap::from([("foo".to_string(), "bar".to_string())]);
        let regions = BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]);

        let create_txn = DatanodeTableManager::new(kv.clone())
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                region_options,
                HashMap::default(),
                regions,
            )
            .unwrap();

        let response = kv.txn(create_txn).await.unwrap();
        assert!(response.succeeded);
    }

    /// `initialize_region_server` must send exactly one open request per
    /// region of the table registered by `setup_table_datanode`, carrying the
    /// stored region options.
    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut region_server = mock_region_server();
        let (engine, mut engine_requests) = MockRegionEngine::new(MITO_ENGINE_NAME);

        region_server.register_engine(engine.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let opts = DatanodeOptions {
            node_id: Some(0),
            ..Default::default()
        };
        let mut builder = DatanodeBuilder::new(opts, Plugins::default(), kv_backend.clone());
        builder.with_cache_registry(cache_registry);
        setup_table_datanode(&(kv_backend as _)).await;

        builder
            .initialize_region_server(&region_server, false)
            .await
            .unwrap();

        let expected_options = HashMap::from([("foo".to_string(), "bar".to_string())]);
        for i in 0..3 {
            let (region_id, request) = engine_requests.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));

            let RegionRequest::Open(open_request) = request else {
                unreachable!()
            };
            assert_eq!(open_request.options, expected_options);
        }

        // No further requests may be pending after the three opens.
        assert_matches!(
            engine_requests.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}