1use std::path::Path;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::Plugins;
22use common_error::ext::BoxedError;
23use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
24use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
25use common_meta::cache_invalidator::CacheInvalidatorRef;
26use common_meta::datanode::TopicStatsReporter;
27use common_meta::key::runtime_switch::RuntimeSwitchManager;
28use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
29use common_meta::kv_backend::KvBackendRef;
30pub use common_procedure::options::ProcedureConfig;
31use common_query::prelude::set_default_prefix;
32use common_stat::ResourceStatImpl;
33use common_telemetry::{error, info, warn};
34use common_wal::config::DatanodeWalConfig;
35use common_wal::config::kafka::DatanodeKafkaConfig;
36use common_wal::config::raft_engine::RaftEngineConfig;
37use file_engine::engine::FileRegionEngine;
38use log_store::kafka::log_store::KafkaLogStore;
39use log_store::kafka::{GlobalIndexCollector, default_index_file};
40use log_store::noop::log_store::NoopLogStore;
41use log_store::raft_engine::log_store::RaftEngineLogStore;
42use meta_client::MetaClientRef;
43use metric_engine::engine::MetricEngine;
44use mito2::config::MitoConfig;
45use mito2::engine::{MitoEngine, MitoEngineBuilder};
46use mito2::region::opener::PartitionExprFetcherRef;
47use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
48use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
49use object_store::util::normalize_dir;
50use query::QueryEngineFactory;
51use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
52use servers::server::ServerHandlers;
53use snafu::{OptionExt, ResultExt, ensure};
54use store_api::path_utils::WAL_DIR;
55use store_api::region_engine::{
56 RegionEngineRef, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
57};
58use tokio::fs;
59use tokio::sync::Notify;
60
61use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
62use crate::error::{
63 self, BuildDatanodeSnafu, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu,
64 GetMetadataSnafu, MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
65 ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
66};
67use crate::event_listener::{
68 NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
69 new_region_server_event_channel,
70};
71use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
72use crate::heartbeat::HeartbeatTask;
73use crate::partition_expr_fetcher::MetaPartitionExprFetcher;
74use crate::region_server::{DummyTableProviderFactory, RegionServer};
75use crate::store::{self, new_object_store_without_cache};
76use crate::utils::{RegionOpenRequests, build_region_open_requests};
77
78pub struct Datanode {
80 services: ServerHandlers,
81 heartbeat_task: Option<HeartbeatTask>,
82 region_event_receiver: Option<RegionServerEventReceiver>,
83 region_server: RegionServer,
84 greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
85 leases_notifier: Option<Arc<Notify>>,
86 plugins: Plugins,
87}
88
89impl Datanode {
90 pub async fn start(&mut self) -> Result<()> {
91 info!("Starting datanode instance...");
92
93 self.start_heartbeat().await?;
94 self.wait_coordinated().await;
95
96 self.start_telemetry();
97
98 self.services.start_all().await.context(StartServerSnafu)
99 }
100
101 pub fn server_handlers(&self) -> &ServerHandlers {
102 &self.services
103 }
104
105 pub fn start_telemetry(&self) {
106 if let Err(e) = self.greptimedb_telemetry_task.start() {
107 warn!(e; "Failed to start telemetry task!");
108 }
109 }
110
111 pub async fn start_heartbeat(&mut self) -> Result<()> {
112 if let Some(task) = &self.heartbeat_task {
113 let receiver = self.region_event_receiver.take().unwrap();
115
116 task.start(receiver, self.leases_notifier.clone()).await?;
117 }
118 Ok(())
119 }
120
121 pub async fn wait_coordinated(&mut self) {
123 if let Some(notifier) = self.leases_notifier.take() {
124 notifier.notified().await;
125 }
126 }
127
128 pub fn setup_services(&mut self, services: ServerHandlers) {
129 self.services = services;
130 }
131
132 pub async fn shutdown(&mut self) -> Result<()> {
133 self.services
134 .shutdown_all()
135 .await
136 .context(ShutdownServerSnafu)?;
137
138 let _ = self.greptimedb_telemetry_task.stop().await;
139 if let Some(heartbeat_task) = &self.heartbeat_task {
140 heartbeat_task
141 .close()
142 .map_err(BoxedError::new)
143 .context(ShutdownInstanceSnafu)?;
144 }
145 self.region_server.stop().await?;
146 Ok(())
147 }
148
149 pub fn region_server(&self) -> RegionServer {
150 self.region_server.clone()
151 }
152
153 pub fn plugins(&self) -> Plugins {
154 self.plugins.clone()
155 }
156}
157
158pub struct DatanodeBuilder {
159 opts: DatanodeOptions,
160 table_provider_factory: Option<TableProviderFactoryRef>,
161 plugins: Plugins,
162 meta_client: Option<MetaClientRef>,
163 kv_backend: KvBackendRef,
164 cache_registry: Option<Arc<LayeredCacheRegistry>>,
165 topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
166 #[cfg(feature = "enterprise")]
167 extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
168}
169
170impl DatanodeBuilder {
171 pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
172 Self {
173 opts,
174 table_provider_factory: None,
175 plugins,
176 meta_client: None,
177 kv_backend,
178 cache_registry: None,
179 #[cfg(feature = "enterprise")]
180 extension_range_provider_factory: None,
181 topic_stats_reporter: None,
182 }
183 }
184
185 pub fn options(&self) -> &DatanodeOptions {
186 &self.opts
187 }
188
189 pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
190 self.meta_client = Some(client);
191 self
192 }
193
194 pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
195 self.cache_registry = Some(registry);
196 self
197 }
198
199 pub fn kv_backend(&self) -> &KvBackendRef {
200 &self.kv_backend
201 }
202
203 pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
204 self.table_provider_factory = Some(factory);
205 self
206 }
207
208 #[cfg(feature = "enterprise")]
209 pub fn with_extension_range_provider(
210 &mut self,
211 extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
212 ) -> &mut Self {
213 self.extension_range_provider_factory = Some(extension_range_provider_factory);
214 self
215 }
216
217 pub async fn build(mut self) -> Result<Datanode> {
218 let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
219 set_default_prefix(self.opts.default_column_prefix.as_deref())
220 .map_err(BoxedError::new)
221 .context(BuildDatanodeSnafu)?;
222
223 let meta_client = self.meta_client.take();
224
225 let controlled_by_metasrv = meta_client.is_some();
229
230 let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
232 let (tx, rx) = new_region_server_event_channel();
233 (Box::new(tx) as _, Some(rx))
234 } else {
235 (Box::new(NoopRegionServerEventListener) as _, None)
236 };
237
238 let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
239 let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
240 let table_id_schema_cache: TableSchemaCacheRef =
241 cache_registry.get().context(MissingCacheSnafu)?;
242
243 let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
244 table_id_schema_cache,
245 schema_cache,
246 ));
247
248 let gc_enabled = self.opts.region_engine.iter().any(|engine| {
249 if let RegionEngineConfig::Mito(config) = engine {
250 config.gc.enable
251 } else {
252 false
253 }
254 });
255
256 let file_ref_manager = Arc::new(FileReferenceManager::with_gc_enabled(
257 Some(node_id),
258 gc_enabled,
259 ));
260 let region_server = self
261 .new_region_server(
262 schema_metadata_manager,
263 region_event_listener,
264 file_ref_manager,
265 )
266 .await?;
267
268 let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
270 let is_recovery_mode = runtime_switch_manager
271 .recovery_mode()
272 .await
273 .context(GetMetadataSnafu)?;
274
275 let region_open_requests =
276 build_region_open_requests(node_id, self.kv_backend.clone()).await?;
277 let open_all_regions = open_all_regions(
278 region_server.clone(),
279 region_open_requests,
280 !controlled_by_metasrv,
281 self.opts.init_regions_parallelism,
282 is_recovery_mode,
284 );
285
286 if self.opts.init_regions_in_background {
287 common_runtime::spawn_global(async move {
289 if let Err(err) = open_all_regions.await {
290 error!(err; "Failed to open regions during the startup.");
291 }
292 });
293 } else {
294 open_all_regions.await?;
295 }
296
297 let heartbeat_task = if let Some(meta_client) = meta_client {
298 let task = self
299 .create_heartbeat_task(®ion_server, meta_client, cache_registry)
300 .await?;
301 Some(task)
302 } else {
303 None
304 };
305
306 let is_standalone = heartbeat_task.is_none();
307 let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
308 Some(self.opts.storage.data_home.clone()),
309 is_standalone && self.opts.enable_telemetry,
310 )
311 .await;
312
313 let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
314 Some(Arc::new(Notify::new()))
315 } else {
316 None
317 };
318
319 Ok(Datanode {
320 services: ServerHandlers::default(),
321 heartbeat_task,
322 region_server,
323 greptimedb_telemetry_task,
324 region_event_receiver,
325 leases_notifier,
326 plugins: self.plugins.clone(),
327 })
328 }
329
330 async fn create_heartbeat_task(
331 &self,
332 region_server: &RegionServer,
333 meta_client: MetaClientRef,
334 cache_invalidator: CacheInvalidatorRef,
335 ) -> Result<HeartbeatTask> {
336 let stat = {
337 let mut stat = ResourceStatImpl::default();
338 stat.start_collect_cpu_usage();
339 Arc::new(stat)
340 };
341
342 HeartbeatTask::try_new(
343 &self.opts,
344 region_server.clone(),
345 meta_client,
346 self.kv_backend.clone(),
347 cache_invalidator,
348 self.plugins.clone(),
349 stat,
350 )
351 .await
352 }
353
354 pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
356 let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
357 let default_name = cfg.store.config_name();
358 let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
359 for store in &cfg.providers {
360 object_store_manager.add(
361 store.config_name(),
362 store::new_object_store(store.clone(), &cfg.data_home).await?,
363 );
364 }
365 Ok(Arc::new(object_store_manager))
366 }
367
368 #[cfg(test)]
369 async fn initialize_region_server(
371 &self,
372 region_server: &RegionServer,
373 open_with_writable: bool,
374 ) -> Result<()> {
375 let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
376
377 let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
379 let is_recovery_mode = runtime_switch_manager
380 .recovery_mode()
381 .await
382 .context(GetMetadataSnafu)?;
383 let region_open_requests =
384 build_region_open_requests(node_id, self.kv_backend.clone()).await?;
385
386 open_all_regions(
387 region_server.clone(),
388 region_open_requests,
389 open_with_writable,
390 self.opts.init_regions_parallelism,
391 is_recovery_mode,
392 )
393 .await
394 }
395
396 async fn new_region_server(
397 &mut self,
398 schema_metadata_manager: SchemaMetadataManagerRef,
399 event_listener: RegionServerEventListenerRef,
400 file_ref_manager: FileReferenceManagerRef,
401 ) -> Result<RegionServer> {
402 let opts: &DatanodeOptions = &self.opts;
403
404 let query_engine_factory = QueryEngineFactory::new_with_plugins(
405 DummyCatalogManager::arc(),
407 None,
408 None,
409 None,
410 None,
411 None,
412 false,
413 self.plugins.clone(),
414 opts.query.clone(),
415 );
416 let query_engine = query_engine_factory.query_engine();
417
418 let table_provider_factory = self
419 .table_provider_factory
420 .clone()
421 .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));
422
423 let mut region_server = RegionServer::with_table_provider(
424 query_engine,
425 common_runtime::global_runtime(),
426 event_listener,
427 table_provider_factory,
428 opts.max_concurrent_queries,
429 Duration::from_millis(100),
431 opts.grpc.flight_compression,
432 );
433
434 let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
435 let engines = self
436 .build_store_engines(
437 object_store_manager,
438 schema_metadata_manager,
439 file_ref_manager,
440 self.plugins.clone(),
441 )
442 .await?;
443 for engine in engines {
444 region_server.register_engine(engine);
445 }
446 if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
447 region_server.set_topic_stats_reporter(topic_stats_reporter);
448 }
449
450 Ok(region_server)
451 }
452
453 async fn build_store_engines(
457 &mut self,
458 object_store_manager: ObjectStoreManagerRef,
459 schema_metadata_manager: SchemaMetadataManagerRef,
460 file_ref_manager: FileReferenceManagerRef,
461 plugins: Plugins,
462 ) -> Result<Vec<RegionEngineRef>> {
463 let mut metric_engine_config = metric_engine::config::EngineConfig::default();
464 let mut mito_engine_config = MitoConfig::default();
465 let mut file_engine_config = file_engine::config::EngineConfig::default();
466
467 for engine in &self.opts.region_engine {
468 match engine {
469 RegionEngineConfig::Mito(config) => {
470 mito_engine_config = config.clone();
471 }
472 RegionEngineConfig::File(config) => {
473 file_engine_config = config.clone();
474 }
475 RegionEngineConfig::Metric(metric_config) => {
476 metric_engine_config = metric_config.clone();
477 }
478 }
479 }
480
481 let fetcher = Arc::new(MetaPartitionExprFetcher::new(self.kv_backend.clone()));
483 let mito_engine = self
484 .build_mito_engine(
485 object_store_manager.clone(),
486 mito_engine_config,
487 schema_metadata_manager.clone(),
488 file_ref_manager.clone(),
489 fetcher.clone(),
490 plugins.clone(),
491 )
492 .await?;
493
494 let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
495 .context(BuildMetricEngineSnafu)?;
496
497 let file_engine = FileRegionEngine::new(
498 file_engine_config,
499 object_store_manager.default_object_store().clone(), );
501
502 Ok(vec![
503 Arc::new(mito_engine) as _,
504 Arc::new(metric_engine) as _,
505 Arc::new(file_engine) as _,
506 ])
507 }
508
509 async fn build_mito_engine(
511 &mut self,
512 object_store_manager: ObjectStoreManagerRef,
513 mut config: MitoConfig,
514 schema_metadata_manager: SchemaMetadataManagerRef,
515 file_ref_manager: FileReferenceManagerRef,
516 partition_expr_fetcher: PartitionExprFetcherRef,
517 plugins: Plugins,
518 ) -> Result<MitoEngine> {
519 let opts = &self.opts;
520 if opts.storage.is_object_storage() {
521 config.enable_write_cache = true;
523 info!("Configured 'enable_write_cache=true' for mito engine.");
524 }
525
526 let mito_engine = match &opts.wal {
527 DatanodeWalConfig::RaftEngine(raft_engine_config) => {
528 let log_store =
529 Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
530 .await?;
531
532 let builder = MitoEngineBuilder::new(
533 &opts.storage.data_home,
534 config,
535 log_store,
536 object_store_manager,
537 schema_metadata_manager,
538 file_ref_manager,
539 partition_expr_fetcher.clone(),
540 plugins,
541 opts.max_concurrent_queries,
542 );
543
544 #[cfg(feature = "enterprise")]
545 let builder = builder.with_extension_range_provider_factory(
546 self.extension_range_provider_factory.take(),
547 );
548
549 builder.try_build().await.context(BuildMitoEngineSnafu)?
550 }
551 DatanodeWalConfig::Kafka(kafka_config) => {
552 if kafka_config.create_index && opts.node_id.is_none() {
553 warn!("The WAL index creation only available in distributed mode.")
554 }
555 let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
556 {
557 let operator = new_object_store_without_cache(
558 &opts.storage.store,
559 &opts.storage.data_home,
560 )
561 .await?;
562 let path = default_index_file(opts.node_id.unwrap());
563 Some(Self::build_global_index_collector(
564 kafka_config.dump_index_interval,
565 operator,
566 path,
567 ))
568 } else {
569 None
570 };
571
572 let log_store =
573 Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
574 self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
575 let builder = MitoEngineBuilder::new(
576 &opts.storage.data_home,
577 config,
578 log_store,
579 object_store_manager,
580 schema_metadata_manager,
581 file_ref_manager,
582 partition_expr_fetcher,
583 plugins,
584 opts.max_concurrent_queries,
585 );
586
587 #[cfg(feature = "enterprise")]
588 let builder = builder.with_extension_range_provider_factory(
589 self.extension_range_provider_factory.take(),
590 );
591
592 builder.try_build().await.context(BuildMitoEngineSnafu)?
593 }
594 DatanodeWalConfig::Noop => {
595 let log_store = Arc::new(NoopLogStore);
596
597 let builder = MitoEngineBuilder::new(
598 &opts.storage.data_home,
599 config,
600 log_store,
601 object_store_manager,
602 schema_metadata_manager,
603 file_ref_manager,
604 partition_expr_fetcher.clone(),
605 plugins,
606 opts.max_concurrent_queries,
607 );
608
609 #[cfg(feature = "enterprise")]
610 let builder = builder.with_extension_range_provider_factory(
611 self.extension_range_provider_factory.take(),
612 );
613
614 builder.try_build().await.context(BuildMitoEngineSnafu)?
615 }
616 };
617 Ok(mito_engine)
618 }
619
620 async fn build_raft_engine_log_store(
622 data_home: &str,
623 config: &RaftEngineConfig,
624 ) -> Result<Arc<RaftEngineLogStore>> {
625 let data_home = normalize_dir(data_home);
626 let wal_dir = match &config.dir {
627 Some(dir) => dir.clone(),
628 None => format!("{}{WAL_DIR}", data_home),
629 };
630
631 fs::create_dir_all(Path::new(&wal_dir))
633 .await
634 .context(CreateDirSnafu { dir: &wal_dir })?;
635 info!(
636 "Creating raft-engine logstore with config: {:?} and storage path: {}",
637 config, &wal_dir
638 );
639 let logstore = RaftEngineLogStore::try_new(wal_dir, config)
640 .await
641 .map_err(Box::new)
642 .context(OpenLogStoreSnafu)?;
643
644 Ok(Arc::new(logstore))
645 }
646
647 async fn build_kafka_log_store(
649 config: &DatanodeKafkaConfig,
650 global_index_collector: Option<GlobalIndexCollector>,
651 ) -> Result<Arc<KafkaLogStore>> {
652 KafkaLogStore::try_new(config, global_index_collector)
653 .await
654 .map_err(Box::new)
655 .context(OpenLogStoreSnafu)
656 .map(Arc::new)
657 }
658
659 fn build_global_index_collector(
661 dump_index_interval: Duration,
662 operator: object_store::ObjectStore,
663 path: String,
664 ) -> GlobalIndexCollector {
665 GlobalIndexCollector::new(dump_index_interval, operator, path)
666 }
667}
668
669async fn open_all_regions(
671 region_server: RegionServer,
672 region_open_requests: RegionOpenRequests,
673 open_with_writable: bool,
674 init_regions_parallelism: usize,
675 ignore_nonexistent_region: bool,
676) -> Result<()> {
677 let RegionOpenRequests {
678 leader_regions,
679 #[cfg(feature = "enterprise")]
680 follower_regions,
681 } = region_open_requests;
682
683 let leader_region_num = leader_regions.len();
684 info!("going to open {} region(s)", leader_region_num);
685 let now = Instant::now();
686 let open_regions = region_server
687 .handle_batch_open_requests(
688 init_regions_parallelism,
689 leader_regions,
690 ignore_nonexistent_region,
691 )
692 .await?;
693 info!(
694 "Opened {} regions in {:?}",
695 open_regions.len(),
696 now.elapsed()
697 );
698 if !ignore_nonexistent_region {
699 ensure!(
700 open_regions.len() == leader_region_num,
701 error::UnexpectedSnafu {
702 violated: format!(
703 "Expected to open {} of regions, only {} of regions has opened",
704 leader_region_num,
705 open_regions.len()
706 )
707 }
708 );
709 } else if open_regions.len() != leader_region_num {
710 warn!(
711 "ignore nonexistent region, expected to open {} of regions, only {} of regions has opened",
712 leader_region_num,
713 open_regions.len()
714 );
715 }
716
717 for region_id in open_regions {
718 if open_with_writable {
719 let res = region_server.set_region_role(region_id, RegionRole::Leader);
720 match res {
721 Ok(_) => {
722 if let SetRegionRoleStateResponse::InvalidTransition(err) = region_server
724 .set_region_role_state_gracefully(
725 region_id,
726 SettableRegionRoleState::Leader,
727 )
728 .await?
729 {
730 error!(err; "failed to convert region {region_id} to leader");
731 }
732 }
733 Err(e) => {
734 error!(e; "failed to convert region {region_id} to leader");
735 }
736 }
737 }
738 }
739
740 #[cfg(feature = "enterprise")]
741 if !follower_regions.is_empty() {
742 use tokio::time::Instant;
743
744 let follower_region_num = follower_regions.len();
745 info!("going to open {} follower region(s)", follower_region_num);
746
747 let now = Instant::now();
748 let open_regions = region_server
749 .handle_batch_open_requests(
750 init_regions_parallelism,
751 follower_regions,
752 ignore_nonexistent_region,
753 )
754 .await?;
755 info!(
756 "Opened {} follower regions in {:?}",
757 open_regions.len(),
758 now.elapsed()
759 );
760
761 if !ignore_nonexistent_region {
762 ensure!(
763 open_regions.len() == follower_region_num,
764 error::UnexpectedSnafu {
765 violated: format!(
766 "Expected to open {} of follower regions, only {} of regions has opened",
767 follower_region_num,
768 open_regions.len()
769 )
770 }
771 );
772 } else if open_regions.len() != follower_region_num {
773 warn!(
774 "ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened",
775 follower_region_num,
776 open_regions.len()
777 );
778 }
779 }
780
781 info!("all regions are opened");
782
783 Ok(())
784}
785
786#[cfg(test)]
787mod tests {
788 use std::assert_matches::assert_matches;
789 use std::collections::{BTreeMap, HashMap};
790 use std::sync::Arc;
791
792 use cache::build_datanode_cache_registry;
793 use common_base::Plugins;
794 use common_meta::cache::LayeredCacheRegistryBuilder;
795 use common_meta::key::RegionRoleSet;
796 use common_meta::key::datanode_table::DatanodeTableManager;
797 use common_meta::kv_backend::KvBackendRef;
798 use common_meta::kv_backend::memory::MemoryKvBackend;
799 use mito2::engine::MITO_ENGINE_NAME;
800 use store_api::region_request::RegionRequest;
801 use store_api::storage::RegionId;
802
803 use crate::config::DatanodeOptions;
804 use crate::datanode::DatanodeBuilder;
805 use crate::tests::{MockRegionEngine, mock_region_server};
806
807 async fn setup_table_datanode(kv: &KvBackendRef) {
808 let mgr = DatanodeTableManager::new(kv.clone());
809 let txn = mgr
810 .build_create_txn(
811 1028,
812 MITO_ENGINE_NAME,
813 "foo/bar/weny",
814 HashMap::from([("foo".to_string(), "bar".to_string())]),
815 HashMap::default(),
816 BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
817 )
818 .unwrap();
819
820 let r = kv.txn(txn).await.unwrap();
821 assert!(r.succeeded);
822 }
823
824 #[tokio::test]
825 async fn test_initialize_region_server() {
826 common_telemetry::init_default_ut_logging();
827 let mut mock_region_server = mock_region_server();
828 let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);
829
830 mock_region_server.register_engine(mock_region.clone());
831
832 let kv_backend = Arc::new(MemoryKvBackend::new());
833 let layered_cache_registry = Arc::new(
834 LayeredCacheRegistryBuilder::default()
835 .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
836 .build(),
837 );
838
839 let mut builder = DatanodeBuilder::new(
840 DatanodeOptions {
841 node_id: Some(0),
842 ..Default::default()
843 },
844 Plugins::default(),
845 kv_backend.clone(),
846 );
847 builder.with_cache_registry(layered_cache_registry);
848 setup_table_datanode(&(kv_backend as _)).await;
849
850 builder
851 .initialize_region_server(&mock_region_server, false)
852 .await
853 .unwrap();
854
855 for i in 0..3 {
856 let (region_id, req) = mock_region_handler.recv().await.unwrap();
857 assert_eq!(region_id, RegionId::new(1028, i));
858 if let RegionRequest::Open(req) = req {
859 assert_eq!(
860 req.options,
861 HashMap::from([("foo".to_string(), "bar".to_string())])
862 )
863 } else {
864 unreachable!()
865 }
866 }
867
868 assert_matches!(
869 mock_region_handler.try_recv(),
870 Err(tokio::sync::mpsc::error::TryRecvError::Empty)
871 );
872 }
873}