use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_trait::async_trait;
use common_meta::DatanodeId;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::{Peer, PeerResolverRef};
use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
use common_meta::rpc::store::RangeRequest;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{LeaderPeerChanged, MigrationRunning, RegionMigrated, TableRouteNotFound};
use futures::{StreamExt, TryStreamExt};
use snafu::{ResultExt, ensure};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
use tokio::time::{MissedTickBehavior, interval, interval_at};

use crate::discovery::utils::accept_ingest_workload;
use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::{
    RegionMigrationManagerRef, RegionMigrationTriggerReason,
};
use crate::procedure::region_migration::{
    DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask,
};
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;

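/// A snapshot of a datanode heartbeat: the datanode id, the regions it reported,
/// and the heartbeat timestamp in milliseconds.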
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
    datanode_id: DatanodeId,
    regions: Vec<RegionId>,
    timestamp: i64,
}

impl From<&Stat> for DatanodeHeartbeat {
    fn from(value: &Stat) -> Self {
        DatanodeHeartbeat {
            datanode_id: value.id,
            regions: value.region_stats.iter().map(|x| x.id).collect(),
            timestamp: value.timestamp_millis,
        }
    }
}

pub(crate) enum Event {
    Tick,
    InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
    RegisterFailureDetectors(Vec<DetectingRegion>),
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    HeartbeatArrived(DatanodeHeartbeat),
    Clear,
    #[cfg(test)]
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}

#[cfg(test)]
impl Event {
    pub(crate) fn into_region_failure_detectors(self) -> Vec<DetectingRegion> {
        match self {
            Self::RegisterFailureDetectors(detecting_regions) => detecting_regions,
            _ => unreachable!(),
        }
    }
}

impl Debug for Event {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Tick => write!(f, "Tick"),
            Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
            Self::Clear => write!(f, "Clear"),
            Self::InitializeAllRegions(_) => write!(f, "InitializeAllRegions"),
            Self::RegisterFailureDetectors(arg0) => f
                .debug_tuple("RegisterFailureDetectors")
                .field(arg0)
                .finish(),
            Self::DeregisterFailureDetectors(arg0) => f
                .debug_tuple("DeregisterFailureDetectors")
                .field(arg0)
                .finish(),
            #[cfg(test)]
            Self::Dump(_) => f.debug_struct("Dump").finish(),
        }
    }
}

pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;

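/// [RegionSupervisorTicker] periodically sends [Event::Tick] to the [RegionSupervisor]
/// and drives the one-shot initialization of all region failure detectors.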
#[derive(Debug)]
pub struct RegionSupervisorTicker {
    /// The handle of the background tick task.
    tick_handle: Mutex<Option<JoinHandle<()>>>,
    /// The interval between two [Event::Tick]s.
    tick_interval: Duration,
    /// The delay before sending the first [Event::InitializeAllRegions].
    initialization_delay: Duration,
    /// The period between retries of [Event::InitializeAllRegions] until it is acknowledged.
    initialization_retry_period: Duration,
    /// Sends [Event]s to the [RegionSupervisor].
    sender: Sender<Event>,
}

#[async_trait]
impl LeadershipChangeListener for RegionSupervisorTicker {
    fn name(&self) -> &'static str {
        "RegionSupervisorTicker"
    }

    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
        self.start();
        Ok(())
    }

    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
        self.stop();
        Ok(())
    }
}

impl RegionSupervisorTicker {
    pub(crate) fn new(
        tick_interval: Duration,
        initialization_delay: Duration,
        initialization_retry_period: Duration,
        sender: Sender<Event>,
    ) -> Self {
        info!(
            "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
            tick_interval, initialization_delay, initialization_retry_period
        );
        Self {
            tick_handle: Mutex::new(None),
            tick_interval,
            initialization_delay,
            initialization_retry_period,
            sender,
        }
    }

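    /// Starts the initialization loop and the tick loop.
    /// Calling this while the ticker is already running is a no-op.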
    pub fn start(&self) {
        let mut handle = self.tick_handle.lock().unwrap();
        if handle.is_none() {
            let sender = self.sender.clone();
            let tick_interval = self.tick_interval;
            let initialization_delay = self.initialization_delay;

            let mut initialization_interval = interval_at(
                tokio::time::Instant::now() + initialization_delay,
                self.initialization_retry_period,
            );
            initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            common_runtime::spawn_global(async move {
                loop {
                    initialization_interval.tick().await;
                    let (tx, rx) = oneshot::channel();
                    if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
                        info!(
                            "EventReceiver is dropped, region failure detectors initialization loop is stopped"
                        );
                        break;
                    }
                    if rx.await.is_ok() {
                        info!("All region failure detectors are initialized.");
                        break;
                    }
                }
            });

            let sender = self.sender.clone();
            let ticker_loop = tokio::spawn(async move {
                let mut tick_interval = interval(tick_interval);
                tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

                if let Err(err) = sender.send(Event::Clear).await {
                    warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
                    return;
                }
                loop {
                    tick_interval.tick().await;
                    if sender.send(Event::Tick).await.is_err() {
                        info!("EventReceiver is dropped, tick loop is stopped");
                        break;
                    }
                }
            });
            *handle = Some(ticker_loop);
        }
    }

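    /// Stops the tick loop by aborting its background task, if one is running.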
    pub fn stop(&self) {
        let handle = self.tick_handle.lock().unwrap().take();
        if let Some(handle) = handle {
            handle.abort();
            info!("The tick loop is stopped.");
        }
    }
}

impl Drop for RegionSupervisorTicker {
    fn drop(&mut self) {
        self.stop();
    }
}

pub type RegionSupervisorRef = Arc<RegionSupervisor>;

/// The default interval between two [Event::Tick]s.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// The default retry period for initializing all region failure detectors.
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);

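/// The selector strategy used by the [RegionSupervisor] to choose target peers for region failover.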
pub enum RegionSupervisorSelector {
    NaiveSelector(SelectorRef),
    RegionStatAwareSelector(RegionStatAwareSelectorRef),
}

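/// The [RegionSupervisor] tracks the liveness of regions reported by datanode heartbeats
/// and triggers region failover (via region migration procedures) when failures are detected.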
pub struct RegionSupervisor {
    /// The failure detectors of the detecting regions.
    failure_detector: RegionFailureDetector,
    /// The number of consecutive failover attempts per detecting region.
    failover_counts: HashMap<DetectingRegion, u32>,
    /// Receives [Event]s.
    receiver: Receiver<Event>,
    /// The context of the selector.
    selector_context: SelectorContext,
    /// The selector used to choose target peers for failover.
    selector: RegionSupervisorSelector,
    /// Submits region migration (failover) procedures.
    region_migration_manager: RegionMigrationManagerRef,
    /// Used to check whether maintenance mode is enabled.
    runtime_switch_manager: RuntimeSwitchManagerRef,
    /// Resolves datanode peers.
    peer_resolver: PeerResolverRef,
    /// The kv backend used to scan table routes during initialization.
    kv_backend: KvBackendRef,
}

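/// [RegionFailureDetectorControl] registers and deregisters failure detectors by sending
/// events to the [RegionSupervisor].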
#[derive(Debug, Clone)]
pub struct RegionFailureDetectorControl {
    sender: Sender<Event>,
}

impl RegionFailureDetectorControl {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }
}

#[async_trait::async_trait]
impl RegionFailureDetectorController for RegionFailureDetectorControl {
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::RegisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }

    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::DeregisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}

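/// [HeartbeatAcceptor] forwards datanode heartbeats to the [RegionSupervisor].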
#[derive(Clone)]
pub(crate) struct HeartbeatAcceptor {
    sender: Sender<Event>,
}

impl HeartbeatAcceptor {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }

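    /// Sends the heartbeat to the supervisor as an [Event::HeartbeatArrived].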
    pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
        if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}

impl RegionSupervisor {
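    /// Returns an mpsc channel (with a buffer capacity of 1024) for sending and receiving [Event]s.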
    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(1024)
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        event_receiver: Receiver<Event>,
        options: PhiAccrualFailureDetectorOptions,
        selector_context: SelectorContext,
        selector: RegionSupervisorSelector,
        region_migration_manager: RegionMigrationManagerRef,
        runtime_switch_manager: RuntimeSwitchManagerRef,
        peer_resolver: PeerResolverRef,
        kv_backend: KvBackendRef,
    ) -> Self {
        Self {
            failure_detector: RegionFailureDetector::new(options),
            failover_counts: HashMap::new(),
            receiver: event_receiver,
            selector_context,
            selector,
            region_migration_manager,
            runtime_switch_manager,
            peer_resolver,
            kv_backend,
        }
    }

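    /// Runs the event loop, handling incoming [Event]s until the sender side of the channel is closed.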
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::InitializeAllRegions(sender) => {
                    match self.is_maintenance_mode_enabled().await {
                        Ok(false) => {}
                        Ok(true) => {
                            warn!(
                                "Skipping initialization of all regions since maintenance mode is enabled."
                            );
                            continue;
                        }
                        Err(err) => {
                            error!(err; "Failed to check maintenance mode during initialization of all regions.");
                            continue;
                        }
                    }

                    if let Err(err) = self.initialize_all().await {
                        error!(err; "Failed to initialize all regions.");
                    } else {
                        let _ = sender.send(());
                    }
                }
                Event::Tick => {
                    let regions = self.detect_region_failure();
                    self.handle_region_failures(regions).await;
                }
                Event::RegisterFailureDetectors(detecting_regions) => {
                    self.register_failure_detectors(detecting_regions).await
                }
                Event::DeregisterFailureDetectors(detecting_regions) => {
                    self.deregister_failure_detectors(detecting_regions).await
                }
                Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
                Event::Clear => self.clear(),
                #[cfg(test)]
                Event::Dump(sender) => {
                    let _ = sender.send(self.failure_detector.dump());
                }
            }
        }
        info!("RegionSupervisor is stopped!");
    }

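    /// Scans all physical table routes and registers failure detectors for region leaders
    /// that are not yet tracked.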
    async fn initialize_all(&self) -> Result<()> {
        let now = Instant::now();
        let regions = self.regions();
        let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
            TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
        })
        .into_stream();

        let mut stream = stream
            .map_ok(|(_, value)| {
                TableRouteValue::try_from_raw_value(&value)
                    .context(error::TableMetadataManagerSnafu)
            })
            .boxed();
        let mut detecting_regions = Vec::new();
        while let Some(route) = stream
            .try_next()
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let route = route?;
            if !route.is_physical() {
                continue;
            }

            let physical_table_route = route.into_physical_table_route();
            physical_table_route
                .region_routes
                .iter()
                .for_each(|region_route| {
                    if !regions.contains(&region_route.region.id)
                        && let Some(leader_peer) = &region_route.leader_peer
                    {
                        detecting_regions.push((leader_peer.id, region_route.region.id));
                    }
                });
        }

        let num_detecting_regions = detecting_regions.len();
        if !detecting_regions.is_empty() {
            self.register_failure_detectors(detecting_regions).await;
        }

        info!(
            "Initialized {} region failure detectors, elapsed: {:?}",
            num_detecting_regions,
            now.elapsed()
        );

        Ok(())
    }

    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        let ts_millis = current_time_millis();
        for region in detecting_regions {
            self.failure_detector
                .maybe_init_region_failure_detector(region, ts_millis);
        }
    }

    async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
        for region in detecting_regions {
            self.failure_detector.remove(&region);
            self.failover_counts.remove(&region);
        }
    }

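    /// Handles detected region failures: skips them entirely in maintenance mode, drops regions
    /// that are already migrating, and triggers failover for the rest, grouped by datanode.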
    async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
        if regions.is_empty() {
            return;
        }
        match self.is_maintenance_mode_enabled().await {
            Ok(false) => {}
            Ok(true) => {
                warn!(
                    "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
                    regions
                );
                return;
            }
            Err(err) => {
                error!(err; "Failed to check maintenance mode");
                return;
            }
        }

        let migrating_regions = regions
            .extract_if(.., |(_, region_id)| {
                self.region_migration_manager.tracker().contains(*region_id)
            })
            .collect::<Vec<_>>();

        for (datanode_id, region_id) in migrating_regions {
            debug!(
                "Skipped region failover for region: {region_id}, datanode: {datanode_id} because the region is being migrated"
            );
        }

        if regions.is_empty() {
            return;
        }

        let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
            HashMap::with_capacity(regions.len());
        for (datanode_id, region_id) in regions {
            grouped_regions
                .entry(datanode_id)
                .or_default()
                .push(region_id);
        }

        for (datanode_id, regions) in grouped_regions {
            warn!(
                "Detected region failures on datanode: {}, regions: {:?}",
                datanode_id, regions
            );
            let failed_datanodes = [datanode_id];
            match self
                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
                .await
            {
                Ok(tasks) => {
                    for (task, count) in tasks {
                        let region_id = task.region_id;
                        let datanode_id = task.from_peer.id;
                        if let Err(err) = self.do_failover(task, count).await {
                            error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
                        }
                    }
                }
                Err(err) => error!(err; "Failed to generate failover tasks"),
            }
        }
    }

    pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
        self.runtime_switch_manager
            .maintenance_mode()
            .await
            .context(error::RuntimeSwitchManagerSnafu)
    }

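    /// Selects a target peer for each failed region, excluding the failed datanodes,
    /// and returns one `(RegionId, Peer)` pair per region.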
    async fn select_peers(
        &self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failure_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionId, Peer)>> {
        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
        match &self.selector {
            RegionSupervisorSelector::NaiveSelector(selector) => {
                let opt = SelectorOptions {
                    min_required_items: regions.len(),
                    allow_duplication: true,
                    exclude_peer_ids,
                    workload_filter: Some(accept_ingest_workload),
                };
                let peers = selector.select(&self.selector_context, opt).await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );
                let region_peers = regions
                    .iter()
                    .zip(peers)
                    .map(|(region_id, peer)| (*region_id, peer))
                    .collect::<Vec<_>>();

                Ok(region_peers)
            }
            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
                let peers = selector
                    .select(
                        &self.selector_context,
                        from_peer_id,
                        regions,
                        exclude_peer_ids,
                    )
                    .await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );

                Ok(peers)
            }
        }
    }

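    /// Builds a [RegionMigrationProcedureTask] for each failed region, incrementing the
    /// per-region failover count and scaling the migration timeout by that count.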
    async fn generate_failover_tasks(
        &mut self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failed_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
        let mut tasks = Vec::with_capacity(regions.len());
        let from_peer = self
            .peer_resolver
            .datanode(from_peer_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_else(|| Peer::empty(from_peer_id));

        let region_peers = self
            .select_peers(from_peer_id, regions, failed_datanodes)
            .await?;

        for (region_id, peer) in region_peers {
            let count = *self
                .failover_counts
                .entry((from_peer_id, region_id))
                .and_modify(|count| *count += 1)
                .or_insert(1);
            let task = RegionMigrationProcedureTask {
                region_id,
                from_peer: from_peer.clone(),
                to_peer: peer,
                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
                trigger_reason: RegionMigrationTriggerReason::Failover,
            };
            tasks.push((task, count));
        }

        Ok(tasks)
    }

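    /// Submits a region failover (migration) procedure and deregisters the failure detector
    /// when the failure is already resolved (e.g. the region was migrated or its table was dropped).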
    async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
        let from_peer_id = task.from_peer.id;
        let to_peer_id = task.to_peer.id;
        let region_id = task.region_id;

        info!(
            "Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
            task.region_id, task.from_peer, task.to_peer, task.timeout, count
        );

        if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
            return match err {
                RegionMigrated { .. } => {
                    info!(
                        "Region has been migrated to target peer: {}, removed failover detector for region: {}, datanode: {}",
                        to_peer_id, region_id, from_peer_id
                    );
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    Ok(())
                }
                MigrationRunning { .. } => {
                    info!(
                        "Another region migration is running, skipping failover for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                TableRouteNotFound { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Table route is not found (the table has been dropped), removed failover detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                LeaderPeerChanged { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Region's leader peer has changed, removed failover detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                err => Err(err),
            };
        };

        Ok(())
    }

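    /// Returns the detecting regions whose failure detectors currently report them as unavailable.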
    fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
        self.failure_detector
            .iter()
            .filter_map(|e| {
                if !e.failure_detector().is_available(current_time_millis()) {
                    Some(*e.region_ident())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>()
    }

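    /// Returns the ids of all regions currently tracked by the failure detectors.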
    fn regions(&self) -> HashSet<RegionId> {
        self.failure_detector
            .iter()
            .map(|e| e.region_ident().1)
            .collect::<HashSet<_>>()
    }

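    /// Updates the failure detectors for all regions reported in the heartbeat.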
    fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
        for region_id in heartbeat.regions {
            let detecting_region = (heartbeat.datanode_id, region_id);
            let mut detector = self
                .failure_detector
                .region_failure_detector(detecting_region);
            detector.heartbeat(heartbeat.timestamp);
        }
    }

    fn clear(&self) {
        self.failure_detector.clear();
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use common_meta::ddl::RegionFailureDetectorController;
    use common_meta::ddl::test_util::{
        test_create_logical_table_task, test_create_physical_table_task,
    };
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::key::{TableMetadataManager, runtime_switch};
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::NoopPeerResolver;
    use common_telemetry::info;
    use common_time::util::current_time_millis;
    use rand::Rng;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::Sender;
    use tokio::sync::oneshot;
    use tokio::time::sleep;

    use super::RegionSupervisorSelector;
    use crate::procedure::region_migration::manager::RegionMigrationManager;
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::region::supervisor::{
        DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
        RegionSupervisorTicker,
    };
    use crate::selector::test_utils::{RandomNodeSelector, new_test_selector_context};

    pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
        let env = TestingEnv::new();
        let selector_context = new_test_selector_context();
        let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
        let context_factory = env.context_factory();
        let region_migration_manager = Arc::new(RegionMigrationManager::new(
            env.procedure_manager().clone(),
            context_factory,
        ));
        let runtime_switch_manager =
            Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
        let peer_resolver = Arc::new(NoopPeerResolver);
        let (tx, rx) = RegionSupervisor::channel();
        let kv_backend = env.kv_backend();

        (
            RegionSupervisor::new(
                rx,
                Default::default(),
                selector_context,
                RegionSupervisorSelector::NaiveSelector(selector),
                region_migration_manager,
                runtime_switch_manager,
                peer_resolver,
                kv_backend,
            ),
            tx,
        )
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let (mut supervisor, sender) = new_test_supervisor();
        tokio::spawn(async move { supervisor.run().await });

        sender
            .send(Event::HeartbeatArrived(DatanodeHeartbeat {
                datanode_id: 0,
                regions: vec![RegionId::new(1, 1)],
                timestamp: 100,
            }))
            .await
            .unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert!(detector.contains(&(0, RegionId::new(1, 1))));

        sender.send(Event::Clear).await.unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());

        fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
            let mut rng = rand::rng();
            let start = current_time_millis();
            (0..2000)
                .map(|i| DatanodeHeartbeat {
                    timestamp: start + i * 1000 + rng.random_range(0..100),
                    datanode_id,
                    regions: region_ids
                        .iter()
                        .map(|number| RegionId::new(0, *number))
                        .collect(),
                })
                .collect::<Vec<_>>()
        }

        let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
        let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
        for heartbeat in heartbeats {
            sender
                .send(Event::HeartbeatArrived(heartbeat))
                .await
                .unwrap();
        }

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 3);

        for e in detector.iter() {
            let fd = e.failure_detector();
            let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
            let start = last_heartbeat_time;

            for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
                let now = start + i * 1000;
                assert_eq!(fd.phi(now), 0.0);
            }

            let now = start + acceptable_heartbeat_pause_millis + 1000;
            assert!(fd.phi(now) < fd.threshold() as _);
            let now = start + acceptable_heartbeat_pause_millis + 2000;
            assert!(fd.phi(now) > fd.threshold() as _);
        }
    }

    #[tokio::test]
    async fn test_supervisor_ticker() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            initialization_delay: Duration::from_millis(100),
            initialization_retry_period: Duration::from_millis(100),
            sender: tx,
        };
        for _ in 0..2 {
            ticker.start();
            sleep(Duration::from_millis(100)).await;
            ticker.stop();
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(
                    event,
                    Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
                );
            }
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions_event_handling() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(1000),
            initialization_delay: Duration::from_millis(50),
            initialization_retry_period: Duration::from_millis(50),
            sender: tx,
        };
        ticker.start();
        sleep(Duration::from_millis(60)).await;
        let handle = tokio::spawn(async move {
            let mut counter = 0;
            while let Some(event) = rx.recv().await {
                if let Event::InitializeAllRegions(tx) = event {
                    if counter == 0 {
                        counter += 1;
                        continue;
                    }
                    tx.send(()).unwrap();
                    info!("Responded to the initialize all regions event");
                    break;
                }
            }
            rx
        });

        let rx = handle.await.unwrap();
        for _ in 0..3 {
            sleep(Duration::from_millis(100)).await;
            assert!(rx.is_empty());
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();
        let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());

        let table_id = 1024;
        let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
        create_physical_table_task.set_table_id(table_id);
        let table_info = create_physical_table_task.table_info;
        let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 0),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]);
        let table_route_value = TableRouteValue::Physical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();

        let logical_table_id = 1025;
        let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
        test_create_logical_table_task.set_table_id(logical_table_id);
        let table_info = test_create_logical_table_task.table_info;
        let table_route = LogicalTableRouteValue::new(1024, vec![RegionId::new(1025, 0)]);
        let table_route_value = TableRouteValue::Logical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        assert!(rx.await.is_ok());

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 1);
        assert!(detector.contains(&(1, RegionId::new(1024, 0))));
    }

    #[tokio::test]
    async fn test_initialize_all_regions_with_maintenance_mode() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();

        supervisor
            .runtime_switch_manager
            .set_maintenance_mode()
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        assert!(rx.await.is_err());
    }

    #[tokio::test]
    async fn test_region_failure_detector_controller() {
        let (mut supervisor, sender) = new_test_supervisor();
        let controller = RegionFailureDetectorControl::new(sender.clone());
        tokio::spawn(async move { supervisor.run().await });
        let detecting_region = (1, RegionId::new(1, 1));
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let region_detector = detector.region_failure_detector(detecting_region).clone();

        controller
            .register_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let got = detector.region_failure_detector(detecting_region).clone();
        assert_eq!(region_detector, got);

        controller
            .deregister_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());
    }
}
1063}