use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::datanode::TopicStatsReporter;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_query::prelude::set_default_prefix;
use common_stat::ResourceStatImpl;
use common_telemetry::{error, info, warn};
use common_wal::config::DatanodeWalConfig;
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use file_engine::engine::FileRegionEngine;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{GlobalIndexCollector, default_index_file};
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::{MitoEngine, MitoEngineBuilder};
use mito2::region::opener::PartitionExprFetcherRef;
use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
use servers::server::ServerHandlers;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::path_utils::WAL_DIR;
use store_api::region_engine::{
    RegionEngineRef, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
};
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
    self, BuildDatanodeSnafu, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu,
    GetMetadataSnafu, MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
    ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
    NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
    new_region_server_event_channel,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::partition_expr_fetcher::MetaPartitionExprFetcher;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store::{self, new_object_store_without_cache};
use crate::utils::{RegionOpenRequests, build_region_open_requests};

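/// The datanode instance. It bundles the region server, the optional
/// heartbeat task towards metasrv, the exposed server handlers, and the
/// telemetry task.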
pub struct Datanode {
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
    region_event_receiver: Option<RegionServerEventReceiver>,
    region_server: RegionServer,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leases_notifier: Option<Arc<Notify>>,
    plugins: Plugins,
}

impl Datanode {
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting datanode instance...");

        self.start_heartbeat().await?;
        self.wait_coordinated().await;

        self.start_telemetry();

        self.services.start_all().await.context(StartServerSnafu)
    }

    pub fn server_handlers(&self) -> &ServerHandlers {
        &self.services
    }

    pub fn start_telemetry(&self) {
        if let Err(e) = self.greptimedb_telemetry_task.start() {
            warn!(e; "Failed to start telemetry task!");
        }
    }

    pub async fn start_heartbeat(&mut self) -> Result<()> {
        if let Some(task) = &self.heartbeat_task {
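            // The receiver is created together with the heartbeat task in
            // `DatanodeBuilder::build`, so it is expected to be present here.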
            let receiver = self.region_event_receiver.take().unwrap();

            task.start(receiver, self.leases_notifier.clone()).await?;
        }
        Ok(())
    }

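    /// If a leases notifier was configured, blocks until the heartbeat task
    /// notifies it, so that servers are not started before the datanode is
    /// coordinated with metasrv.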
    pub async fn wait_coordinated(&mut self) {
        if let Some(notifier) = self.leases_notifier.take() {
            notifier.notified().await;
        }
    }

    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }

    pub async fn shutdown(&mut self) -> Result<()> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        let _ = self.greptimedb_telemetry_task.stop().await;
        if let Some(heartbeat_task) = &self.heartbeat_task {
            heartbeat_task
                .close()
                .map_err(BoxedError::new)
                .context(ShutdownInstanceSnafu)?;
        }
        self.region_server.stop().await?;
        Ok(())
    }

    pub fn region_server(&self) -> RegionServer {
        self.region_server.clone()
    }

    pub fn plugins(&self) -> Plugins {
        self.plugins.clone()
    }
}

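/// Builder that assembles a [`Datanode`] from [`DatanodeOptions`], plugins,
/// an optional meta client, and the shared cache registry and KV backend.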
pub struct DatanodeBuilder {
    opts: DatanodeOptions,
    table_provider_factory: Option<TableProviderFactoryRef>,
    plugins: Plugins,
    meta_client: Option<MetaClientRef>,
    kv_backend: KvBackendRef,
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
    topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}

impl DatanodeBuilder {
    pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
        Self {
            opts,
            table_provider_factory: None,
            plugins,
            meta_client: None,
            kv_backend,
            cache_registry: None,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
            topic_stats_reporter: None,
        }
    }

    pub fn options(&self) -> &DatanodeOptions {
        &self.opts
    }

    pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
        self.meta_client = Some(client);
        self
    }

    pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
        self.cache_registry = Some(registry);
        self
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
        self.table_provider_factory = Some(factory);
        self
    }

    #[cfg(feature = "enterprise")]
    pub fn with_extension_range_provider(
        &mut self,
        extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
    ) -> &mut Self {
        self.extension_range_provider_factory = Some(extension_range_provider_factory);
        self
    }

    pub async fn build(mut self) -> Result<Datanode> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
        set_default_prefix(self.opts.default_column_prefix.as_deref())
            .map_err(BoxedError::new)
            .context(BuildDatanodeSnafu)?;

        let meta_client = self.meta_client.take();

        let controlled_by_metasrv = meta_client.is_some();

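        // When controlled by metasrv, wire a channel so that region server
        // events can be forwarded to the heartbeat task; otherwise fall back
        // to a no-op listener.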
        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
            let (tx, rx) = new_region_server_event_channel();
            (Box::new(tx) as _, Some(rx))
        } else {
            (Box::new(NoopRegionServerEventListener) as _, None)
        };

        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
        let table_id_schema_cache: TableSchemaCacheRef =
            cache_registry.get().context(MissingCacheSnafu)?;

        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
            table_id_schema_cache,
            schema_cache,
        ));
        let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id)));
        let region_server = self
            .new_region_server(
                schema_metadata_manager,
                region_event_listener,
                file_ref_manager,
            )
            .await?;

        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;

        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;
        let open_all_regions = open_all_regions(
            region_server.clone(),
            region_open_requests,
            !controlled_by_metasrv,
            self.opts.init_regions_parallelism,
            is_recovery_mode,
        );

        if self.opts.init_regions_in_background {
            common_runtime::spawn_global(async move {
                if let Err(err) = open_all_regions.await {
                    error!(err; "Failed to open regions during the startup.");
                }
            });
        } else {
            open_all_regions.await?;
        }

        let mut resource_stat = ResourceStatImpl::default();
        resource_stat.start_collect_cpu_usage();

        let heartbeat_task = if let Some(meta_client) = meta_client {
            Some(
                HeartbeatTask::try_new(
                    &self.opts,
                    region_server.clone(),
                    meta_client,
                    cache_registry,
                    self.plugins.clone(),
                    Arc::new(resource_stat),
                )
                .await?,
            )
        } else {
            None
        };

        let is_standalone = heartbeat_task.is_none();
        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
            Some(self.opts.storage.data_home.clone()),
            is_standalone && self.opts.enable_telemetry,
        )
        .await;

        let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
            Some(Arc::new(Notify::new()))
        } else {
            None
        };

        Ok(Datanode {
            services: ServerHandlers::default(),
            heartbeat_task,
            region_server,
            greptimedb_telemetry_task,
            region_event_receiver,
            leases_notifier,
            plugins: self.plugins.clone(),
        })
    }

    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
        let default_name = cfg.store.config_name();
        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
        for store in &cfg.providers {
            object_store_manager.add(
                store.config_name(),
                store::new_object_store(store.clone(), &cfg.data_home).await?,
            );
        }
        Ok(Arc::new(object_store_manager))
    }

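    /// Test-only helper that opens all regions assigned to this datanode on
    /// the given [`RegionServer`].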
    #[cfg(test)]
    async fn initialize_region_server(
        &self,
        region_server: &RegionServer,
        open_with_writable: bool,
    ) -> Result<()> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;
        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;

        open_all_regions(
            region_server.clone(),
            region_open_requests,
            open_with_writable,
            self.opts.init_regions_parallelism,
            is_recovery_mode,
        )
        .await
    }

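    /// Creates the [`RegionServer`], builds its query engine, registers the
    /// configured region engines, and attaches the topic stats reporter if
    /// one is available.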
    async fn new_region_server(
        &mut self,
        schema_metadata_manager: SchemaMetadataManagerRef,
        event_listener: RegionServerEventListenerRef,
        file_ref_manager: FileReferenceManagerRef,
    ) -> Result<RegionServer> {
        let opts: &DatanodeOptions = &self.opts;

        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            DummyCatalogManager::arc(),
            None,
            None,
            None,
            None,
            None,
            false,
            self.plugins.clone(),
            opts.query.clone(),
        );
        let query_engine = query_engine_factory.query_engine();

        let table_provider_factory = self
            .table_provider_factory
            .clone()
            .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));

        let mut region_server = RegionServer::with_table_provider(
            query_engine,
            common_runtime::global_runtime(),
            event_listener,
            table_provider_factory,
            opts.max_concurrent_queries,
            Duration::from_millis(100),
            opts.grpc.flight_compression,
        );

        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
        let engines = self
            .build_store_engines(
                object_store_manager,
                schema_metadata_manager,
                file_ref_manager,
                self.plugins.clone(),
            )
            .await?;
        for engine in engines {
            region_server.register_engine(engine);
        }
        if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
            region_server.set_topic_stats_reporter(topic_stats_reporter);
        }

        Ok(region_server)
    }

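    /// Builds the region engines from the datanode options: the mito engine,
    /// the metric engine layered on top of it, and the file engine.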
    async fn build_store_engines(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        plugins: Plugins,
    ) -> Result<Vec<RegionEngineRef>> {
        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
        let mut mito_engine_config = MitoConfig::default();
        let mut file_engine_config = file_engine::config::EngineConfig::default();

        for engine in &self.opts.region_engine {
            match engine {
                RegionEngineConfig::Mito(config) => {
                    mito_engine_config = config.clone();
                }
                RegionEngineConfig::File(config) => {
                    file_engine_config = config.clone();
                }
                RegionEngineConfig::Metric(metric_config) => {
                    metric_engine_config = metric_config.clone();
                }
            }
        }

        let fetcher = Arc::new(MetaPartitionExprFetcher::new(self.kv_backend.clone()));
        let mito_engine = self
            .build_mito_engine(
                object_store_manager.clone(),
                mito_engine_config,
                schema_metadata_manager.clone(),
                file_ref_manager.clone(),
                fetcher.clone(),
                plugins.clone(),
            )
            .await?;

        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
            .context(BuildMetricEngineSnafu)?;

        let file_engine = FileRegionEngine::new(
            file_engine_config,
            object_store_manager.default_object_store().clone(),
        );

        Ok(vec![
            Arc::new(mito_engine) as _,
            Arc::new(metric_engine) as _,
            Arc::new(file_engine) as _,
        ])
    }

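    /// Builds the mito engine with the log store selected by the WAL config:
    /// raft-engine, Kafka, or a no-op log store.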
    async fn build_mito_engine(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        mut config: MitoConfig,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let opts = &self.opts;
        if opts.storage.is_object_storage() {
            config.enable_write_cache = true;
            info!("Configured 'enable_write_cache=true' for mito engine.");
        }

        let mito_engine = match &opts.wal {
            DatanodeWalConfig::RaftEngine(raft_engine_config) => {
                let log_store =
                    Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                        .await?;

                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher.clone(),
                    plugins,
                    opts.max_concurrent_queries,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
            DatanodeWalConfig::Kafka(kafka_config) => {
                if kafka_config.create_index && opts.node_id.is_none() {
                    warn!("The WAL index creation is only available in distributed mode.")
                }
                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
                {
                    let operator = new_object_store_without_cache(
                        &opts.storage.store,
                        &opts.storage.data_home,
                    )
                    .await?;
                    let path = default_index_file(opts.node_id.unwrap());
                    Some(Self::build_global_index_collector(
                        kafka_config.dump_index_interval,
                        operator,
                        path,
                    ))
                } else {
                    None
                };

                let log_store =
                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
                self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher,
                    plugins,
                    opts.max_concurrent_queries,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
            DatanodeWalConfig::Noop => {
                let log_store = Arc::new(NoopLogStore);

                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher.clone(),
                    plugins,
                    opts.max_concurrent_queries,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
        };
        Ok(mito_engine)
    }

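    /// Creates the WAL directory if needed and opens a raft-engine backed log
    /// store under the data home.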
    async fn build_raft_engine_log_store(
        data_home: &str,
        config: &RaftEngineConfig,
    ) -> Result<Arc<RaftEngineLogStore>> {
        let data_home = normalize_dir(data_home);
        let wal_dir = match &config.dir {
            Some(dir) => dir.clone(),
            None => format!("{}{WAL_DIR}", data_home),
        };

        fs::create_dir_all(Path::new(&wal_dir))
            .await
            .context(CreateDirSnafu { dir: &wal_dir })?;
        info!(
            "Creating raft-engine logstore with config: {:?} and storage path: {}",
            config, &wal_dir
        );
        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)?;

        Ok(Arc::new(logstore))
    }

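    /// Opens a Kafka backed log store, optionally attaching a global index
    /// collector.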
    async fn build_kafka_log_store(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Arc<KafkaLogStore>> {
        KafkaLogStore::try_new(config, global_index_collector)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)
            .map(Arc::new)
    }

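    /// Creates a [`GlobalIndexCollector`] from the dump interval, the object
    /// store operator, and the index file path.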
    fn build_global_index_collector(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> GlobalIndexCollector {
        GlobalIndexCollector::new(dump_index_interval, operator, path)
    }
}

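/// Opens the given regions with the specified parallelism. When
/// `open_with_writable` is true, the opened regions are converted to leaders;
/// when `ignore_nonexistent_region` is true, missing regions are skipped with
/// a warning instead of failing.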
async fn open_all_regions(
    region_server: RegionServer,
    region_open_requests: RegionOpenRequests,
    open_with_writable: bool,
    init_regions_parallelism: usize,
    ignore_nonexistent_region: bool,
) -> Result<()> {
    let RegionOpenRequests {
        leader_regions,
        #[cfg(feature = "enterprise")]
        follower_regions,
    } = region_open_requests;

    let leader_region_num = leader_regions.len();
    info!("going to open {} region(s)", leader_region_num);
    let now = Instant::now();
    let open_regions = region_server
        .handle_batch_open_requests(
            init_regions_parallelism,
            leader_regions,
            ignore_nonexistent_region,
        )
        .await?;
    info!(
        "Opened {} regions in {:?}",
        open_regions.len(),
        now.elapsed()
    );
    if !ignore_nonexistent_region {
        ensure!(
            open_regions.len() == leader_region_num,
            error::UnexpectedSnafu {
                violated: format!(
                    "Expected to open {} regions, but only {} regions were opened",
                    leader_region_num,
                    open_regions.len()
                )
            }
        );
    } else if open_regions.len() != leader_region_num {
        warn!(
            "Ignoring nonexistent regions: expected to open {} regions, but only {} regions were opened",
            leader_region_num,
            open_regions.len()
        );
    }

    for region_id in open_regions {
        if open_with_writable {
            let res = region_server.set_region_role(region_id, RegionRole::Leader);
            match res {
                Ok(_) => {
                    if let SetRegionRoleStateResponse::InvalidTransition(err) = region_server
                        .set_region_role_state_gracefully(
                            region_id,
                            SettableRegionRoleState::Leader,
                        )
                        .await?
                    {
                        error!(err; "failed to convert region {region_id} to leader");
                    }
                }
                Err(e) => {
                    error!(e; "failed to convert region {region_id} to leader");
                }
            }
        }
    }

    #[cfg(feature = "enterprise")]
    if !follower_regions.is_empty() {
        use tokio::time::Instant;

        let follower_region_num = follower_regions.len();
        info!("going to open {} follower region(s)", follower_region_num);

        let now = Instant::now();
        let open_regions = region_server
            .handle_batch_open_requests(
                init_regions_parallelism,
                follower_regions,
                ignore_nonexistent_region,
            )
            .await?;
        info!(
            "Opened {} follower regions in {:?}",
            open_regions.len(),
            now.elapsed()
        );

        if !ignore_nonexistent_region {
            ensure!(
                open_regions.len() == follower_region_num,
                error::UnexpectedSnafu {
                    violated: format!(
                        "Expected to open {} follower regions, but only {} regions were opened",
                        follower_region_num,
                        open_regions.len()
                    )
                }
            );
        } else if open_regions.len() != follower_region_num {
            warn!(
                "Ignoring nonexistent regions: expected to open {} follower regions, but only {} regions were opened",
                follower_region_num,
                open_regions.len()
            );
        }
    }

    info!("all regions are opened");

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::RegionRoleSet;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{MockRegionEngine, mock_region_server};

    async fn setup_table_datanode(kv: &KvBackendRef) {
        let mgr = DatanodeTableManager::new(kv.clone());
        let txn = mgr
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                HashMap::from([("foo".to_string(), "bar".to_string())]),
                HashMap::default(),
                BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
            )
            .unwrap();

        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
    }

    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let mut builder = DatanodeBuilder::new(
            DatanodeOptions {
                node_id: Some(0),
                ..Default::default()
            },
            Plugins::default(),
            kv_backend.clone(),
        );
        builder.with_cache_registry(layered_cache_registry);
        setup_table_datanode(&(kv_backend as _)).await;

        builder
            .initialize_region_server(&mock_region_server, false)
            .await
            .unwrap();

        for i in 0..3 {
            let (region_id, req) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));
            if let RegionRequest::Open(req) = req {
                assert_eq!(
                    req.options,
                    HashMap::from([("foo".to_string(), "bar".to_string())])
                )
            } else {
                unreachable!()
            }
        }

        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}