use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_trait::async_trait;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
use common_meta::key::{MetadataKey, MetadataValue};
use common_meta::kv_backend::KvBackendRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::{Peer, PeerLookupServiceRef};
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::RangeRequest;
use common_meta::DatanodeId;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use crate::error::Error::{LeaderPeerChanged, MigrationRunning, RegionMigrated, TableRouteNotFound};
use futures::{StreamExt, TryStreamExt};
use snafu::{ensure, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::oneshot;
use tokio::time::{interval, interval_at, MissedTickBehavior};

use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::{
    RegionMigrationManagerRef, RegionMigrationTriggerReason,
};
use crate::procedure::region_migration::{
    RegionMigrationProcedureTask, DEFAULT_REGION_MIGRATION_TIMEOUT,
};
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;

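/// A lightweight view of a datanode heartbeat: the datanode id, the regions it reported,
/// and the heartbeat timestamp in milliseconds.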
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
    datanode_id: DatanodeId,
    regions: Vec<RegionId>,
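    /// The heartbeat timestamp in milliseconds.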
    timestamp: i64,
}

impl From<&Stat> for DatanodeHeartbeat {
    fn from(value: &Stat) -> Self {
        DatanodeHeartbeat {
            datanode_id: value.id,
            regions: value.region_stats.iter().map(|x| x.id).collect(),
            timestamp: value.timestamp_millis,
        }
    }
}

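/// Events handled by the [`RegionSupervisor`]: periodic `Tick`s trigger failure detection,
/// `InitializeAllRegions` registers failure detectors from the stored table routes,
/// `RegisterFailureDetectors`/`DeregisterFailureDetectors` maintain individual detectors,
/// `HeartbeatArrived` feeds datanode heartbeats into the detectors, and `Clear` resets them.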
pub(crate) enum Event {
    Tick,
    InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
    RegisterFailureDetectors(Vec<DetectingRegion>),
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    HeartbeatArrived(DatanodeHeartbeat),
    Clear,
    #[cfg(test)]
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}

#[cfg(test)]
impl Event {
    pub(crate) fn into_region_failure_detectors(self) -> Vec<DetectingRegion> {
        match self {
            Self::RegisterFailureDetectors(detecting_regions) => detecting_regions,
            _ => unreachable!(),
        }
    }
}

impl Debug for Event {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Tick => write!(f, "Tick"),
            Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
            Self::Clear => write!(f, "Clear"),
            Self::InitializeAllRegions(_) => write!(f, "InitializeAllRegions"),
            Self::RegisterFailureDetectors(arg0) => f
                .debug_tuple("RegisterFailureDetectors")
                .field(arg0)
                .finish(),
            Self::DeregisterFailureDetectors(arg0) => f
                .debug_tuple("DeregisterFailureDetectors")
                .field(arg0)
                .finish(),
            #[cfg(test)]
            Self::Dump(_) => f.debug_struct("Dump").finish(),
        }
    }
}

pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;

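/// [`RegionSupervisorTicker`] drives the [`RegionSupervisor`]: once started, it periodically
/// sends [`Event::Tick`] and keeps sending [`Event::InitializeAllRegions`] until the region
/// failure detectors are initialized.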
#[derive(Debug)]
pub struct RegionSupervisorTicker {
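    /// The handle of the background tick task, present while the ticker is running.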
    tick_handle: Mutex<Option<JoinHandle<()>>>,

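    /// The interval between two [`Event::Tick`]s.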
    tick_interval: Duration,

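    /// The delay before sending the first [`Event::InitializeAllRegions`].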
    initialization_delay: Duration,

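    /// The period between retries of [`Event::InitializeAllRegions`] until initialization succeeds.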
    initialization_retry_period: Duration,

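    /// Sends [`Event`]s to the [`RegionSupervisor`].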
    sender: Sender<Event>,
}
153
154#[async_trait]
155impl LeadershipChangeListener for RegionSupervisorTicker {
156 fn name(&self) -> &'static str {
157 "RegionSupervisorTicker"
158 }
159
160 async fn on_leader_start(&self) -> common_meta::error::Result<()> {
161 self.start();
162 Ok(())
163 }
164
165 async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
166 self.stop();
167 Ok(())
168 }
169}
170
171impl RegionSupervisorTicker {
172 pub(crate) fn new(
173 tick_interval: Duration,
174 initialization_delay: Duration,
175 initialization_retry_period: Duration,
176 sender: Sender<Event>,
177 ) -> Self {
178 info!(
179 "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
180 tick_interval, initialization_delay, initialization_retry_period
181 );
182 Self {
183 tick_handle: Mutex::new(None),
184 tick_interval,
185 initialization_delay,
186 initialization_retry_period,
187 sender,
188 }
189 }
190
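    /// Starts the background tasks: one retries region failure detector initialization until it
    /// succeeds, the other emits an initial [`Event::Clear`] followed by periodic [`Event::Tick`]s.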
    pub fn start(&self) {
        let mut handle = self.tick_handle.lock().unwrap();
        if handle.is_none() {
            let sender = self.sender.clone();
            let tick_interval = self.tick_interval;
            let initialization_delay = self.initialization_delay;

            let mut initialization_interval = interval_at(
                tokio::time::Instant::now() + initialization_delay,
                self.initialization_retry_period,
            );
            initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            common_runtime::spawn_global(async move {
                loop {
                    initialization_interval.tick().await;
                    let (tx, rx) = oneshot::channel();
                    if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
                        info!("EventReceiver is dropped, region failure detectors initialization loop is stopped");
                        break;
                    }
                    if rx.await.is_ok() {
                        info!("All region failure detectors are initialized.");
                        break;
                    }
                }
            });

            let sender = self.sender.clone();
            let ticker_loop = tokio::spawn(async move {
                let mut tick_interval = interval(tick_interval);
                tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);

                if let Err(err) = sender.send(Event::Clear).await {
                    warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
                    return;
                }
                loop {
                    tick_interval.tick().await;
                    if sender.send(Event::Tick).await.is_err() {
                        info!("EventReceiver is dropped, tick loop is stopped");
                        break;
                    }
                }
            });
            *handle = Some(ticker_loop);
        }
    }

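    /// Stops the tick loop, if it is running.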
    pub fn stop(&self) {
        let handle = self.tick_handle.lock().unwrap().take();
        if let Some(handle) = handle {
            handle.abort();
            info!("The tick loop is stopped.");
        }
    }
}

impl Drop for RegionSupervisorTicker {
    fn drop(&mut self) {
        self.stop();
    }
}

pub type RegionSupervisorRef = Arc<RegionSupervisor>;

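/// The default interval between two [`Event::Tick`]s.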
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
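/// The default retry period for initializing all region failure detectors.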
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);

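/// The selector used by the [`RegionSupervisor`] to pick target peers for failover.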
pub enum RegionSupervisorSelector {
    NaiveSelector(SelectorRef),
    RegionStatAwareSelector(RegionStatAwareSelectorRef),
}

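/// [`RegionSupervisor`] watches datanode heartbeats, detects region failures via Phi accrual
/// failure detectors, and submits region migration (failover) procedures for failed regions
/// unless maintenance mode is enabled.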
pub struct RegionSupervisor {
    failure_detector: RegionFailureDetector,
    failover_counts: HashMap<DetectingRegion, u32>,
    receiver: Receiver<Event>,
    selector_context: SelectorContext,
    selector: RegionSupervisorSelector,
    region_migration_manager: RegionMigrationManagerRef,
    runtime_switch_manager: RuntimeSwitchManagerRef,
    peer_lookup: PeerLookupServiceRef,
    kv_backend: KvBackendRef,
}

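/// A cloneable handle that registers or deregisters region failure detectors by sending
/// [`Event`]s to the [`RegionSupervisor`].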
#[derive(Debug, Clone)]
pub struct RegionFailureDetectorControl {
    sender: Sender<Event>,
}

impl RegionFailureDetectorControl {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }
}

#[async_trait::async_trait]
impl RegionFailureDetectorController for RegionFailureDetectorControl {
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::RegisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }

    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::DeregisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}

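/// Forwards datanode heartbeats to the [`RegionSupervisor`].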
#[derive(Clone)]
pub(crate) struct HeartbeatAcceptor {
    sender: Sender<Event>,
}

impl HeartbeatAcceptor {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }

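    /// Accepts a heartbeat and forwards it as an [`Event::HeartbeatArrived`].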
    pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
        if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}

impl RegionSupervisor {
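    /// Creates the bounded channel used to deliver [`Event`]s to the supervisor.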
    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(1024)
    }

    #[allow(clippy::too_many_arguments)]
    pub(crate) fn new(
        event_receiver: Receiver<Event>,
        options: PhiAccrualFailureDetectorOptions,
        selector_context: SelectorContext,
        selector: RegionSupervisorSelector,
        region_migration_manager: RegionMigrationManagerRef,
        runtime_switch_manager: RuntimeSwitchManagerRef,
        peer_lookup: PeerLookupServiceRef,
        kv_backend: KvBackendRef,
    ) -> Self {
        Self {
            failure_detector: RegionFailureDetector::new(options),
            failover_counts: HashMap::new(),
            receiver: event_receiver,
            selector_context,
            selector,
            region_migration_manager,
            runtime_switch_manager,
            peer_lookup,
            kv_backend,
        }
    }

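    /// Runs the event loop until all senders of the channel are dropped.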
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::InitializeAllRegions(sender) => {
                    match self.is_maintenance_mode_enabled().await {
                        Ok(false) => {}
                        Ok(true) => {
                            warn!("Skipping initialize all regions since maintenance mode is enabled.");
                            continue;
                        }
                        Err(err) => {
                            error!(err; "Failed to check maintenance mode during initialize all regions.");
                            continue;
                        }
                    }

                    if let Err(err) = self.initialize_all().await {
                        error!(err; "Failed to initialize all regions.");
                    } else {
                        let _ = sender.send(());
                    }
                }
                Event::Tick => {
                    let regions = self.detect_region_failure();
                    self.handle_region_failures(regions).await;
                }
                Event::RegisterFailureDetectors(detecting_regions) => {
                    self.register_failure_detectors(detecting_regions).await
                }
                Event::DeregisterFailureDetectors(detecting_regions) => {
                    self.deregister_failure_detectors(detecting_regions).await
                }
                Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
                Event::Clear => self.clear(),
                #[cfg(test)]
                Event::Dump(sender) => {
                    let _ = sender.send(self.failure_detector.dump());
                }
            }
        }
        info!("RegionSupervisor is stopped!");
    }

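    /// Scans all physical table routes in the kv backend and registers failure detectors for
    /// leader regions that are not tracked yet.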
    async fn initialize_all(&self) -> Result<()> {
        let now = Instant::now();
        let regions = self.regions();
        let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
        let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
            TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
        })
        .into_stream();

        let mut stream = stream
            .map_ok(|(_, value)| {
                TableRouteValue::try_from_raw_value(&value)
                    .context(error::TableMetadataManagerSnafu)
            })
            .boxed();
        let mut detecting_regions = Vec::new();
        while let Some(route) = stream
            .try_next()
            .await
            .context(error::TableMetadataManagerSnafu)?
        {
            let route = route?;
            if !route.is_physical() {
                continue;
            }

            let physical_table_route = route.into_physical_table_route();
            physical_table_route
                .region_routes
                .iter()
                .for_each(|region_route| {
                    if !regions.contains(&region_route.region.id) {
                        if let Some(leader_peer) = &region_route.leader_peer {
                            detecting_regions.push((leader_peer.id, region_route.region.id));
                        }
                    }
                });
        }

        let num_detecting_regions = detecting_regions.len();
        if !detecting_regions.is_empty() {
            self.register_failure_detectors(detecting_regions).await;
        }

        info!(
            "Initialized {} region failure detectors, elapsed: {:?}",
            num_detecting_regions,
            now.elapsed()
        );

        Ok(())
    }

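    /// Initializes a failure detector for each detecting region with the current timestamp,
    /// keeping existing detectors untouched.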
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        let ts_millis = current_time_millis();
        for region in detecting_regions {
            self.failure_detector
                .maybe_init_region_failure_detector(region, ts_millis);
        }
    }

    async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
        for region in detecting_regions {
            self.failure_detector.remove(&region);
            self.failover_counts.remove(&region);
        }
    }

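    /// Handles detected region failures: skips failover when maintenance mode is enabled or the
    /// region is already migrating, then generates and submits failover tasks per datanode.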
    async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
        if regions.is_empty() {
            return;
        }
        match self.is_maintenance_mode_enabled().await {
            Ok(false) => {}
            Ok(true) => {
                warn!(
                    "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
                    regions
                );
                return;
            }
            Err(err) => {
                error!(err; "Failed to check maintenance mode");
                return;
            }
        }

        let migrating_regions = regions
            .extract_if(.., |(_, region_id)| {
                self.region_migration_manager.tracker().contains(*region_id)
            })
            .collect::<Vec<_>>();

        for (datanode_id, region_id) in migrating_regions {
            debug!(
                "Skipped region failover for region: {region_id}, datanode: {datanode_id} because it is already migrating"
            );
        }

        if regions.is_empty() {
            return;
        }

        let mut grouped_regions: HashMap<DatanodeId, Vec<RegionId>> =
            HashMap::with_capacity(regions.len());
        for (datanode_id, region_id) in regions {
            grouped_regions
                .entry(datanode_id)
                .or_default()
                .push(region_id);
        }

        for (datanode_id, regions) in grouped_regions {
            warn!(
                "Detected region failures on datanode: {}, regions: {:?}",
                datanode_id, regions
            );
            let failed_datanodes = [datanode_id];
            match self
                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
                .await
            {
                Ok(tasks) => {
                    for (task, count) in tasks {
                        let region_id = task.region_id;
                        let datanode_id = task.from_peer.id;
                        if let Err(err) = self.do_failover(task, count).await {
                            error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
                        }
                    }
                }
                Err(err) => error!(err; "Failed to generate failover tasks"),
            }
        }
    }

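    /// Returns whether the cluster-wide maintenance mode is enabled.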
    pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
        self.runtime_switch_manager
            .maintenance_mode()
            .await
            .context(error::RuntimeSwitchManagerSnafu)
    }

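    /// Selects a target peer for each region, excluding the failed datanodes, and ensures the
    /// selector returned exactly one peer per region.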
    async fn select_peers(
        &self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failure_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionId, Peer)>> {
        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
        match &self.selector {
            RegionSupervisorSelector::NaiveSelector(selector) => {
                let opt = SelectorOptions {
                    min_required_items: regions.len(),
                    allow_duplication: true,
                    exclude_peer_ids,
                };
                let peers = selector.select(&self.selector_context, opt).await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );
                let region_peers = regions
                    .iter()
                    .zip(peers)
                    .map(|(region_id, peer)| (*region_id, peer))
                    .collect::<Vec<_>>();

                Ok(region_peers)
            }
            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
                let peers = selector
                    .select(
                        &self.selector_context,
                        from_peer_id,
                        regions,
                        exclude_peer_ids,
                    )
                    .await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );

                Ok(peers)
            }
        }
    }

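    /// Builds one region migration task per failed region, bumping the per-region failover count
    /// and scaling the migration timeout by that count.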
    async fn generate_failover_tasks(
        &mut self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failed_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
        let mut tasks = Vec::with_capacity(regions.len());
        let from_peer = self
            .peer_lookup
            .datanode(from_peer_id)
            .await
            .ok()
            .flatten()
            .unwrap_or_else(|| Peer::empty(from_peer_id));

        let region_peers = self
            .select_peers(from_peer_id, regions, failed_datanodes)
            .await?;

        for (region_id, peer) in region_peers {
            let count = *self
                .failover_counts
                .entry((from_peer_id, region_id))
                .and_modify(|count| *count += 1)
                .or_insert(1);
            let task = RegionMigrationProcedureTask {
                region_id,
                from_peer: from_peer.clone(),
                to_peer: peer,
                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
                trigger_reason: RegionMigrationTriggerReason::Failover,
            };
            tasks.push((task, count));
        }

        Ok(tasks)
    }

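    /// Submits the region migration procedure, treating "already migrated", "migration running",
    /// "table route not found" and "leader peer changed" errors as benign, and deregistering the
    /// failure detector where appropriate.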
    async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
        let from_peer_id = task.from_peer.id;
        let to_peer_id = task.to_peer.id;
        let region_id = task.region_id;

        info!(
            "Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
            task.region_id, task.from_peer, task.to_peer, task.timeout, count
        );

        if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
            return match err {
                RegionMigrated { .. } => {
                    info!(
                        "Region has been migrated to target peer: {}, removed failover detector for region: {}, datanode: {}",
                        to_peer_id, region_id, from_peer_id
                    );
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    Ok(())
                }
                MigrationRunning { .. } => {
                    info!(
                        "Another region migration is running, skip failover for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                TableRouteNotFound { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Table route is not found, the table is dropped, removed failover detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                LeaderPeerChanged { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Region's leader peer changed, removed failover detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                err => Err(err),
            };
        };

        Ok(())
    }

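    /// Returns the (datanode, region) pairs whose failure detectors report the region as
    /// unavailable at the current time.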
    fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
        self.failure_detector
            .iter()
            .filter_map(|e| {
                if !e.failure_detector().is_available(current_time_millis()) {
                    Some(*e.region_ident())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>()
    }

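    /// Returns the set of region ids that already have a failure detector.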
    fn regions(&self) -> HashSet<RegionId> {
        self.failure_detector
            .iter()
            .map(|e| e.region_ident().1)
            .collect::<HashSet<_>>()
    }

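    /// Feeds the heartbeat into the failure detector of every region it reports.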
    fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
        for region_id in heartbeat.regions {
            let detecting_region = (heartbeat.datanode_id, region_id);
            let mut detector = self
                .failure_detector
                .region_failure_detector(detecting_region);
            detector.heartbeat(heartbeat.timestamp);
        }
    }

    fn clear(&self) {
        self.failure_detector.clear();
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use common_meta::ddl::test_util::{
        test_create_logical_table_task, test_create_physical_table_task,
    };
    use common_meta::ddl::RegionFailureDetectorController;
    use common_meta::key::table_route::{
        LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
    };
    use common_meta::key::{runtime_switch, TableMetadataManager};
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::NoopPeerLookupService;
    use common_telemetry::info;
    use common_time::util::current_time_millis;
    use rand::Rng;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::Sender;
    use tokio::sync::oneshot;
    use tokio::time::sleep;

    use super::RegionSupervisorSelector;
    use crate::procedure::region_migration::manager::RegionMigrationManager;
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::region::supervisor::{
        DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
        RegionSupervisorTicker,
    };
    use crate::selector::test_utils::{new_test_selector_context, RandomNodeSelector};

    pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
        let env = TestingEnv::new();
        let selector_context = new_test_selector_context();
        let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
        let context_factory = env.context_factory();
        let region_migration_manager = Arc::new(RegionMigrationManager::new(
            env.procedure_manager().clone(),
            context_factory,
        ));
        let runtime_switch_manager =
            Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
        let peer_lookup = Arc::new(NoopPeerLookupService);
        let (tx, rx) = RegionSupervisor::channel();
        let kv_backend = env.kv_backend();

        (
            RegionSupervisor::new(
                rx,
                Default::default(),
                selector_context,
                RegionSupervisorSelector::NaiveSelector(selector),
                region_migration_manager,
                runtime_switch_manager,
                peer_lookup,
                kv_backend,
            ),
            tx,
        )
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let (mut supervisor, sender) = new_test_supervisor();
        tokio::spawn(async move { supervisor.run().await });

        sender
            .send(Event::HeartbeatArrived(DatanodeHeartbeat {
                datanode_id: 0,
                regions: vec![RegionId::new(1, 1)],
                timestamp: 100,
            }))
            .await
            .unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert!(detector.contains(&(0, RegionId::new(1, 1))));

        sender.send(Event::Clear).await.unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());

        fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
            let mut rng = rand::rng();
            let start = current_time_millis();
            (0..2000)
                .map(|i| DatanodeHeartbeat {
                    timestamp: start + i * 1000 + rng.random_range(0..100),
                    datanode_id,
                    regions: region_ids
                        .iter()
                        .map(|number| RegionId::new(0, *number))
                        .collect(),
                })
                .collect::<Vec<_>>()
        }

        let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
        let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
        for heartbeat in heartbeats {
            sender
                .send(Event::HeartbeatArrived(heartbeat))
                .await
                .unwrap();
        }

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 3);

        for e in detector.iter() {
            let fd = e.failure_detector();
            let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
            let start = last_heartbeat_time;

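            // Within the acceptable heartbeat pause the phi value stays at zero; once the pause
            // is exceeded it rises and eventually crosses the failure threshold.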
            for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
                let now = start + i * 1000;
                assert_eq!(fd.phi(now), 0.0);
            }

            let now = start + acceptable_heartbeat_pause_millis + 1000;
            assert!(fd.phi(now) < fd.threshold() as _);
            let now = start + acceptable_heartbeat_pause_millis + 2000;
            assert!(fd.phi(now) > fd.threshold() as _);
        }
    }

    #[tokio::test]
    async fn test_supervisor_ticker() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            initialization_delay: Duration::from_millis(100),
            initialization_retry_period: Duration::from_millis(100),
            sender: tx,
        };
        for _ in 0..2 {
            ticker.start();
            sleep(Duration::from_millis(100)).await;
            ticker.stop();
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(
                    event,
                    Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
                );
            }
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions_event_handling() {
        common_telemetry::init_default_ut_logging();
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(1000),
            initialization_delay: Duration::from_millis(50),
            initialization_retry_period: Duration::from_millis(50),
            sender: tx,
        };
        ticker.start();
        sleep(Duration::from_millis(60)).await;
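        // Ignore the first InitializeAllRegions event to verify that the ticker retries, then
        // acknowledge the second one; afterwards the initialization loop should stop sending.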
        let handle = tokio::spawn(async move {
            let mut counter = 0;
            while let Some(event) = rx.recv().await {
                if let Event::InitializeAllRegions(tx) = event {
                    if counter == 0 {
                        counter += 1;
                        continue;
                    }
                    tx.send(()).unwrap();
                    info!("Responded to the initialize all regions event");
                    break;
                }
            }
            rx
        });

        let rx = handle.await.unwrap();
        for _ in 0..3 {
            sleep(Duration::from_millis(100)).await;
            assert!(rx.is_empty());
        }
    }

    #[tokio::test]
    async fn test_initialize_all_regions() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();
        let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());

        let table_id = 1024;
        let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
        create_physical_table_task.set_table_id(table_id);
        let table_info = create_physical_table_task.table_info;
        let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 0),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]);
        let table_route_value = TableRouteValue::Physical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();

        let logical_table_id = 1025;
        let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
        test_create_logical_table_task.set_table_id(logical_table_id);
        let table_info = test_create_logical_table_task.table_info;
        let table_route = LogicalTableRouteValue::new(1024, vec![RegionId::new(1025, 0)]);
        let table_route_value = TableRouteValue::Logical(table_route);
        table_metadata_manager
            .create_table_metadata(table_info, table_route_value, HashMap::new())
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
        assert!(rx.await.is_ok());

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
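        // Only the physical table's leader region is registered; logical table routes are
        // skipped by `initialize_all`.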
        assert_eq!(detector.len(), 1);
        assert!(detector.contains(&(1, RegionId::new(1024, 0))));
    }

    #[tokio::test]
    async fn test_initialize_all_regions_with_maintenance_mode() {
        common_telemetry::init_default_ut_logging();
        let (mut supervisor, sender) = new_test_supervisor();

        supervisor
            .runtime_switch_manager
            .set_maintenance_mode()
            .await
            .unwrap();
        tokio::spawn(async move { supervisor.run().await });
        let (tx, rx) = oneshot::channel();
        sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
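        // In maintenance mode the supervisor skips initialization and drops the reply sender,
        // so awaiting the receiver yields an error.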
        assert!(rx.await.is_err());
    }

    #[tokio::test]
    async fn test_region_failure_detector_controller() {
        let (mut supervisor, sender) = new_test_supervisor();
        let controller = RegionFailureDetectorControl::new(sender.clone());
        tokio::spawn(async move { supervisor.run().await });
        let detecting_region = (1, RegionId::new(1, 1));
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let region_detector = detector.region_failure_detector(detecting_region).clone();

        controller
            .register_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let got = detector.region_failure_detector(detecting_region).clone();
        assert_eq!(region_detector, got);

        controller
            .deregister_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());
    }
}
1057}