1use std::path::Path;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::Plugins;
22use common_error::ext::BoxedError;
23use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
24use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
25use common_meta::cache_invalidator::CacheInvalidatorRef;
26use common_meta::datanode::TopicStatsReporter;
27use common_meta::key::runtime_switch::RuntimeSwitchManager;
28use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
29use common_meta::kv_backend::KvBackendRef;
30pub use common_procedure::options::ProcedureConfig;
31use common_query::prelude::set_default_prefix;
32use common_stat::ResourceStatImpl;
33use common_telemetry::{error, info, warn};
34use common_wal::config::DatanodeWalConfig;
35use common_wal::config::kafka::DatanodeKafkaConfig;
36use common_wal::config::raft_engine::RaftEngineConfig;
37use file_engine::engine::FileRegionEngine;
38use log_store::kafka::log_store::KafkaLogStore;
39use log_store::kafka::{GlobalIndexCollector, default_index_file};
40use log_store::noop::log_store::NoopLogStore;
41use log_store::raft_engine::log_store::RaftEngineLogStore;
42use meta_client::MetaClientRef;
43use metric_engine::engine::MetricEngine;
44use mito2::config::MitoConfig;
45use mito2::engine::{MitoEngine, MitoEngineBuilder};
46use mito2::region::opener::PartitionExprFetcherRef;
47use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
48use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
49use object_store::util::normalize_dir;
50use query::QueryEngineFactory;
51use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
52use servers::server::ServerHandlers;
53use snafu::{OptionExt, ResultExt, ensure};
54use store_api::path_utils::WAL_DIR;
55use store_api::region_engine::{
56 RegionEngineRef, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
57};
58use tokio::fs;
59use tokio::sync::Notify;
60
61use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
62use crate::error::{
63 self, BuildDatanodeSnafu, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu,
64 GetMetadataSnafu, MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
65 ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
66};
67use crate::event_listener::{
68 NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
69 new_region_server_event_channel,
70};
71use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
72use crate::heartbeat::HeartbeatTask;
73use crate::partition_expr_fetcher::MetaPartitionExprFetcher;
74use crate::region_server::{DummyTableProviderFactory, RegionServer};
75use crate::store::{self, new_object_store_without_cache};
76use crate::utils::{RegionOpenRequests, build_region_open_requests};
77
/// A datanode instance: bundles the region server with the optional heartbeat
/// task, the telemetry task and the network services.
pub struct Datanode {
    /// Network services of this instance; replaced through `setup_services`.
    services: ServerHandlers,
    /// Heartbeat task; `None` when the datanode was built without a meta client.
    heartbeat_task: Option<HeartbeatTask>,
    /// Receiving half of the region server event channel. It is taken and handed
    /// to the heartbeat task in `start_heartbeat`.
    region_event_receiver: Option<RegionServerEventReceiver>,
    /// The region server; `region_server()` returns clones of it.
    region_server: RegionServer,
    /// Telemetry task, started by `start_telemetry` and stopped on `shutdown`.
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    /// Notifier awaited by `wait_coordinated` before the services are started.
    /// A clone is passed to the heartbeat task when it starts.
    leases_notifier: Option<Arc<Notify>>,
    /// Plugins this datanode was built with.
    plugins: Plugins,
}
88
89impl Datanode {
90 pub async fn start(&mut self) -> Result<()> {
91 info!("Starting datanode instance...");
92
93 self.start_heartbeat().await?;
94 self.wait_coordinated().await;
95
96 self.start_telemetry();
97
98 self.services.start_all().await.context(StartServerSnafu)
99 }
100
101 pub fn server_handlers(&self) -> &ServerHandlers {
102 &self.services
103 }
104
105 pub fn start_telemetry(&self) {
106 if let Err(e) = self.greptimedb_telemetry_task.start() {
107 warn!(e; "Failed to start telemetry task!");
108 }
109 }
110
111 pub async fn start_heartbeat(&mut self) -> Result<()> {
112 if let Some(task) = &self.heartbeat_task {
113 let receiver = self.region_event_receiver.take().unwrap();
115
116 task.start(receiver, self.leases_notifier.clone()).await?;
117 }
118 Ok(())
119 }
120
121 pub async fn wait_coordinated(&mut self) {
123 if let Some(notifier) = self.leases_notifier.take() {
124 notifier.notified().await;
125 }
126 }
127
128 pub fn setup_services(&mut self, services: ServerHandlers) {
129 self.services = services;
130 }
131
132 pub async fn shutdown(&mut self) -> Result<()> {
133 self.services
134 .shutdown_all()
135 .await
136 .context(ShutdownServerSnafu)?;
137
138 let _ = self.greptimedb_telemetry_task.stop().await;
139 if let Some(heartbeat_task) = &self.heartbeat_task {
140 heartbeat_task
141 .close()
142 .map_err(BoxedError::new)
143 .context(ShutdownInstanceSnafu)?;
144 }
145 self.region_server.stop().await?;
146 Ok(())
147 }
148
149 pub fn region_server(&self) -> RegionServer {
150 self.region_server.clone()
151 }
152
153 pub fn plugins(&self) -> Plugins {
154 self.plugins.clone()
155 }
156}
157
/// Builder that assembles a [`Datanode`] from options, plugins and a kv backend.
pub struct DatanodeBuilder {
    /// Options of the datanode to build.
    opts: DatanodeOptions,
    /// Table provider factory for the region server; `DummyTableProviderFactory`
    /// is used when unset.
    table_provider_factory: Option<TableProviderFactoryRef>,
    /// Plugins handed to the engines, the heartbeat task and the built datanode.
    plugins: Plugins,
    /// Meta client; when set, `build` creates a heartbeat task with it.
    meta_client: Option<MetaClientRef>,
    /// Kv backend used to read the runtime switches and the regions to open.
    kv_backend: KvBackendRef,
    /// Cache registry; `build` fails with `MissingCacheSnafu` when it is unset.
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
    /// Filled in while building a Kafka log store, then moved into the region server.
    topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
    /// When set, decides whether regions opened by `build` are made writable;
    /// otherwise they are writable only if no meta client is configured.
    open_regions_writable_override: Option<bool>,
    /// Extension range provider factory passed to the mito engine builder.
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}
170
171impl DatanodeBuilder {
172 pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
173 Self {
174 opts,
175 table_provider_factory: None,
176 plugins,
177 meta_client: None,
178 kv_backend,
179 cache_registry: None,
180 open_regions_writable_override: None,
181 #[cfg(feature = "enterprise")]
182 extension_range_provider_factory: None,
183 topic_stats_reporter: None,
184 }
185 }
186
187 pub fn options(&self) -> &DatanodeOptions {
188 &self.opts
189 }
190
191 pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
192 self.meta_client = Some(client);
193 self
194 }
195
196 pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
197 self.cache_registry = Some(registry);
198 self
199 }
200
201 pub fn kv_backend(&self) -> &KvBackendRef {
202 &self.kv_backend
203 }
204
205 pub fn meta_client(&self) -> Option<&MetaClientRef> {
206 self.meta_client.as_ref()
207 }
208
209 pub fn cache_registry(&self) -> Option<&Arc<LayeredCacheRegistry>> {
210 self.cache_registry.as_ref()
211 }
212
213 pub fn set_plugins(&mut self, plugins: Plugins) {
214 self.plugins = plugins;
215 }
216
217 pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
218 self.table_provider_factory = Some(factory);
219 self
220 }
221
222 pub fn with_open_regions_writable_override(&mut self, writable: bool) -> &mut Self {
232 self.open_regions_writable_override = Some(writable);
233 self
234 }
235
236 #[cfg(feature = "enterprise")]
237 pub fn with_extension_range_provider(
238 &mut self,
239 extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
240 ) -> &mut Self {
241 self.extension_range_provider_factory = Some(extension_range_provider_factory);
242 self
243 }
244
245 pub async fn build(mut self) -> Result<Datanode> {
246 let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
247 set_default_prefix(self.opts.default_column_prefix.as_deref())
248 .map_err(BoxedError::new)
249 .context(BuildDatanodeSnafu)?;
250
251 let meta_client = self.meta_client.take();
252
253 let controlled_by_metasrv = meta_client.is_some();
257
258 let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
260 let (tx, rx) = new_region_server_event_channel();
261 (Box::new(tx) as _, Some(rx))
262 } else {
263 (Box::new(NoopRegionServerEventListener) as _, None)
264 };
265
266 let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
267 let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
268 let table_id_schema_cache: TableSchemaCacheRef =
269 cache_registry.get().context(MissingCacheSnafu)?;
270
271 let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
272 table_id_schema_cache,
273 schema_cache,
274 ));
275
276 let gc_enabled = self.opts.region_engine.iter().any(|engine| {
277 if let RegionEngineConfig::Mito(config) = engine {
278 config.gc.enable
279 } else {
280 false
281 }
282 });
283
284 let file_ref_manager = Arc::new(FileReferenceManager::with_gc_enabled(
285 Some(node_id),
286 gc_enabled,
287 ));
288 let region_server = self
289 .new_region_server(
290 schema_metadata_manager,
291 region_event_listener,
292 file_ref_manager,
293 )
294 .await?;
295
296 let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
298 let is_recovery_mode = runtime_switch_manager
299 .recovery_mode()
300 .await
301 .context(GetMetadataSnafu)?;
302
303 let region_open_requests =
304 build_region_open_requests(node_id, self.kv_backend.clone()).await?;
305 let open_with_writable = self
306 .open_regions_writable_override
307 .unwrap_or(!controlled_by_metasrv);
308 let open_all_regions = open_all_regions(
309 region_server.clone(),
310 region_open_requests,
311 open_with_writable,
312 self.opts.init_regions_parallelism,
313 is_recovery_mode,
315 );
316
317 if self.opts.init_regions_in_background {
318 common_runtime::spawn_global(async move {
320 if let Err(err) = open_all_regions.await {
321 error!(err; "Failed to open regions during the startup.");
322 }
323 });
324 } else {
325 open_all_regions.await?;
326 }
327
328 let heartbeat_task = if let Some(meta_client) = meta_client {
329 let task = self
330 .create_heartbeat_task(®ion_server, meta_client, cache_registry)
331 .await?;
332 Some(task)
333 } else {
334 None
335 };
336
337 let is_standalone = heartbeat_task.is_none();
338 let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
339 Some(self.opts.storage.data_home.clone()),
340 is_standalone && self.opts.enable_telemetry,
341 )
342 .await;
343
344 let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
345 Some(Arc::new(Notify::new()))
346 } else {
347 None
348 };
349
350 Ok(Datanode {
351 services: ServerHandlers::default(),
352 heartbeat_task,
353 region_server,
354 greptimedb_telemetry_task,
355 region_event_receiver,
356 leases_notifier,
357 plugins: self.plugins.clone(),
358 })
359 }
360
361 async fn create_heartbeat_task(
362 &self,
363 region_server: &RegionServer,
364 meta_client: MetaClientRef,
365 cache_invalidator: CacheInvalidatorRef,
366 ) -> Result<HeartbeatTask> {
367 let stat = {
368 let mut stat = ResourceStatImpl::default();
369 stat.start_collect_cpu_usage();
370 Arc::new(stat)
371 };
372
373 HeartbeatTask::try_new(
374 &self.opts,
375 region_server.clone(),
376 meta_client,
377 self.kv_backend.clone(),
378 cache_invalidator,
379 self.plugins.clone(),
380 stat,
381 )
382 .await
383 }
384
385 pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
387 let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
388 let default_name = cfg.store.config_name();
389 let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
390 for store in &cfg.providers {
391 object_store_manager.add(
392 store.config_name(),
393 store::new_object_store(store.clone(), &cfg.data_home).await?,
394 );
395 }
396 Ok(Arc::new(object_store_manager))
397 }
398
399 #[cfg(test)]
400 async fn initialize_region_server(
402 &self,
403 region_server: &RegionServer,
404 open_with_writable: bool,
405 ) -> Result<()> {
406 let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
407
408 let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
410 let is_recovery_mode = runtime_switch_manager
411 .recovery_mode()
412 .await
413 .context(GetMetadataSnafu)?;
414 let region_open_requests =
415 build_region_open_requests(node_id, self.kv_backend.clone()).await?;
416
417 open_all_regions(
418 region_server.clone(),
419 region_open_requests,
420 open_with_writable,
421 self.opts.init_regions_parallelism,
422 is_recovery_mode,
423 )
424 .await
425 }
426
427 async fn new_region_server(
428 &mut self,
429 schema_metadata_manager: SchemaMetadataManagerRef,
430 event_listener: RegionServerEventListenerRef,
431 file_ref_manager: FileReferenceManagerRef,
432 ) -> Result<RegionServer> {
433 let opts: &DatanodeOptions = &self.opts;
434
435 let query_engine_factory = QueryEngineFactory::new_with_plugins(
436 DummyCatalogManager::arc(),
438 None,
439 None,
440 None,
441 None,
442 None,
443 false,
444 self.plugins.clone(),
445 opts.query.clone(),
446 );
447 let query_engine = query_engine_factory.query_engine();
448
449 let table_provider_factory = self
450 .table_provider_factory
451 .clone()
452 .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));
453 let mut region_server = RegionServer::with_table_provider(
454 query_engine,
455 common_runtime::global_runtime(),
456 event_listener,
457 table_provider_factory,
458 opts.max_concurrent_queries,
459 opts.concurrent_query_limiter_timeout,
460 opts.grpc.flight_compression,
461 );
462 region_server.install_remote_dyn_filter_receiver_injector(&self.plugins);
463
464 let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
465 let engines = self
466 .build_store_engines(
467 object_store_manager,
468 schema_metadata_manager,
469 file_ref_manager,
470 self.plugins.clone(),
471 )
472 .await?;
473 for engine in engines {
474 region_server.register_engine(engine);
475 }
476 if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
477 region_server.set_topic_stats_reporter(topic_stats_reporter);
478 }
479
480 Ok(region_server)
481 }
482
483 async fn build_store_engines(
487 &mut self,
488 object_store_manager: ObjectStoreManagerRef,
489 schema_metadata_manager: SchemaMetadataManagerRef,
490 file_ref_manager: FileReferenceManagerRef,
491 plugins: Plugins,
492 ) -> Result<Vec<RegionEngineRef>> {
493 let mut metric_engine_config = metric_engine::config::EngineConfig::default();
494 let mut mito_engine_config = MitoConfig::default();
495 let mut file_engine_config = file_engine::config::EngineConfig::default();
496
497 for engine in &self.opts.region_engine {
498 match engine {
499 RegionEngineConfig::Mito(config) => {
500 mito_engine_config = config.clone();
501 }
502 RegionEngineConfig::File(config) => {
503 file_engine_config = config.clone();
504 }
505 RegionEngineConfig::Metric(metric_config) => {
506 metric_engine_config = metric_config.clone();
507 }
508 }
509 }
510
511 let fetcher = Arc::new(MetaPartitionExprFetcher::new(self.kv_backend.clone()));
513 let mito_engine = self
514 .build_mito_engine(
515 object_store_manager.clone(),
516 mito_engine_config,
517 schema_metadata_manager.clone(),
518 file_ref_manager.clone(),
519 fetcher.clone(),
520 plugins.clone(),
521 )
522 .await?;
523
524 let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
525 .context(BuildMetricEngineSnafu)?;
526
527 let file_engine = FileRegionEngine::new(
528 file_engine_config,
529 object_store_manager.default_object_store().clone(), );
531
532 Ok(vec![
533 Arc::new(mito_engine) as _,
534 Arc::new(metric_engine) as _,
535 Arc::new(file_engine) as _,
536 ])
537 }
538
539 async fn build_mito_engine(
541 &mut self,
542 object_store_manager: ObjectStoreManagerRef,
543 mut config: MitoConfig,
544 schema_metadata_manager: SchemaMetadataManagerRef,
545 file_ref_manager: FileReferenceManagerRef,
546 partition_expr_fetcher: PartitionExprFetcherRef,
547 plugins: Plugins,
548 ) -> Result<MitoEngine> {
549 let opts = &self.opts;
550 if opts.storage.is_object_storage() {
551 config.enable_write_cache = true;
553 info!("Configured 'enable_write_cache=true' for mito engine.");
554 }
555
556 let mito_engine = match &opts.wal {
557 DatanodeWalConfig::RaftEngine(raft_engine_config) => {
558 let log_store =
559 Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
560 .await?;
561
562 let builder = MitoEngineBuilder::new(
563 &opts.storage.data_home,
564 config,
565 log_store,
566 object_store_manager,
567 schema_metadata_manager,
568 file_ref_manager,
569 partition_expr_fetcher.clone(),
570 plugins,
571 );
572
573 #[cfg(feature = "enterprise")]
574 let builder = builder.with_extension_range_provider_factory(
575 self.extension_range_provider_factory.take(),
576 );
577
578 builder.try_build().await.context(BuildMitoEngineSnafu)?
579 }
580 DatanodeWalConfig::Kafka(kafka_config) => {
581 if kafka_config.create_index && opts.node_id.is_none() {
582 warn!("The WAL index creation only available in distributed mode.")
583 }
584 let global_index_collector = if kafka_config.create_index
585 && let Some(node_id) = opts.node_id
586 {
587 let operator = new_object_store_without_cache(
588 &opts.storage.store,
589 &opts.storage.data_home,
590 )
591 .await?;
592 let path = default_index_file(node_id);
593 Some(Self::build_global_index_collector(
594 kafka_config.dump_index_interval,
595 operator,
596 path,
597 ))
598 } else {
599 None
600 };
601
602 let log_store =
603 Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
604 self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
605 let builder = MitoEngineBuilder::new(
606 &opts.storage.data_home,
607 config,
608 log_store,
609 object_store_manager,
610 schema_metadata_manager,
611 file_ref_manager,
612 partition_expr_fetcher,
613 plugins,
614 );
615
616 #[cfg(feature = "enterprise")]
617 let builder = builder.with_extension_range_provider_factory(
618 self.extension_range_provider_factory.take(),
619 );
620
621 builder.try_build().await.context(BuildMitoEngineSnafu)?
622 }
623 DatanodeWalConfig::Noop => {
624 let log_store = Arc::new(NoopLogStore);
625
626 let builder = MitoEngineBuilder::new(
627 &opts.storage.data_home,
628 config,
629 log_store,
630 object_store_manager,
631 schema_metadata_manager,
632 file_ref_manager,
633 partition_expr_fetcher.clone(),
634 plugins,
635 );
636
637 #[cfg(feature = "enterprise")]
638 let builder = builder.with_extension_range_provider_factory(
639 self.extension_range_provider_factory.take(),
640 );
641
642 builder.try_build().await.context(BuildMitoEngineSnafu)?
643 }
644 };
645 Ok(mito_engine)
646 }
647
648 async fn build_raft_engine_log_store(
650 data_home: &str,
651 config: &RaftEngineConfig,
652 ) -> Result<Arc<RaftEngineLogStore>> {
653 let data_home = normalize_dir(data_home);
654 let wal_dir = match &config.dir {
655 Some(dir) => dir.clone(),
656 None => format!("{}{WAL_DIR}", data_home),
657 };
658
659 fs::create_dir_all(Path::new(&wal_dir))
661 .await
662 .context(CreateDirSnafu { dir: &wal_dir })?;
663 info!(
664 "Creating raft-engine logstore with config: {:?} and storage path: {}",
665 config, &wal_dir
666 );
667 let logstore = RaftEngineLogStore::try_new(wal_dir, config)
668 .await
669 .map_err(Box::new)
670 .context(OpenLogStoreSnafu)?;
671
672 Ok(Arc::new(logstore))
673 }
674
675 async fn build_kafka_log_store(
677 config: &DatanodeKafkaConfig,
678 global_index_collector: Option<GlobalIndexCollector>,
679 ) -> Result<Arc<KafkaLogStore>> {
680 KafkaLogStore::try_new(config, global_index_collector)
681 .await
682 .map_err(Box::new)
683 .context(OpenLogStoreSnafu)
684 .map(Arc::new)
685 }
686
687 fn build_global_index_collector(
689 dump_index_interval: Duration,
690 operator: object_store::ObjectStore,
691 path: String,
692 ) -> GlobalIndexCollector {
693 GlobalIndexCollector::new(dump_index_interval, operator, path)
694 }
695}
696
697async fn open_all_regions(
699 region_server: RegionServer,
700 region_open_requests: RegionOpenRequests,
701 open_with_writable: bool,
702 init_regions_parallelism: usize,
703 ignore_nonexistent_region: bool,
704) -> Result<()> {
705 let RegionOpenRequests {
706 leader_regions,
707 #[cfg(feature = "enterprise")]
708 follower_regions,
709 } = region_open_requests;
710
711 let leader_region_num = leader_regions.len();
712 info!("going to open {} region(s)", leader_region_num);
713 let now = Instant::now();
714 let open_regions = region_server
715 .handle_batch_open_requests(
716 init_regions_parallelism,
717 leader_regions,
718 ignore_nonexistent_region,
719 )
720 .await?;
721 info!(
722 "Opened {} regions in {:?}",
723 open_regions.len(),
724 now.elapsed()
725 );
726 if !ignore_nonexistent_region {
727 ensure!(
728 open_regions.len() == leader_region_num,
729 error::UnexpectedSnafu {
730 violated: format!(
731 "Expected to open {} of regions, only {} of regions has opened",
732 leader_region_num,
733 open_regions.len()
734 )
735 }
736 );
737 } else if open_regions.len() != leader_region_num {
738 warn!(
739 "ignore nonexistent region, expected to open {} of regions, only {} of regions has opened",
740 leader_region_num,
741 open_regions.len()
742 );
743 }
744
745 for region_id in open_regions {
746 if open_with_writable {
747 let res = region_server.set_region_role(region_id, RegionRole::Leader);
748 match res {
749 Ok(_) => {
750 if let SetRegionRoleStateResponse::InvalidTransition(err) = region_server
752 .set_region_role_state_gracefully(
753 region_id,
754 SettableRegionRoleState::Leader,
755 )
756 .await?
757 {
758 error!(err; "failed to convert region {region_id} to leader");
759 }
760 }
761 Err(e) => {
762 error!(e; "failed to convert region {region_id} to leader");
763 }
764 }
765 }
766 }
767
768 #[cfg(feature = "enterprise")]
769 if !follower_regions.is_empty() {
770 use tokio::time::Instant;
771
772 let follower_region_num = follower_regions.len();
773 info!("going to open {} follower region(s)", follower_region_num);
774
775 let now = Instant::now();
776 let open_regions = region_server
777 .handle_batch_open_requests(
778 init_regions_parallelism,
779 follower_regions,
780 ignore_nonexistent_region,
781 )
782 .await?;
783 info!(
784 "Opened {} follower regions in {:?}",
785 open_regions.len(),
786 now.elapsed()
787 );
788
789 if !ignore_nonexistent_region {
790 ensure!(
791 open_regions.len() == follower_region_num,
792 error::UnexpectedSnafu {
793 violated: format!(
794 "Expected to open {} of follower regions, only {} of regions has opened",
795 follower_region_num,
796 open_regions.len()
797 )
798 }
799 );
800 } else if open_regions.len() != follower_region_num {
801 warn!(
802 "ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened",
803 follower_region_num,
804 open_regions.len()
805 );
806 }
807 }
808
809 info!("all regions are opened");
810
811 Ok(())
812}
813
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::RegionRoleSet;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Registers table 1028 (mito engine, option `foo=bar`) with regions 0, 1
    /// and 2 on datanode 0 in the given kv backend.
    async fn setup_table_datanode(kv: &KvBackendRef) {
        let table_manager = DatanodeTableManager::new(kv.clone());
        let region_options = HashMap::from([("foo".to_string(), "bar".to_string())]);
        // NOTE(review): presumably the first list holds leader regions and the
        // second follower regions; confirm against `RegionRoleSet::new`.
        let regions = BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]);
        let create_txn = table_manager
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                region_options,
                HashMap::default(),
                regions,
            )
            .unwrap();

        let response = kv.txn(create_txn).await.unwrap();
        assert!(response.succeeded);
    }

    /// Opening regions through the builder sends exactly one open request per
    /// registered region, carrying the table's region options.
    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let cache_registry = build_datanode_cache_registry(kv_backend.clone());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(cache_registry)
                .build(),
        );

        let options = DatanodeOptions {
            node_id: Some(0),
            ..Default::default()
        };
        let mut builder = DatanodeBuilder::new(options, Plugins::default(), kv_backend.clone());
        builder.with_cache_registry(layered_cache_registry);
        setup_table_datanode(&(kv_backend as _)).await;

        builder
            .initialize_region_server(&mock_region_server, false)
            .await
            .unwrap();

        // One open request per region, in region-number order.
        for i in 0..3 {
            let (region_id, req) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));
            let RegionRequest::Open(open_req) = req else {
                unreachable!()
            };
            assert_eq!(
                open_req.options,
                HashMap::from([("foo".to_string(), "bar".to_string())])
            );
        }

        // No further requests must have been sent.
        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}