use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::datanode::TopicStatsReporter;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::{error, info, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{default_index_file, GlobalIndexCollector};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::{MitoEngine, MitoEngineBuilder};
use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::ServerHandlers;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::path_utils::WAL_DIR;
use store_api::region_engine::{RegionEngineRef, RegionRole};
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
    self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
    MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
    ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
    new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
    RegionServerEventReceiver,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store::{self, new_object_store_without_cache};
use crate::utils::{build_region_open_requests, RegionOpenRequests};

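/// The datanode instance, which hosts regions through its [`RegionServer`] and
/// runs the auxiliary tasks around it: the heartbeat towards metasrv, the
/// telemetry and metrics export tasks, and the services installed via
/// [`Datanode::setup_services`].
///
/// A minimal lifecycle sketch (not a doctest; `builder` and `services` are
/// assumed to be constructed elsewhere):
///
/// ```ignore
/// let mut datanode = builder.build().await?;
/// datanode.setup_services(services);
/// datanode.start().await?;
/// // ...
/// datanode.shutdown().await?;
/// ```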
pub struct Datanode {
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
    region_event_receiver: Option<RegionServerEventReceiver>,
    region_server: RegionServer,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leases_notifier: Option<Arc<Notify>>,
    plugins: Plugins,
    export_metrics_task: Option<ExportMetricsTask>,
}

impl Datanode {
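    /// Starts the datanode: begins heartbeating, waits for region leases when
    /// coordination is required, then starts the telemetry task, the metrics
    /// export task, and all registered services.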
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting datanode instance...");

        self.start_heartbeat().await?;
        self.wait_coordinated().await;

        self.start_telemetry();

        if let Some(t) = self.export_metrics_task.as_ref() {
            t.start(None).context(StartServerSnafu)?
        }

        self.services.start_all().await.context(StartServerSnafu)
    }

    pub fn server_handlers(&self) -> &ServerHandlers {
        &self.services
    }

    pub fn start_telemetry(&self) {
        if let Err(e) = self.greptimedb_telemetry_task.start() {
            warn!(e; "Failed to start telemetry task!");
        }
    }

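    /// Starts the heartbeat task towards metasrv, if one was configured at
    /// build time (i.e. the datanode runs in distributed mode).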
    pub async fn start_heartbeat(&mut self) -> Result<()> {
        if let Some(task) = &self.heartbeat_task {
            // Safety: the receiver is always created alongside the heartbeat
            // task, and is taken exactly once here.
            let receiver = self.region_event_receiver.take().unwrap();

            task.start(receiver, self.leases_notifier.clone()).await?;
        }
        Ok(())
    }

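    /// Waits until regions have obtained their leases from metasrv. This is a
    /// no-op unless `require_lease_before_startup` is enabled in distributed
    /// mode, in which case a notifier is installed at build time.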
    pub async fn wait_coordinated(&mut self) {
        if let Some(notifier) = self.leases_notifier.take() {
            notifier.notified().await;
        }
    }

    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }

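    /// Shuts the datanode down in order: the server handlers first, then the
    /// telemetry and heartbeat tasks, and finally the region server.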
    pub async fn shutdown(&mut self) -> Result<()> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        let _ = self.greptimedb_telemetry_task.stop().await;
        if let Some(heartbeat_task) = &self.heartbeat_task {
            heartbeat_task
                .close()
                .map_err(BoxedError::new)
                .context(ShutdownInstanceSnafu)?;
        }
        self.region_server.stop().await?;
        Ok(())
    }

    pub fn region_server(&self) -> RegionServer {
        self.region_server.clone()
    }

    pub fn plugins(&self) -> Plugins {
        self.plugins.clone()
    }
}

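/// Builder for [`Datanode`].
///
/// A minimal usage sketch (not a doctest; `opts`, `plugins`, `kv_backend`, and
/// `cache_registry` are assumed to be constructed by the caller, as the
/// standalone and distributed startup paths do):
///
/// ```ignore
/// let mut builder = DatanodeBuilder::new(opts, plugins, kv_backend);
/// builder.with_cache_registry(cache_registry);
/// let datanode = builder.build().await?;
/// ```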
pub struct DatanodeBuilder {
    opts: DatanodeOptions,
    table_provider_factory: Option<TableProviderFactoryRef>,
    plugins: Plugins,
    meta_client: Option<MetaClientRef>,
    kv_backend: KvBackendRef,
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
    topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}

impl DatanodeBuilder {
    pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
        Self {
            opts,
            table_provider_factory: None,
            plugins,
            meta_client: None,
            kv_backend,
            cache_registry: None,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
            topic_stats_reporter: None,
        }
    }

    pub fn options(&self) -> &DatanodeOptions {
        &self.opts
    }

    pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
        self.meta_client = Some(client);
        self
    }

    pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
        self.cache_registry = Some(registry);
        self
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
        self.table_provider_factory = Some(factory);
        self
    }

    #[cfg(feature = "enterprise")]
    pub fn with_extension_range_provider(
        &mut self,
        extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
    ) -> &mut Self {
        self.extension_range_provider_factory = Some(extension_range_provider_factory);
        self
    }

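    /// Builds the [`Datanode`]: wires up the caches and metadata managers,
    /// creates the region server with its engines, opens all regions assigned
    /// to this node (possibly in the background), and prepares the heartbeat,
    /// telemetry, and metrics export tasks.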
    pub async fn build(mut self) -> Result<Datanode> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let meta_client = self.meta_client.take();

        // The datanode is coordinated by metasrv only when a meta client exists.
        let controlled_by_metasrv = meta_client.is_some();

        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
            let (tx, rx) = new_region_server_event_channel();
            (Box::new(tx) as _, Some(rx))
        } else {
            (Box::new(NoopRegionServerEventListener) as _, None)
        };

        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
        let table_id_schema_cache: TableSchemaCacheRef =
            cache_registry.get().context(MissingCacheSnafu)?;

        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
            table_id_schema_cache,
            schema_cache,
        ));
        let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id)));
        let region_server = self
            .new_region_server(
                schema_metadata_manager,
                region_event_listener,
                file_ref_manager,
            )
            .await?;

        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;

        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;
        let open_all_regions = open_all_regions(
            region_server.clone(),
            region_open_requests,
            !controlled_by_metasrv,
            self.opts.init_regions_parallelism,
            is_recovery_mode,
        );

        if self.opts.init_regions_in_background {
            // Opens regions in the background, so the datanode can serve
            // requests while regions are still being opened.
            common_runtime::spawn_global(async move {
                if let Err(err) = open_all_regions.await {
                    error!(err; "Failed to open regions during startup.");
                }
            });
        } else {
            open_all_regions.await?;
        }

        let heartbeat_task = if let Some(meta_client) = meta_client {
            Some(
                HeartbeatTask::try_new(
                    &self.opts,
                    region_server.clone(),
                    meta_client,
                    cache_registry,
                    self.plugins.clone(),
                )
                .await?,
            )
        } else {
            None
        };

        let is_standalone = heartbeat_task.is_none();
        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
            Some(self.opts.storage.data_home.clone()),
            is_standalone && self.opts.enable_telemetry,
        )
        .await;

        let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
            Some(Arc::new(Notify::new()))
        } else {
            None
        };

        let export_metrics_task =
            ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
                .context(StartServerSnafu)?;

        Ok(Datanode {
            services: ServerHandlers::default(),
            heartbeat_task,
            region_server,
            greptimedb_telemetry_task,
            region_event_receiver,
            leases_notifier,
            plugins: self.plugins.clone(),
            export_metrics_task,
        })
    }

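    /// Builds an [`ObjectStoreManager`] from the storage config: the default
    /// store plus one named store per configured provider.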
    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
        let default_name = cfg.store.config_name();
        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
        for store in &cfg.providers {
            object_store_manager.add(
                store.config_name(),
                store::new_object_store(store.clone(), &cfg.data_home).await?,
            );
        }
        Ok(Arc::new(object_store_manager))
    }

    #[cfg(test)]
    async fn initialize_region_server(
        &self,
        region_server: &RegionServer,
        open_with_writable: bool,
    ) -> Result<()> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;
        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;

        open_all_regions(
            region_server.clone(),
            region_open_requests,
            open_with_writable,
            self.opts.init_regions_parallelism,
            is_recovery_mode,
        )
        .await
    }

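    /// Creates a [`RegionServer`] backed by a query engine over a dummy
    /// catalog, then registers all configured region engines on it.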
    async fn new_region_server(
        &mut self,
        schema_metadata_manager: SchemaMetadataManagerRef,
        event_listener: RegionServerEventListenerRef,
        file_ref_manager: FileReferenceManagerRef,
    ) -> Result<RegionServer> {
        let opts: &DatanodeOptions = &self.opts;

        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            DummyCatalogManager::arc(),
            None,
            None,
            None,
            None,
            None,
            false,
            self.plugins.clone(),
            opts.query.clone(),
        );
        let query_engine = query_engine_factory.query_engine();

        let table_provider_factory = self
            .table_provider_factory
            .clone()
            .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));

        let mut region_server = RegionServer::with_table_provider(
            query_engine,
            common_runtime::global_runtime(),
            event_listener,
            table_provider_factory,
            opts.max_concurrent_queries,
            Duration::from_millis(100),
            opts.grpc.flight_compression,
        );

        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
        let engines = self
            .build_store_engines(
                object_store_manager,
                schema_metadata_manager,
                file_ref_manager,
                self.plugins.clone(),
            )
            .await?;
        for engine in engines {
            region_server.register_engine(engine);
        }
        if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
            region_server.set_topic_stats_reporter(topic_stats_reporter);
        }

        Ok(region_server)
    }

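    /// Builds the region engines from the engine configs. The metric engine
    /// is layered on top of the mito engine, so mito is built first.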
    async fn build_store_engines(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        plugins: Plugins,
    ) -> Result<Vec<RegionEngineRef>> {
        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
        let mut mito_engine_config = MitoConfig::default();
        let mut file_engine_config = file_engine::config::EngineConfig::default();

        for engine in &self.opts.region_engine {
            match engine {
                RegionEngineConfig::Mito(config) => {
                    mito_engine_config = config.clone();
                }
                RegionEngineConfig::File(config) => {
                    file_engine_config = config.clone();
                }
                RegionEngineConfig::Metric(metric_config) => {
                    metric_engine_config = metric_config.clone();
                }
            }
        }

        let mito_engine = self
            .build_mito_engine(
                object_store_manager.clone(),
                mito_engine_config,
                schema_metadata_manager.clone(),
                file_ref_manager.clone(),
                plugins.clone(),
            )
            .await?;

        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
            .context(BuildMetricEngineSnafu)?;

        let file_engine = FileRegionEngine::new(
            file_engine_config,
            object_store_manager.default_object_store().clone(),
        );

        Ok(vec![
            Arc::new(mito_engine) as _,
            Arc::new(metric_engine) as _,
            Arc::new(file_engine) as _,
        ])
    }

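    /// Builds a [`MitoEngine`] on top of either a raft-engine or a Kafka log
    /// store, depending on the WAL config.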
    async fn build_mito_engine(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        mut config: MitoConfig,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let opts = &self.opts;
        if opts.storage.is_object_storage() {
            // Enable the write cache when storage is backed by object storage.
            config.enable_write_cache = true;
            info!("Configured 'enable_write_cache=true' for mito engine.");
        }

        let mito_engine = match &opts.wal {
            DatanodeWalConfig::RaftEngine(raft_engine_config) => {
                let log_store =
                    Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                        .await?;

                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    plugins,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
            DatanodeWalConfig::Kafka(kafka_config) => {
                if kafka_config.create_index && opts.node_id.is_none() {
                    warn!("WAL index creation is only available in distributed mode.")
                }
                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
                {
                    let operator = new_object_store_without_cache(
                        &opts.storage.store,
                        &opts.storage.data_home,
                    )
                    .await?;
                    let path = default_index_file(opts.node_id.unwrap());
                    Some(Self::build_global_index_collector(
                        kafka_config.dump_index_interval,
                        operator,
                        path,
                    ))
                } else {
                    None
                };

                let log_store =
                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
                self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    plugins,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
        };
        Ok(mito_engine)
    }

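    /// Builds a [`RaftEngineLogStore`], creating the WAL directory first if it
    /// does not exist.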
    async fn build_raft_engine_log_store(
        data_home: &str,
        config: &RaftEngineConfig,
    ) -> Result<Arc<RaftEngineLogStore>> {
        let data_home = normalize_dir(data_home);
        let wal_dir = match &config.dir {
            Some(dir) => dir.clone(),
            None => format!("{}{WAL_DIR}", data_home),
        };

        // Ensure the WAL directory exists before opening the log store.
        fs::create_dir_all(Path::new(&wal_dir))
            .await
            .context(CreateDirSnafu { dir: &wal_dir })?;
        info!(
            "Creating raft-engine logstore with config: {:?} and storage path: {}",
            config, &wal_dir
        );
        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)?;

        Ok(Arc::new(logstore))
    }

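    /// Builds a [`KafkaLogStore`] with an optional global index collector.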
    async fn build_kafka_log_store(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Arc<KafkaLogStore>> {
        KafkaLogStore::try_new(config, global_index_collector)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)
            .map(Arc::new)
    }

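    /// Builds a [`GlobalIndexCollector`] that dumps WAL indexes to `path`
    /// through the object store `operator` at the given interval.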
    fn build_global_index_collector(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> GlobalIndexCollector {
        GlobalIndexCollector::new(dump_index_interval, operator, path)
    }
}

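/// Opens all regions belonging to this datanode in parallel batches. When
/// `ignore_nonexistent_region` is false, failing to open any region is an
/// error; otherwise a mismatch is only logged. With `open_with_writable` set,
/// opened leader regions are converted to [`RegionRole::Leader`].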
async fn open_all_regions(
    region_server: RegionServer,
    region_open_requests: RegionOpenRequests,
    open_with_writable: bool,
    init_regions_parallelism: usize,
    ignore_nonexistent_region: bool,
) -> Result<()> {
    let RegionOpenRequests {
        leader_regions,
        #[cfg(feature = "enterprise")]
        follower_regions,
    } = region_open_requests;

    let leader_region_num = leader_regions.len();
    info!("Going to open {} region(s)", leader_region_num);
    let now = Instant::now();
    let open_regions = region_server
        .handle_batch_open_requests(
            init_regions_parallelism,
            leader_regions,
            ignore_nonexistent_region,
        )
        .await?;
    info!(
        "Opened {} regions in {:?}",
        open_regions.len(),
        now.elapsed()
    );
    if !ignore_nonexistent_region {
        ensure!(
            open_regions.len() == leader_region_num,
            error::UnexpectedSnafu {
                violated: format!(
                    "Expected to open {} regions, but only {} were opened",
                    leader_region_num,
                    open_regions.len()
                )
            }
        );
    } else if open_regions.len() != leader_region_num {
        warn!(
            "Ignoring nonexistent regions; expected to open {} regions, but only {} were opened",
            leader_region_num,
            open_regions.len()
        );
    }

    for region_id in open_regions {
        if open_with_writable {
            if let Err(e) = region_server.set_region_role(region_id, RegionRole::Leader) {
                error!(
                    e; "Failed to convert region {region_id} to leader"
                );
            }
        }
    }

    #[cfg(feature = "enterprise")]
    if !follower_regions.is_empty() {
        use tokio::time::Instant;

        let follower_region_num = follower_regions.len();
        info!("Going to open {} follower region(s)", follower_region_num);

        let now = Instant::now();
        let open_regions = region_server
            .handle_batch_open_requests(
                init_regions_parallelism,
                follower_regions,
                ignore_nonexistent_region,
            )
            .await?;
        info!(
            "Opened {} follower regions in {:?}",
            open_regions.len(),
            now.elapsed()
        );

        if !ignore_nonexistent_region {
            ensure!(
                open_regions.len() == follower_region_num,
                error::UnexpectedSnafu {
                    violated: format!(
                        "Expected to open {} follower regions, but only {} were opened",
                        follower_region_num,
                        open_regions.len()
                    )
                }
            );
        } else if open_regions.len() != follower_region_num {
            warn!(
                "Ignoring nonexistent regions; expected to open {} follower regions, but only {} were opened",
                follower_region_num,
                open_regions.len()
            );
        }
    }

    info!("All regions are opened");

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::key::RegionRoleSet;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::KvBackendRef;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{mock_region_server, MockRegionEngine};

    async fn setup_table_datanode(kv: &KvBackendRef) {
        let mgr = DatanodeTableManager::new(kv.clone());
        let txn = mgr
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                HashMap::from([("foo".to_string(), "bar".to_string())]),
                HashMap::default(),
                BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
            )
            .unwrap();

        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
    }

    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let mut builder = DatanodeBuilder::new(
            DatanodeOptions {
                node_id: Some(0),
                ..Default::default()
            },
            Plugins::default(),
            kv_backend.clone(),
        );
        builder.with_cache_registry(layered_cache_registry);
        setup_table_datanode(&(kv_backend as _)).await;

        builder
            .initialize_region_server(&mock_region_server, false)
            .await
            .unwrap();

        for i in 0..3 {
            let (region_id, req) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));
            if let RegionRequest::Open(req) = req {
                assert_eq!(
                    req.options,
                    HashMap::from([("foo".to_string(), "bar".to_string())])
                )
            } else {
                unreachable!()
            }
        }

        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}