1use std::path::Path;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::Plugins;
22use common_error::ext::BoxedError;
23use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
24use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
25use common_meta::datanode::TopicStatsReporter;
26use common_meta::key::runtime_switch::RuntimeSwitchManager;
27use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
28use common_meta::kv_backend::KvBackendRef;
29pub use common_procedure::options::ProcedureConfig;
30use common_stat::ResourceStatImpl;
31use common_telemetry::{error, info, warn};
32use common_wal::config::DatanodeWalConfig;
33use common_wal::config::kafka::DatanodeKafkaConfig;
34use common_wal::config::raft_engine::RaftEngineConfig;
35use file_engine::engine::FileRegionEngine;
36use log_store::kafka::log_store::KafkaLogStore;
37use log_store::kafka::{GlobalIndexCollector, default_index_file};
38use log_store::noop::log_store::NoopLogStore;
39use log_store::raft_engine::log_store::RaftEngineLogStore;
40use meta_client::MetaClientRef;
41use metric_engine::engine::MetricEngine;
42use mito2::config::MitoConfig;
43use mito2::engine::{MitoEngine, MitoEngineBuilder};
44use mito2::region::opener::PartitionExprFetcherRef;
45use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
46use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
47use object_store::util::normalize_dir;
48use query::QueryEngineFactory;
49use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
50use servers::export_metrics::ExportMetricsTask;
51use servers::server::ServerHandlers;
52use snafu::{OptionExt, ResultExt, ensure};
53use store_api::path_utils::WAL_DIR;
54use store_api::region_engine::{
55 RegionEngineRef, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
56};
57use tokio::fs;
58use tokio::sync::Notify;
59
60use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
61use crate::error::{
62 self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
63 MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
64 ShutdownServerSnafu, StartServerSnafu,
65};
66use crate::event_listener::{
67 NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
68 new_region_server_event_channel,
69};
70use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
71use crate::heartbeat::HeartbeatTask;
72use crate::partition_expr_fetcher::MetaPartitionExprFetcher;
73use crate::region_server::{DummyTableProviderFactory, RegionServer};
74use crate::store::{self, new_object_store_without_cache};
75use crate::utils::{RegionOpenRequests, build_region_open_requests};
76
/// A datanode instance: a region server together with the servers and
/// background tasks that are started and stopped alongside it.
pub struct Datanode {
    /// Server handlers; replaced by `setup_services`, started in `start` and
    /// shut down in `shutdown`.
    services: ServerHandlers,
    /// Heartbeat task; `None` when the builder had no meta client.
    heartbeat_task: Option<HeartbeatTask>,
    /// Receiver of region server events; taken and handed to the heartbeat
    /// task when it is started.
    region_event_receiver: Option<RegionServerEventReceiver>,
    /// The region server owned by this datanode.
    region_server: RegionServer,
    /// Telemetry task, started by `start_telemetry` and stopped in `shutdown`.
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    /// Notifier shared with the heartbeat task; `wait_coordinated` awaits it.
    leases_notifier: Option<Arc<Notify>>,
    /// Plugins shared with the builder that created this datanode.
    plugins: Plugins,
    /// Optional metrics export task, started in `start`.
    export_metrics_task: Option<ExportMetricsTask>,
}
88
89impl Datanode {
90 pub async fn start(&mut self) -> Result<()> {
91 info!("Starting datanode instance...");
92
93 self.start_heartbeat().await?;
94 self.wait_coordinated().await;
95
96 self.start_telemetry();
97
98 if let Some(t) = self.export_metrics_task.as_ref() {
99 t.start(None).context(StartServerSnafu)?
100 }
101
102 self.services.start_all().await.context(StartServerSnafu)
103 }
104
105 pub fn server_handlers(&self) -> &ServerHandlers {
106 &self.services
107 }
108
109 pub fn start_telemetry(&self) {
110 if let Err(e) = self.greptimedb_telemetry_task.start() {
111 warn!(e; "Failed to start telemetry task!");
112 }
113 }
114
115 pub async fn start_heartbeat(&mut self) -> Result<()> {
116 if let Some(task) = &self.heartbeat_task {
117 let receiver = self.region_event_receiver.take().unwrap();
119
120 task.start(receiver, self.leases_notifier.clone()).await?;
121 }
122 Ok(())
123 }
124
125 pub async fn wait_coordinated(&mut self) {
127 if let Some(notifier) = self.leases_notifier.take() {
128 notifier.notified().await;
129 }
130 }
131
132 pub fn setup_services(&mut self, services: ServerHandlers) {
133 self.services = services;
134 }
135
136 pub async fn shutdown(&mut self) -> Result<()> {
137 self.services
138 .shutdown_all()
139 .await
140 .context(ShutdownServerSnafu)?;
141
142 let _ = self.greptimedb_telemetry_task.stop().await;
143 if let Some(heartbeat_task) = &self.heartbeat_task {
144 heartbeat_task
145 .close()
146 .map_err(BoxedError::new)
147 .context(ShutdownInstanceSnafu)?;
148 }
149 self.region_server.stop().await?;
150 Ok(())
151 }
152
153 pub fn region_server(&self) -> RegionServer {
154 self.region_server.clone()
155 }
156
157 pub fn plugins(&self) -> Plugins {
158 self.plugins.clone()
159 }
160}
161
/// Builder that assembles a [`Datanode`] from its options and dependencies.
pub struct DatanodeBuilder {
    /// Options of the datanode being built.
    opts: DatanodeOptions,
    /// Optional table provider factory; `new_region_server` falls back to
    /// `DummyTableProviderFactory` when it is unset.
    table_provider_factory: Option<TableProviderFactoryRef>,
    /// Plugins handed to the query engine, the storage engines and the
    /// resulting datanode.
    plugins: Plugins,
    /// Optional meta client; `build` creates a heartbeat task only when set.
    meta_client: Option<MetaClientRef>,
    /// Kv backend used to read the recovery-mode switch, to build the region
    /// open requests and to back the partition expression fetcher.
    kv_backend: KvBackendRef,
    /// Cache registry; `build` fails with a missing-cache error when unset.
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
    /// Set by `build_mito_engine` when the Kafka WAL is used and then handed
    /// to the region server in `new_region_server`.
    topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
    /// Optional extension range provider factory passed to the mito engine
    /// builder.
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}
173
174impl DatanodeBuilder {
175 pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
176 Self {
177 opts,
178 table_provider_factory: None,
179 plugins,
180 meta_client: None,
181 kv_backend,
182 cache_registry: None,
183 #[cfg(feature = "enterprise")]
184 extension_range_provider_factory: None,
185 topic_stats_reporter: None,
186 }
187 }
188
    /// Returns the datanode options held by this builder.
    pub fn options(&self) -> &DatanodeOptions {
        &self.opts
    }
192
    /// Sets the meta client and returns the builder for chaining.
    ///
    /// `build` creates a heartbeat task only when a meta client has been set.
    pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
        self.meta_client = Some(client);
        self
    }
197
    /// Sets the cache registry and returns the builder for chaining.
    ///
    /// `build` returns a missing-cache error when no registry has been set.
    pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
        self.cache_registry = Some(registry);
        self
    }
202
    /// Returns the kv backend held by this builder.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }
206
    /// Sets the table provider factory and returns the builder for chaining.
    ///
    /// When unset, the region server is created with
    /// `DummyTableProviderFactory`.
    pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
        self.table_provider_factory = Some(factory);
        self
    }
211
    /// Sets the extension range provider factory and returns the builder for
    /// chaining.
    ///
    /// The factory is handed to the mito engine builder in
    /// `build_mito_engine`. Only available with the `enterprise` feature.
    #[cfg(feature = "enterprise")]
    pub fn with_extension_range_provider(
        &mut self,
        extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
    ) -> &mut Self {
        self.extension_range_provider_factory = Some(extension_range_provider_factory);
        self
    }
220
221 pub async fn build(mut self) -> Result<Datanode> {
222 let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
223
224 let meta_client = self.meta_client.take();
225
226 let controlled_by_metasrv = meta_client.is_some();
230
231 let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
233 let (tx, rx) = new_region_server_event_channel();
234 (Box::new(tx) as _, Some(rx))
235 } else {
236 (Box::new(NoopRegionServerEventListener) as _, None)
237 };
238
239 let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
240 let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
241 let table_id_schema_cache: TableSchemaCacheRef =
242 cache_registry.get().context(MissingCacheSnafu)?;
243
244 let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
245 table_id_schema_cache,
246 schema_cache,
247 ));
248 let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id)));
249 let region_server = self
250 .new_region_server(
251 schema_metadata_manager,
252 region_event_listener,
253 file_ref_manager,
254 )
255 .await?;
256
257 let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
259 let is_recovery_mode = runtime_switch_manager
260 .recovery_mode()
261 .await
262 .context(GetMetadataSnafu)?;
263
264 let region_open_requests =
265 build_region_open_requests(node_id, self.kv_backend.clone()).await?;
266 let open_all_regions = open_all_regions(
267 region_server.clone(),
268 region_open_requests,
269 !controlled_by_metasrv,
270 self.opts.init_regions_parallelism,
271 is_recovery_mode,
273 );
274
275 if self.opts.init_regions_in_background {
276 common_runtime::spawn_global(async move {
278 if let Err(err) = open_all_regions.await {
279 error!(err; "Failed to open regions during the startup.");
280 }
281 });
282 } else {
283 open_all_regions.await?;
284 }
285
286 let mut resource_stat = ResourceStatImpl::default();
287 resource_stat.start_collect_cpu_usage();
288
289 let heartbeat_task = if let Some(meta_client) = meta_client {
290 Some(
291 HeartbeatTask::try_new(
292 &self.opts,
293 region_server.clone(),
294 meta_client,
295 cache_registry,
296 self.plugins.clone(),
297 Arc::new(resource_stat),
298 )
299 .await?,
300 )
301 } else {
302 None
303 };
304
305 let is_standalone = heartbeat_task.is_none();
306 let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
307 Some(self.opts.storage.data_home.clone()),
308 is_standalone && self.opts.enable_telemetry,
309 )
310 .await;
311
312 let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
313 Some(Arc::new(Notify::new()))
314 } else {
315 None
316 };
317
318 let export_metrics_task =
319 ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
320 .context(StartServerSnafu)?;
321
322 Ok(Datanode {
323 services: ServerHandlers::default(),
324 heartbeat_task,
325 region_server,
326 greptimedb_telemetry_task,
327 region_event_receiver,
328 leases_notifier,
329 plugins: self.plugins.clone(),
330 export_metrics_task,
331 })
332 }
333
334 pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
336 let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
337 let default_name = cfg.store.config_name();
338 let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
339 for store in &cfg.providers {
340 object_store_manager.add(
341 store.config_name(),
342 store::new_object_store(store.clone(), &cfg.data_home).await?,
343 );
344 }
345 Ok(Arc::new(object_store_manager))
346 }
347
348 #[cfg(test)]
349 async fn initialize_region_server(
351 &self,
352 region_server: &RegionServer,
353 open_with_writable: bool,
354 ) -> Result<()> {
355 let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
356
357 let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
359 let is_recovery_mode = runtime_switch_manager
360 .recovery_mode()
361 .await
362 .context(GetMetadataSnafu)?;
363 let region_open_requests =
364 build_region_open_requests(node_id, self.kv_backend.clone()).await?;
365
366 open_all_regions(
367 region_server.clone(),
368 region_open_requests,
369 open_with_writable,
370 self.opts.init_regions_parallelism,
371 is_recovery_mode,
372 )
373 .await
374 }
375
376 async fn new_region_server(
377 &mut self,
378 schema_metadata_manager: SchemaMetadataManagerRef,
379 event_listener: RegionServerEventListenerRef,
380 file_ref_manager: FileReferenceManagerRef,
381 ) -> Result<RegionServer> {
382 let opts: &DatanodeOptions = &self.opts;
383
384 let query_engine_factory = QueryEngineFactory::new_with_plugins(
385 DummyCatalogManager::arc(),
387 None,
388 None,
389 None,
390 None,
391 None,
392 false,
393 self.plugins.clone(),
394 opts.query.clone(),
395 );
396 let query_engine = query_engine_factory.query_engine();
397
398 let table_provider_factory = self
399 .table_provider_factory
400 .clone()
401 .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));
402
403 let mut region_server = RegionServer::with_table_provider(
404 query_engine,
405 common_runtime::global_runtime(),
406 event_listener,
407 table_provider_factory,
408 opts.max_concurrent_queries,
409 Duration::from_millis(100),
411 opts.grpc.flight_compression,
412 );
413
414 let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
415 let engines = self
416 .build_store_engines(
417 object_store_manager,
418 schema_metadata_manager,
419 file_ref_manager,
420 self.plugins.clone(),
421 )
422 .await?;
423 for engine in engines {
424 region_server.register_engine(engine);
425 }
426 if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
427 region_server.set_topic_stats_reporter(topic_stats_reporter);
428 }
429
430 Ok(region_server)
431 }
432
433 async fn build_store_engines(
437 &mut self,
438 object_store_manager: ObjectStoreManagerRef,
439 schema_metadata_manager: SchemaMetadataManagerRef,
440 file_ref_manager: FileReferenceManagerRef,
441 plugins: Plugins,
442 ) -> Result<Vec<RegionEngineRef>> {
443 let mut metric_engine_config = metric_engine::config::EngineConfig::default();
444 let mut mito_engine_config = MitoConfig::default();
445 let mut file_engine_config = file_engine::config::EngineConfig::default();
446
447 for engine in &self.opts.region_engine {
448 match engine {
449 RegionEngineConfig::Mito(config) => {
450 mito_engine_config = config.clone();
451 }
452 RegionEngineConfig::File(config) => {
453 file_engine_config = config.clone();
454 }
455 RegionEngineConfig::Metric(metric_config) => {
456 metric_engine_config = metric_config.clone();
457 }
458 }
459 }
460
461 let fetcher = Arc::new(MetaPartitionExprFetcher::new(self.kv_backend.clone()));
463 let mito_engine = self
464 .build_mito_engine(
465 object_store_manager.clone(),
466 mito_engine_config,
467 schema_metadata_manager.clone(),
468 file_ref_manager.clone(),
469 fetcher.clone(),
470 plugins.clone(),
471 )
472 .await?;
473
474 let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
475 .context(BuildMetricEngineSnafu)?;
476
477 let file_engine = FileRegionEngine::new(
478 file_engine_config,
479 object_store_manager.default_object_store().clone(), );
481
482 Ok(vec![
483 Arc::new(mito_engine) as _,
484 Arc::new(metric_engine) as _,
485 Arc::new(file_engine) as _,
486 ])
487 }
488
489 async fn build_mito_engine(
491 &mut self,
492 object_store_manager: ObjectStoreManagerRef,
493 mut config: MitoConfig,
494 schema_metadata_manager: SchemaMetadataManagerRef,
495 file_ref_manager: FileReferenceManagerRef,
496 partition_expr_fetcher: PartitionExprFetcherRef,
497 plugins: Plugins,
498 ) -> Result<MitoEngine> {
499 let opts = &self.opts;
500 if opts.storage.is_object_storage() {
501 config.enable_write_cache = true;
503 info!("Configured 'enable_write_cache=true' for mito engine.");
504 }
505
506 let mito_engine = match &opts.wal {
507 DatanodeWalConfig::RaftEngine(raft_engine_config) => {
508 let log_store =
509 Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
510 .await?;
511
512 let builder = MitoEngineBuilder::new(
513 &opts.storage.data_home,
514 config,
515 log_store,
516 object_store_manager,
517 schema_metadata_manager,
518 file_ref_manager,
519 partition_expr_fetcher.clone(),
520 plugins,
521 );
522
523 #[cfg(feature = "enterprise")]
524 let builder = builder.with_extension_range_provider_factory(
525 self.extension_range_provider_factory.take(),
526 );
527
528 builder.try_build().await.context(BuildMitoEngineSnafu)?
529 }
530 DatanodeWalConfig::Kafka(kafka_config) => {
531 if kafka_config.create_index && opts.node_id.is_none() {
532 warn!("The WAL index creation only available in distributed mode.")
533 }
534 let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
535 {
536 let operator = new_object_store_without_cache(
537 &opts.storage.store,
538 &opts.storage.data_home,
539 )
540 .await?;
541 let path = default_index_file(opts.node_id.unwrap());
542 Some(Self::build_global_index_collector(
543 kafka_config.dump_index_interval,
544 operator,
545 path,
546 ))
547 } else {
548 None
549 };
550
551 let log_store =
552 Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
553 self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
554 let builder = MitoEngineBuilder::new(
555 &opts.storage.data_home,
556 config,
557 log_store,
558 object_store_manager,
559 schema_metadata_manager,
560 file_ref_manager,
561 partition_expr_fetcher,
562 plugins,
563 );
564
565 #[cfg(feature = "enterprise")]
566 let builder = builder.with_extension_range_provider_factory(
567 self.extension_range_provider_factory.take(),
568 );
569
570 builder.try_build().await.context(BuildMitoEngineSnafu)?
571 }
572 DatanodeWalConfig::Noop => {
573 let log_store = Arc::new(NoopLogStore);
574
575 let builder = MitoEngineBuilder::new(
576 &opts.storage.data_home,
577 config,
578 log_store,
579 object_store_manager,
580 schema_metadata_manager,
581 file_ref_manager,
582 partition_expr_fetcher.clone(),
583 plugins,
584 );
585
586 #[cfg(feature = "enterprise")]
587 let builder = builder.with_extension_range_provider_factory(
588 self.extension_range_provider_factory.take(),
589 );
590
591 builder.try_build().await.context(BuildMitoEngineSnafu)?
592 }
593 };
594 Ok(mito_engine)
595 }
596
597 async fn build_raft_engine_log_store(
599 data_home: &str,
600 config: &RaftEngineConfig,
601 ) -> Result<Arc<RaftEngineLogStore>> {
602 let data_home = normalize_dir(data_home);
603 let wal_dir = match &config.dir {
604 Some(dir) => dir.clone(),
605 None => format!("{}{WAL_DIR}", data_home),
606 };
607
608 fs::create_dir_all(Path::new(&wal_dir))
610 .await
611 .context(CreateDirSnafu { dir: &wal_dir })?;
612 info!(
613 "Creating raft-engine logstore with config: {:?} and storage path: {}",
614 config, &wal_dir
615 );
616 let logstore = RaftEngineLogStore::try_new(wal_dir, config)
617 .await
618 .map_err(Box::new)
619 .context(OpenLogStoreSnafu)?;
620
621 Ok(Arc::new(logstore))
622 }
623
624 async fn build_kafka_log_store(
626 config: &DatanodeKafkaConfig,
627 global_index_collector: Option<GlobalIndexCollector>,
628 ) -> Result<Arc<KafkaLogStore>> {
629 KafkaLogStore::try_new(config, global_index_collector)
630 .await
631 .map_err(Box::new)
632 .context(OpenLogStoreSnafu)
633 .map(Arc::new)
634 }
635
    /// Creates a [`GlobalIndexCollector`] from the dump interval, the object
    /// store operator and the index file path.
    ///
    /// Thin wrapper around `GlobalIndexCollector::new`.
    fn build_global_index_collector(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> GlobalIndexCollector {
        GlobalIndexCollector::new(dump_index_interval, operator, path)
    }
644}
645
646async fn open_all_regions(
648 region_server: RegionServer,
649 region_open_requests: RegionOpenRequests,
650 open_with_writable: bool,
651 init_regions_parallelism: usize,
652 ignore_nonexistent_region: bool,
653) -> Result<()> {
654 let RegionOpenRequests {
655 leader_regions,
656 #[cfg(feature = "enterprise")]
657 follower_regions,
658 } = region_open_requests;
659
660 let leader_region_num = leader_regions.len();
661 info!("going to open {} region(s)", leader_region_num);
662 let now = Instant::now();
663 let open_regions = region_server
664 .handle_batch_open_requests(
665 init_regions_parallelism,
666 leader_regions,
667 ignore_nonexistent_region,
668 )
669 .await?;
670 info!(
671 "Opened {} regions in {:?}",
672 open_regions.len(),
673 now.elapsed()
674 );
675 if !ignore_nonexistent_region {
676 ensure!(
677 open_regions.len() == leader_region_num,
678 error::UnexpectedSnafu {
679 violated: format!(
680 "Expected to open {} of regions, only {} of regions has opened",
681 leader_region_num,
682 open_regions.len()
683 )
684 }
685 );
686 } else if open_regions.len() != leader_region_num {
687 warn!(
688 "ignore nonexistent region, expected to open {} of regions, only {} of regions has opened",
689 leader_region_num,
690 open_regions.len()
691 );
692 }
693
694 for region_id in open_regions {
695 if open_with_writable {
696 let res = region_server.set_region_role(region_id, RegionRole::Leader);
697 match res {
698 Ok(_) => {
699 if let SetRegionRoleStateResponse::InvalidTransition(err) = region_server
701 .set_region_role_state_gracefully(
702 region_id,
703 SettableRegionRoleState::Leader,
704 )
705 .await?
706 {
707 error!(err; "failed to convert region {region_id} to leader");
708 }
709 }
710 Err(e) => {
711 error!(e; "failed to convert region {region_id} to leader");
712 }
713 }
714 }
715 }
716
717 #[cfg(feature = "enterprise")]
718 if !follower_regions.is_empty() {
719 use tokio::time::Instant;
720
721 let follower_region_num = follower_regions.len();
722 info!("going to open {} follower region(s)", follower_region_num);
723
724 let now = Instant::now();
725 let open_regions = region_server
726 .handle_batch_open_requests(
727 init_regions_parallelism,
728 follower_regions,
729 ignore_nonexistent_region,
730 )
731 .await?;
732 info!(
733 "Opened {} follower regions in {:?}",
734 open_regions.len(),
735 now.elapsed()
736 );
737
738 if !ignore_nonexistent_region {
739 ensure!(
740 open_regions.len() == follower_region_num,
741 error::UnexpectedSnafu {
742 violated: format!(
743 "Expected to open {} of follower regions, only {} of regions has opened",
744 follower_region_num,
745 open_regions.len()
746 )
747 }
748 );
749 } else if open_regions.len() != follower_region_num {
750 warn!(
751 "ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened",
752 follower_region_num,
753 open_regions.len()
754 );
755 }
756 }
757
758 info!("all regions are opened");
759
760 Ok(())
761}
762
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::RegionRoleSet;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Registers table 1028 with leader regions 0, 1 and 2 and the region
    /// option `foo=bar` in the given kv backend.
    async fn setup_table_datanode(kv: &KvBackendRef) {
        let manager = DatanodeTableManager::new(kv.clone());
        let region_options = HashMap::from([("foo".to_string(), "bar".to_string())]);
        let regions = BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]);

        let txn = manager
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                region_options,
                HashMap::default(),
                regions,
            )
            .unwrap();

        let response = kv.txn(txn).await.unwrap();
        assert!(response.succeeded);
    }

    /// Checks that `initialize_region_server` sends exactly one open request,
    /// carrying the stored options, for each of the three regions.
    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let options = DatanodeOptions {
            node_id: Some(0),
            ..Default::default()
        };
        let mut builder = DatanodeBuilder::new(options, Plugins::default(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry);
        setup_table_datanode(&(kv_backend as _)).await;

        builder
            .initialize_region_server(&mock_region_server, false)
            .await
            .unwrap();

        let expected_options = HashMap::from([("foo".to_string(), "bar".to_string())]);
        for region_number in 0..3 {
            let (region_id, request) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, region_number));

            let RegionRequest::Open(open_request) = request else {
                unreachable!()
            };
            assert_eq!(open_request.options, expected_options);
        }

        // No further request may be pending after the three opens.
        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}