1use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::sync::{Arc, Mutex};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use common_meta::DatanodeId;
22use common_meta::datanode::Stat;
23use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
24use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
25use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
26use common_meta::key::{MetadataKey, MetadataValue};
27use common_meta::kv_backend::KvBackendRef;
28use common_meta::leadership_notifier::LeadershipChangeListener;
29use common_meta::peer::{Peer, PeerResolverRef};
30use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
31use common_meta::rpc::store::RangeRequest;
32use common_runtime::JoinHandle;
33use common_telemetry::{debug, error, info, warn};
34use common_time::util::current_time_millis;
35use futures::{StreamExt, TryStreamExt};
36use snafu::{ResultExt, ensure};
37use store_api::storage::RegionId;
38use tokio::sync::mpsc::{Receiver, Sender};
39use tokio::sync::oneshot;
40use tokio::time::{MissedTickBehavior, interval, interval_at};
41
42use crate::discovery::utils::accept_ingest_workload;
43use crate::error::{self, Result};
44use crate::failure_detector::PhiAccrualFailureDetectorOptions;
45use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
46use crate::procedure::region_migration::manager::{
47 RegionMigrationManagerRef, RegionMigrationTriggerReason, SubmitRegionMigrationTaskResult,
48};
49use crate::procedure::region_migration::utils::RegionMigrationTaskBatch;
50use crate::procedure::region_migration::{
51 DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask,
52};
53use crate::region::failure_detector::RegionFailureDetector;
54use crate::selector::SelectorOptions;
55use crate::state::StateRef;
56
/// A heartbeat summary of one datanode, consumed by the region supervisor.
///
/// It is built from a [`Stat`] through the `From<&Stat>` implementation.
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
    /// Id of the datanode that reported the heartbeat.
    datanode_id: DatanodeId,
    /// Ids of the regions reported in the heartbeat.
    regions: Vec<RegionId>,
    /// Timestamp of the heartbeat (copied from `Stat::timestamp_millis`).
    timestamp: i64,
}
67
68impl From<&Stat> for DatanodeHeartbeat {
69 fn from(value: &Stat) -> Self {
70 DatanodeHeartbeat {
71 datanode_id: value.id,
72 regions: value.region_stats.iter().map(|x| x.id).collect(),
73 timestamp: value.timestamp_millis,
74 }
75 }
76}
77
/// Events consumed by [`RegionSupervisor::run`].
pub(crate) enum Event {
    /// Triggers one round of region failure detection and failover handling.
    Tick,
    /// Requests registering failure detectors for the regions found in the table routes.
    /// The supervisor replies on the carried sender only when the initialization succeeds.
    InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
    /// Registers failure detectors for the given regions.
    RegisterFailureDetectors(Vec<DetectingRegion>),
    /// Removes the failure detectors (and failover counts) of the given regions.
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    /// A datanode heartbeat that is fed into the failure detectors of its regions.
    HeartbeatArrived(DatanodeHeartbeat),
    /// Clears all failure detectors.
    Clear,
    /// Test only: the supervisor sends back a dump of its failure detector.
    #[cfg(test)]
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}
103
104impl Debug for Event {
105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
106 match self {
107 Self::Tick => write!(f, "Tick"),
108 Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
109 Self::Clear => write!(f, "Clear"),
110 Self::InitializeAllRegions(_) => write!(f, "InspectAndRegisterRegions"),
111 Self::RegisterFailureDetectors(arg0) => f
112 .debug_tuple("RegisterFailureDetectors")
113 .field(arg0)
114 .finish(),
115 Self::DeregisterFailureDetectors(arg0) => f
116 .debug_tuple("DeregisterFailureDetectors")
117 .field(arg0)
118 .finish(),
119 #[cfg(test)]
120 Self::Dump(_) => f.debug_struct("Dump").finish(),
121 }
122 }
123}
124
/// Shared, reference-counted handle to a [`RegionSupervisorTicker`].
pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;
126
/// Periodically sends [`Event`]s to the region supervisor.
///
/// While started it runs two background tasks: a tick loop that sends [`Event::Tick`]
/// and an initialization loop that sends [`Event::InitializeAllRegions`] until it is
/// acknowledged.
#[derive(Debug)]
pub struct RegionSupervisorTicker {
    /// Handle of the spawned tick loop; `None` while the ticker is stopped.
    tick_handle: Mutex<Option<JoinHandle<()>>>,

    /// Handle of the spawned initialization loop; `None` while the ticker is stopped.
    initialization_handle: Mutex<Option<JoinHandle<()>>>,

    /// Period between two [`Event::Tick`]s.
    tick_interval: Duration,

    /// Delay before the first [`Event::InitializeAllRegions`] is sent.
    initialization_delay: Duration,

    /// Period between two [`Event::InitializeAllRegions`] attempts.
    initialization_retry_period: Duration,

    /// Sending half of the event channel.
    sender: Sender<Event>,
}
148
#[async_trait]
impl LeadershipChangeListener for RegionSupervisorTicker {
    /// Name under which this listener is identified.
    fn name(&self) -> &'static str {
        "RegionSupervisorTicker"
    }

    /// Starts the background loops; always returns `Ok(())`.
    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
        self.start();
        Ok(())
    }

    /// Stops the background loops; always returns `Ok(())`.
    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
        self.stop();
        Ok(())
    }
}
165
166impl RegionSupervisorTicker {
167 pub(crate) fn new(
168 tick_interval: Duration,
169 initialization_delay: Duration,
170 initialization_retry_period: Duration,
171 sender: Sender<Event>,
172 ) -> Self {
173 info!(
174 "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
175 tick_interval, initialization_delay, initialization_retry_period
176 );
177 Self {
178 tick_handle: Mutex::new(None),
179 initialization_handle: Mutex::new(None),
180 tick_interval,
181 initialization_delay,
182 initialization_retry_period,
183 sender,
184 }
185 }
186
187 pub fn start(&self) {
189 let mut handle = self.tick_handle.lock().unwrap();
190 if handle.is_none() {
191 let sender = self.sender.clone();
192 let tick_interval = self.tick_interval;
193 let initialization_delay = self.initialization_delay;
194
195 let mut initialization_interval = interval_at(
196 tokio::time::Instant::now() + initialization_delay,
197 self.initialization_retry_period,
198 );
199 initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
200 let initialization_handler = common_runtime::spawn_global(async move {
201 loop {
202 initialization_interval.tick().await;
203 let (tx, rx) = oneshot::channel();
204 if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
205 info!(
206 "EventReceiver is dropped, region failure detectors initialization loop is stopped"
207 );
208 break;
209 }
210 if rx.await.is_ok() {
211 info!("All region failure detectors are initialized.");
212 break;
213 }
214 }
215 });
216 *self.initialization_handle.lock().unwrap() = Some(initialization_handler);
217
218 let sender = self.sender.clone();
219 let ticker_loop = tokio::spawn(async move {
220 let mut tick_interval = interval(tick_interval);
221 tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
222
223 if let Err(err) = sender.send(Event::Clear).await {
224 warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
225 return;
226 }
227 loop {
228 tick_interval.tick().await;
229 if sender.send(Event::Tick).await.is_err() {
230 info!("EventReceiver is dropped, tick loop is stopped");
231 break;
232 }
233 }
234 });
235 *handle = Some(ticker_loop);
236 }
237 }
238
239 pub fn stop(&self) {
241 let handle = self.tick_handle.lock().unwrap().take();
242 if let Some(handle) = handle {
243 handle.abort();
244 info!("The tick loop is stopped.");
245 }
246 let initialization_handler = self.initialization_handle.lock().unwrap().take();
247 if let Some(initialization_handler) = initialization_handler {
248 initialization_handler.abort();
249 info!("The initialization loop is stopped.");
250 }
251 }
252}
253
impl Drop for RegionSupervisorTicker {
    /// Stops the background loops so they do not outlive the ticker.
    fn drop(&mut self) {
        self.stop();
    }
}
259
/// Shared, reference-counted handle to a [`RegionSupervisor`].
pub type RegionSupervisorRef = Arc<RegionSupervisor>;
261
/// Default tick interval (1 second).
// NOTE(review): presumably the default for `RegionSupervisorTicker::tick_interval`;
// the usage is not visible in this file section — confirm.
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// Default initialization retry period (60 seconds).
// NOTE(review): presumably the default for
// `RegionSupervisorTicker::initialization_retry_period` — confirm against the caller.
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);
266
/// The selector used by the supervisor to pick failover destination peers.
pub enum RegionSupervisorSelector {
    /// Selects peers without looking at the regions; the result is zipped with the
    /// regions in order.
    NaiveSelector(SelectorRef),
    /// Selects a peer per region, given the source datanode and the region ids.
    RegionStatAwareSelector(RegionStatAwareSelectorRef),
}
272
/// Consumes [`Event`]s, tracks region liveness through failure detectors and submits
/// region migration (failover) tasks for regions detected as failed.
pub struct RegionSupervisor {
    /// Failure detectors keyed by detecting region.
    failure_detector: RegionFailureDetector,
    /// Number of failover attempts per detecting region; scales the migration timeout.
    failover_counts: HashMap<DetectingRegion, u32>,
    /// Receiving half of the event channel.
    receiver: Receiver<Event>,
    /// Context passed to the selector.
    selector_context: SelectorContext,
    /// Selector of failover destination peers.
    selector: RegionSupervisorSelector,
    /// Used to submit region migration tasks and to check whether a region is migrating.
    region_migration_manager: RegionMigrationManagerRef,
    /// Used to check whether maintenance mode is enabled.
    runtime_switch_manager: RuntimeSwitchManagerRef,
    /// Resolves the peer of the source datanode.
    peer_resolver: PeerResolverRef,
    /// Backend scanned for table routes when initializing all regions.
    kv_backend: KvBackendRef,
    /// Optional metasrv state; when set, events are ignored unless it reports leader.
    state: Option<StateRef>,
}
297
/// A [`RegionFailureDetectorController`] that forwards register/deregister requests to
/// the region supervisor as [`Event`]s.
#[derive(Debug, Clone)]
pub struct RegionFailureDetectorControl {
    /// Sending half of the supervisor's event channel.
    sender: Sender<Event>,
}

impl RegionFailureDetectorControl {
    /// Creates a controller that sends its events through `sender`.
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }
}
309
310#[async_trait::async_trait]
311impl RegionFailureDetectorController for RegionFailureDetectorControl {
312 async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
313 if let Err(err) = self
314 .sender
315 .send(Event::RegisterFailureDetectors(detecting_regions))
316 .await
317 {
318 error!(err; "RegionSupervisor has stop receiving heartbeat.");
319 }
320 }
321
322 async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
323 if let Err(err) = self
324 .sender
325 .send(Event::DeregisterFailureDetectors(detecting_regions))
326 .await
327 {
328 error!(err; "RegionSupervisor has stop receiving heartbeat.");
329 }
330 }
331}
332
333#[derive(Clone)]
335pub(crate) struct HeartbeatAcceptor {
336 sender: Sender<Event>,
337}
338
339impl HeartbeatAcceptor {
340 pub(crate) fn new(sender: Sender<Event>) -> Self {
341 Self { sender }
342 }
343
344 pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
346 if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
347 error!(err; "RegionSupervisor has stop receiving heartbeat.");
348 }
349 }
350}
351
352impl RegionSupervisor {
353 pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
355 tokio::sync::mpsc::channel(1024)
356 }
357
358 #[allow(clippy::too_many_arguments)]
359 pub(crate) fn new(
360 event_receiver: Receiver<Event>,
361 options: PhiAccrualFailureDetectorOptions,
362 selector_context: SelectorContext,
363 selector: RegionSupervisorSelector,
364 region_migration_manager: RegionMigrationManagerRef,
365 runtime_switch_manager: RuntimeSwitchManagerRef,
366 peer_resolver: PeerResolverRef,
367 kv_backend: KvBackendRef,
368 ) -> Self {
369 Self {
370 failure_detector: RegionFailureDetector::new(options),
371 failover_counts: HashMap::new(),
372 receiver: event_receiver,
373 selector_context,
374 selector,
375 region_migration_manager,
376 runtime_switch_manager,
377 peer_resolver,
378 kv_backend,
379 state: None,
380 }
381 }
382
383 pub(crate) fn with_state(mut self, state: StateRef) -> Self {
385 self.state = Some(state);
386 self
387 }
388
389 pub(crate) async fn run(&mut self) {
391 while let Some(event) = self.receiver.recv().await {
392 if let Some(state) = self.state.as_ref()
393 && !state.read().unwrap().is_leader()
394 {
395 warn!(
396 "The current metasrv is not the leader, ignore {:?} event",
397 event
398 );
399 continue;
400 }
401
402 match event {
403 Event::InitializeAllRegions(sender) => {
404 match self.is_maintenance_mode_enabled().await {
405 Ok(false) => {}
406 Ok(true) => {
407 warn!(
408 "Skipping initialize all regions since maintenance mode is enabled."
409 );
410 continue;
411 }
412 Err(err) => {
413 error!(err; "Failed to check maintenance mode during initialize all regions.");
414 continue;
415 }
416 }
417
418 if let Err(err) = self.initialize_all().await {
419 error!(err; "Failed to initialize all regions.");
420 } else {
421 let _ = sender.send(());
423 }
424 }
425 Event::Tick => {
426 let regions = self.detect_region_failure();
427 self.handle_region_failures(regions).await;
428 }
429 Event::RegisterFailureDetectors(detecting_regions) => {
430 self.register_failure_detectors(detecting_regions).await
431 }
432 Event::DeregisterFailureDetectors(detecting_regions) => {
433 self.deregister_failure_detectors(detecting_regions).await
434 }
435 Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
436 Event::Clear => {
437 self.clear();
438 info!("Region supervisor is initialized.");
439 }
440 #[cfg(test)]
441 Event::Dump(sender) => {
442 let _ = sender.send(self.failure_detector.dump());
443 }
444 }
445 }
446 info!("RegionSupervisor is stopped!");
447 }
448
449 async fn initialize_all(&self) -> Result<()> {
450 let now = Instant::now();
451 let regions = self.regions();
452 let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
453 let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
454 TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
455 })
456 .into_stream();
457
458 let mut stream = stream
459 .map_ok(|(_, value)| {
460 TableRouteValue::try_from_raw_value(&value)
461 .context(error::TableMetadataManagerSnafu)
462 })
463 .boxed();
464 let mut detecting_regions = Vec::new();
465 while let Some(route) = stream
466 .try_next()
467 .await
468 .context(error::TableMetadataManagerSnafu)?
469 {
470 let route = route?;
471 if !route.is_physical() {
472 continue;
473 }
474
475 let physical_table_route = route.into_physical_table_route();
476 physical_table_route
477 .region_routes
478 .iter()
479 .for_each(|region_route| {
480 if !regions.contains(®ion_route.region.id)
481 && let Some(leader_peer) = ®ion_route.leader_peer
482 {
483 detecting_regions.push((leader_peer.id, region_route.region.id));
484 }
485 });
486 }
487
488 let num_detecting_regions = detecting_regions.len();
489 if !detecting_regions.is_empty() {
490 self.register_failure_detectors(detecting_regions).await;
491 }
492
493 info!(
494 "Initialize {} region failure detectors, elapsed: {:?}",
495 num_detecting_regions,
496 now.elapsed()
497 );
498
499 Ok(())
500 }
501
502 async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
503 let ts_millis = current_time_millis();
504 for region in detecting_regions {
505 self.failure_detector
507 .maybe_init_region_failure_detector(region, ts_millis);
508 }
509 }
510
511 async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
512 for region in detecting_regions {
513 self.failure_detector.remove(®ion);
514 self.failover_counts.remove(®ion);
515 }
516 }
517
518 async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
519 if regions.is_empty() {
520 return;
521 }
522 match self.is_maintenance_mode_enabled().await {
523 Ok(false) => {}
524 Ok(true) => {
525 warn!(
526 "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
527 regions
528 );
529 return;
530 }
531 Err(err) => {
532 error!(err; "Failed to check maintenance mode");
533 return;
534 }
535 }
536
537 let migrating_regions = regions
539 .extract_if(.., |(_, region_id)| {
540 self.region_migration_manager.tracker().contains(*region_id)
541 })
542 .collect::<Vec<_>>();
543
544 for (datanode_id, region_id) in migrating_regions {
545 debug!(
546 "Removed region failover for region: {region_id}, datanode: {datanode_id} because it's migrating"
547 );
548 }
549
550 if regions.is_empty() {
551 return;
553 }
554
555 let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
556 HashMap::with_capacity(regions.len());
557 for (datanode_id, region_id) in regions {
558 grouped_regions
559 .entry(datanode_id)
560 .or_default()
561 .push(region_id);
562 }
563
564 for (datanode_id, regions) in grouped_regions {
565 warn!(
566 "Detects region failures on datanode: {}, regions: {:?}",
567 datanode_id, regions
568 );
569 let failed_datanodes = [datanode_id];
573 match self
574 .generate_failover_tasks(datanode_id, ®ions, &failed_datanodes)
575 .await
576 {
577 Ok(tasks) => {
578 let mut grouped_tasks: HashMap<(u64, u64), Vec<_>> = HashMap::new();
579 for (task, count) in tasks {
580 grouped_tasks
581 .entry((task.from_peer.id, task.to_peer.id))
582 .or_default()
583 .push((task, count));
584 }
585
586 for ((from_peer_id, to_peer_id), tasks) in grouped_tasks {
587 if tasks.is_empty() {
588 continue;
589 }
590 let task = RegionMigrationTaskBatch::from_tasks(tasks);
591 let region_ids = task.region_ids.clone();
592 if let Err(err) = self.do_failover_tasks(task).await {
593 error!(err; "Failed to execute region failover for regions: {:?}, from_peer: {}, to_peer: {}", region_ids, from_peer_id, to_peer_id);
594 }
595 }
596 }
597 Err(err) => error!(err; "Failed to generate failover tasks"),
598 }
599 }
600 }
601
602 pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
603 self.runtime_switch_manager
604 .maintenance_mode()
605 .await
606 .context(error::RuntimeSwitchManagerSnafu)
607 }
608
609 async fn select_peers(
610 &self,
611 from_peer_id: DatanodeId,
612 regions: &[RegionId],
613 failure_datanodes: &[DatanodeId],
614 ) -> Result<Vec<(RegionId, Peer)>> {
615 let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
616 match &self.selector {
617 RegionSupervisorSelector::NaiveSelector(selector) => {
618 let opt = SelectorOptions {
619 min_required_items: regions.len(),
620 allow_duplication: true,
621 exclude_peer_ids,
622 workload_filter: Some(accept_ingest_workload),
623 extensions: Default::default(),
624 };
625 let peers = selector.select(&self.selector_context, opt).await?;
626 ensure!(
627 peers.len() == regions.len(),
628 error::NoEnoughAvailableNodeSnafu {
629 required: regions.len(),
630 available: peers.len(),
631 select_target: SelectTarget::Datanode,
632 }
633 );
634 let region_peers = regions
635 .iter()
636 .zip(peers)
637 .map(|(region_id, peer)| (*region_id, peer))
638 .collect::<Vec<_>>();
639
640 Ok(region_peers)
641 }
642 RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
643 let peers = selector
644 .select(
645 &self.selector_context,
646 from_peer_id,
647 regions,
648 exclude_peer_ids,
649 )
650 .await?;
651 ensure!(
652 peers.len() == regions.len(),
653 error::NoEnoughAvailableNodeSnafu {
654 required: regions.len(),
655 available: peers.len(),
656 select_target: SelectTarget::Datanode,
657 }
658 );
659
660 Ok(peers)
661 }
662 }
663 }
664
665 async fn generate_failover_tasks(
666 &mut self,
667 from_peer_id: DatanodeId,
668 regions: &[RegionId],
669 failed_datanodes: &[DatanodeId],
670 ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
671 let mut tasks = Vec::with_capacity(regions.len());
672 let from_peer = self
673 .peer_resolver
674 .datanode(from_peer_id)
675 .await
676 .ok()
677 .flatten()
678 .unwrap_or_else(|| Peer::empty(from_peer_id));
679
680 let region_peers = self
681 .select_peers(from_peer_id, regions, failed_datanodes)
682 .await?;
683
684 for (region_id, peer) in region_peers {
685 let count = *self
686 .failover_counts
687 .entry((from_peer_id, region_id))
688 .and_modify(|count| *count += 1)
689 .or_insert(1);
690 let task = RegionMigrationProcedureTask {
691 region_id,
692 from_peer: from_peer.clone(),
693 to_peer: peer,
694 timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
695 trigger_reason: RegionMigrationTriggerReason::Failover,
696 };
697 tasks.push((task, count));
698 }
699
700 Ok(tasks)
701 }
702
703 async fn do_failover_tasks(&mut self, task: RegionMigrationTaskBatch) -> Result<()> {
704 let from_peer_id = task.from_peer.id;
705 let to_peer_id = task.to_peer.id;
706 let timeout = task.timeout;
707 let trigger_reason = task.trigger_reason;
708 let result = self
709 .region_migration_manager
710 .submit_region_migration_task(task)
711 .await?;
712 self.handle_submit_region_migration_task_result(
713 from_peer_id,
714 to_peer_id,
715 timeout,
716 trigger_reason,
717 result,
718 )
719 .await
720 }
721
722 async fn handle_submit_region_migration_task_result(
723 &mut self,
724 from_peer_id: DatanodeId,
725 to_peer_id: DatanodeId,
726 timeout: Duration,
727 trigger_reason: RegionMigrationTriggerReason,
728 result: SubmitRegionMigrationTaskResult,
729 ) -> Result<()> {
730 if !result.migrated.is_empty() {
731 let detecting_regions = result
732 .migrated
733 .iter()
734 .map(|region_id| (from_peer_id, *region_id))
735 .collect::<Vec<_>>();
736 self.deregister_failure_detectors(detecting_regions).await;
737 info!(
738 "Region has been migrated to target peer: {}, removed failover detectors for regions: {:?}",
739 to_peer_id, result.migrated,
740 )
741 }
742 if !result.migrating.is_empty() {
743 info!(
744 "Region is still migrating, skipping failover for regions: {:?}",
745 result.migrating
746 );
747 }
748 if !result.region_not_found.is_empty() {
749 let detecting_regions = result
750 .region_not_found
751 .iter()
752 .map(|region_id| (from_peer_id, *region_id))
753 .collect::<Vec<_>>();
754 self.deregister_failure_detectors(detecting_regions).await;
755 info!(
756 "Region route not found, removed failover detectors for regions: {:?}",
757 result.region_not_found
758 );
759 }
760 if !result.table_not_found.is_empty() {
761 let detecting_regions = result
762 .table_not_found
763 .iter()
764 .map(|region_id| (from_peer_id, *region_id))
765 .collect::<Vec<_>>();
766 self.deregister_failure_detectors(detecting_regions).await;
767 info!(
768 "Table is not found, removed failover detectors for regions: {:?}",
769 result.table_not_found
770 );
771 }
772 if !result.leader_changed.is_empty() {
773 let detecting_regions = result
774 .leader_changed
775 .iter()
776 .map(|region_id| (from_peer_id, *region_id))
777 .collect::<Vec<_>>();
778 self.deregister_failure_detectors(detecting_regions).await;
779 info!(
780 "Region's leader peer changed, removed failover detectors for regions: {:?}",
781 result.leader_changed
782 );
783 }
784 if !result.peer_conflict.is_empty() {
785 info!(
786 "Region has peer conflict, ignore failover for regions: {:?}",
787 result.peer_conflict
788 );
789 }
790 if !result.submitted.is_empty() {
791 info!(
792 "Failover for regions: {:?}, from_peer: {}, to_peer: {}, procedure_id: {:?}, timeout: {:?}, trigger_reason: {:?}",
793 result.submitted,
794 from_peer_id,
795 to_peer_id,
796 result.procedure_id,
797 timeout,
798 trigger_reason,
799 );
800 }
801
802 Ok(())
803 }
804
805 fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
807 self.failure_detector
808 .iter()
809 .filter_map(|e| {
810 if !e.failure_detector().is_available(current_time_millis()) {
817 Some(*e.region_ident())
818 } else {
819 None
820 }
821 })
822 .collect::<Vec<_>>()
823 }
824
825 fn regions(&self) -> HashSet<RegionId> {
827 self.failure_detector
828 .iter()
829 .map(|e| e.region_ident().1)
830 .collect::<HashSet<_>>()
831 }
832
833 fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
835 for region_id in heartbeat.regions {
836 let detecting_region = (heartbeat.datanode_id, region_id);
837 let mut detector = self
838 .failure_detector
839 .region_failure_detector(detecting_region);
840 detector.heartbeat(heartbeat.timestamp);
841 }
842 }
843
/// Clears the failure detector; called when handling [`Event::Clear`].
fn clear(&self) {
    self.failure_detector.clear();
}
847}
848
849#[cfg(test)]
850pub(crate) mod tests {
851 use std::assert_matches;
852 use std::collections::HashMap;
853 use std::sync::{Arc, Mutex};
854 use std::time::Duration;
855
856 use common_meta::ddl::RegionFailureDetectorController;
857 use common_meta::ddl::test_util::{
858 test_create_logical_table_task, test_create_physical_table_task,
859 };
860 use common_meta::key::table_route::{
861 LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
862 };
863 use common_meta::key::{TableMetadataManager, runtime_switch};
864 use common_meta::peer::Peer;
865 use common_meta::rpc::router::{Region, RegionRoute};
866 use common_meta::test_util::NoopPeerResolver;
867 use common_telemetry::info;
868 use common_time::util::current_time_millis;
869 use rand::Rng;
870 use store_api::storage::RegionId;
871 use tokio::sync::mpsc::Sender;
872 use tokio::sync::oneshot;
873 use tokio::time::sleep;
874
875 use super::RegionSupervisorSelector;
876 use crate::procedure::region_migration::RegionMigrationTriggerReason;
877 use crate::procedure::region_migration::manager::{
878 RegionMigrationManager, SubmitRegionMigrationTaskResult,
879 };
880 use crate::procedure::region_migration::test_util::TestingEnv;
881 use crate::region::supervisor::{
882 DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
883 RegionSupervisorTicker,
884 };
885 use crate::selector::test_utils::{RandomNodeSelector, new_test_selector_context};
886
887 pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
888 let env = TestingEnv::new();
889 let selector_context = new_test_selector_context();
890 let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
891 let context_factory = env.context_factory();
892 let region_migration_manager = Arc::new(RegionMigrationManager::new(
893 env.procedure_manager().clone(),
894 context_factory,
895 ));
896 let runtime_switch_manager =
897 Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
898 let peer_resolver = Arc::new(NoopPeerResolver);
899 let (tx, rx) = RegionSupervisor::channel();
900 let kv_backend = env.kv_backend();
901
902 (
903 RegionSupervisor::new(
904 rx,
905 Default::default(),
906 selector_context,
907 RegionSupervisorSelector::NaiveSelector(selector),
908 region_migration_manager,
909 runtime_switch_manager,
910 peer_resolver,
911 kv_backend,
912 ),
913 tx,
914 )
915 }
916
917 #[tokio::test]
918 async fn test_heartbeat() {
919 let (mut supervisor, sender) = new_test_supervisor();
920 tokio::spawn(async move { supervisor.run().await });
921
922 sender
923 .send(Event::HeartbeatArrived(DatanodeHeartbeat {
924 datanode_id: 0,
925 regions: vec![RegionId::new(1, 1)],
926 timestamp: 100,
927 }))
928 .await
929 .unwrap();
930 let (tx, rx) = oneshot::channel();
931 sender.send(Event::Dump(tx)).await.unwrap();
932 let detector = rx.await.unwrap();
933 assert!(detector.contains(&(0, RegionId::new(1, 1))));
934
935 sender.send(Event::Clear).await.unwrap();
937 let (tx, rx) = oneshot::channel();
938 sender.send(Event::Dump(tx)).await.unwrap();
939 assert!(rx.await.unwrap().is_empty());
940
941 fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
942 let mut rng = rand::rng();
943 let start = current_time_millis();
944 (0..2000)
945 .map(|i| DatanodeHeartbeat {
946 timestamp: start + i * 1000 + rng.random_range(0..100),
947 datanode_id,
948 regions: region_ids
949 .iter()
950 .map(|number| RegionId::new(0, *number))
951 .collect(),
952 })
953 .collect::<Vec<_>>()
954 }
955
956 let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
957 let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
958 for heartbeat in heartbeats {
959 sender
960 .send(Event::HeartbeatArrived(heartbeat))
961 .await
962 .unwrap();
963 }
964
965 let (tx, rx) = oneshot::channel();
966 sender.send(Event::Dump(tx)).await.unwrap();
967 let detector = rx.await.unwrap();
968 assert_eq!(detector.len(), 3);
969
970 for e in detector.iter() {
971 let fd = e.failure_detector();
972 let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
973 let start = last_heartbeat_time;
974
975 for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
977 let now = start + i * 1000;
978 assert_eq!(fd.phi(now), 0.0);
979 }
980
981 let now = start + acceptable_heartbeat_pause_millis + 1000;
984 assert!(fd.phi(now) < fd.threshold() as _);
985 let now = start + acceptable_heartbeat_pause_millis + 2000;
986 assert!(fd.phi(now) > fd.threshold() as _);
987 }
988 }
989
/// Starting the ticker produces events, and stopping it clears both task handles;
/// the cycle is run twice to check that the ticker can be restarted.
#[tokio::test]
async fn test_supervisor_ticker() {
    let (tx, mut rx) = tokio::sync::mpsc::channel(128);
    let ticker = RegionSupervisorTicker {
        tick_handle: Mutex::new(None),
        initialization_handle: Mutex::new(None),
        tick_interval: Duration::from_millis(10),
        initialization_delay: Duration::from_millis(100),
        initialization_retry_period: Duration::from_millis(100),
        sender: tx,
    };
    for _ in 0..2 {
        // Let the loops run for 100ms, then stop them.
        ticker.start();
        sleep(Duration::from_millis(100)).await;
        ticker.stop();
        assert!(!rx.is_empty());
        // Drain the channel; only ticker-originated events are expected.
        while let Ok(event) = rx.try_recv() {
            assert_matches!(
                event,
                Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
            );
        }
        assert!(ticker.initialization_handle.lock().unwrap().is_none());
        assert!(ticker.tick_handle.lock().unwrap().is_none());
    }
}
1017
/// The initialization loop retries while its request is not acknowledged and
/// stops sending requests once it has been acknowledged.
#[tokio::test]
async fn test_initialize_all_regions_event_handling() {
    common_telemetry::init_default_ut_logging();
    let (tx, mut rx) = tokio::sync::mpsc::channel(128);
    let ticker = RegionSupervisorTicker {
        tick_handle: Mutex::new(None),
        initialization_handle: Mutex::new(None),
        tick_interval: Duration::from_millis(1000),
        initialization_delay: Duration::from_millis(50),
        initialization_retry_period: Duration::from_millis(50),
        sender: tx,
    };
    ticker.start();
    sleep(Duration::from_millis(60)).await;
    let handle = tokio::spawn(async move {
        let mut counter = 0;
        while let Some(event) = rx.recv().await {
            if let Event::InitializeAllRegions(tx) = event {
                if counter == 0 {
                    // Drop the first request without replying so the ticker retries.
                    counter += 1;
                    continue;
                }
                // Acknowledge the second request.
                tx.send(()).unwrap();
                info!("Responded initialize all regions event");
                break;
            }
        }
        rx
    });

    let rx = handle.await.unwrap();
    // After the acknowledgement, no further event is expected during the next 300ms.
    for _ in 0..3 {
        sleep(Duration::from_millis(100)).await;
        assert!(rx.is_empty());
    }
}
1055
1056 #[tokio::test]
1057 async fn test_initialize_all_regions() {
1058 common_telemetry::init_default_ut_logging();
1059 let (mut supervisor, sender) = new_test_supervisor();
1060 let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());
1061
1062 let table_id = 1024;
1064 let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
1065 create_physical_table_task.set_table_id(table_id);
1066 let table_info = create_physical_table_task.table_info;
1067 let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
1068 region: Region {
1069 id: RegionId::new(table_id, 0),
1070 ..Default::default()
1071 },
1072 leader_peer: Some(Peer::empty(1)),
1073 ..Default::default()
1074 }]);
1075 let table_route_value = TableRouteValue::Physical(table_route);
1076 table_metadata_manager
1077 .create_table_metadata(table_info, table_route_value, HashMap::new())
1078 .await
1079 .unwrap();
1080
1081 let logical_table_id = 1025;
1083 let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
1084 test_create_logical_table_task.set_table_id(logical_table_id);
1085 let table_info = test_create_logical_table_task.table_info;
1086 let table_route = LogicalTableRouteValue::new(1024);
1087 let table_route_value = TableRouteValue::Logical(table_route);
1088 table_metadata_manager
1089 .create_table_metadata(table_info, table_route_value, HashMap::new())
1090 .await
1091 .unwrap();
1092 tokio::spawn(async move { supervisor.run().await });
1093 let (tx, rx) = oneshot::channel();
1094 sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
1095 assert!(rx.await.is_ok());
1096
1097 let (tx, rx) = oneshot::channel();
1098 sender.send(Event::Dump(tx)).await.unwrap();
1099 let detector = rx.await.unwrap();
1100 assert_eq!(detector.len(), 1);
1101 assert!(detector.contains(&(1, RegionId::new(1024, 0))));
1102 }
1103
1104 #[tokio::test]
1105 async fn test_initialize_all_regions_with_maintenance_mode() {
1106 common_telemetry::init_default_ut_logging();
1107 let (mut supervisor, sender) = new_test_supervisor();
1108
1109 supervisor
1110 .runtime_switch_manager
1111 .set_maintenance_mode()
1112 .await
1113 .unwrap();
1114 tokio::spawn(async move { supervisor.run().await });
1115 let (tx, rx) = oneshot::channel();
1116 sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
1117 assert!(rx.await.is_err());
1119 }
1120
1121 #[tokio::test]
1122 async fn test_region_failure_detector_controller() {
1123 let (mut supervisor, sender) = new_test_supervisor();
1124 let controller = RegionFailureDetectorControl::new(sender.clone());
1125 tokio::spawn(async move { supervisor.run().await });
1126 let detecting_region = (1, RegionId::new(1, 1));
1127 controller
1128 .register_failure_detectors(vec![detecting_region])
1129 .await;
1130
1131 let (tx, rx) = oneshot::channel();
1132 sender.send(Event::Dump(tx)).await.unwrap();
1133 let detector = rx.await.unwrap();
1134 let region_detector = detector.region_failure_detector(detecting_region).clone();
1135
1136 controller
1138 .register_failure_detectors(vec![detecting_region])
1139 .await;
1140 let (tx, rx) = oneshot::channel();
1141 sender.send(Event::Dump(tx)).await.unwrap();
1142 let detector = rx.await.unwrap();
1143 let got = detector.region_failure_detector(detecting_region).clone();
1144 assert_eq!(region_detector, got);
1145
1146 controller
1147 .deregister_failure_detectors(vec![detecting_region])
1148 .await;
1149 let (tx, rx) = oneshot::channel();
1150 sender.send(Event::Dump(tx)).await.unwrap();
1151 assert!(rx.await.unwrap().is_empty());
1152 }
1153
1154 #[tokio::test]
1155 async fn test_handle_submit_region_migration_task_result_migrated() {
1156 common_telemetry::init_default_ut_logging();
1157 let (mut supervisor, _) = new_test_supervisor();
1158 let region_id = RegionId::new(1, 1);
1159 let detecting_region = (1, region_id);
1160 supervisor
1161 .register_failure_detectors(vec![detecting_region])
1162 .await;
1163 supervisor.failover_counts.insert(detecting_region, 1);
1164 let result = SubmitRegionMigrationTaskResult {
1165 migrated: vec![region_id],
1166 ..Default::default()
1167 };
1168 supervisor
1169 .handle_submit_region_migration_task_result(
1170 1,
1171 2,
1172 Duration::from_millis(1000),
1173 RegionMigrationTriggerReason::Manual,
1174 result,
1175 )
1176 .await
1177 .unwrap();
1178 assert!(!supervisor.failure_detector.contains(&detecting_region));
1179 assert!(supervisor.failover_counts.is_empty());
1180 }
1181
1182 #[tokio::test]
1183 async fn test_handle_submit_region_migration_task_result_migrating() {
1184 common_telemetry::init_default_ut_logging();
1185 let (mut supervisor, _) = new_test_supervisor();
1186 let region_id = RegionId::new(1, 1);
1187 let detecting_region = (1, region_id);
1188 supervisor
1189 .register_failure_detectors(vec![detecting_region])
1190 .await;
1191 supervisor.failover_counts.insert(detecting_region, 1);
1192 let result = SubmitRegionMigrationTaskResult {
1193 migrating: vec![region_id],
1194 ..Default::default()
1195 };
1196 supervisor
1197 .handle_submit_region_migration_task_result(
1198 1,
1199 2,
1200 Duration::from_millis(1000),
1201 RegionMigrationTriggerReason::Manual,
1202 result,
1203 )
1204 .await
1205 .unwrap();
1206 assert!(supervisor.failure_detector.contains(&detecting_region));
1207 assert!(supervisor.failover_counts.contains_key(&detecting_region));
1208 }
1209
1210 #[tokio::test]
1211 async fn test_handle_submit_region_migration_task_result_table_not_found() {
1212 common_telemetry::init_default_ut_logging();
1213 let (mut supervisor, _) = new_test_supervisor();
1214 let region_id = RegionId::new(1, 1);
1215 let detecting_region = (1, region_id);
1216 supervisor
1217 .register_failure_detectors(vec![detecting_region])
1218 .await;
1219 supervisor.failover_counts.insert(detecting_region, 1);
1220 let result = SubmitRegionMigrationTaskResult {
1221 table_not_found: vec![region_id],
1222 ..Default::default()
1223 };
1224 supervisor
1225 .handle_submit_region_migration_task_result(
1226 1,
1227 2,
1228 Duration::from_millis(1000),
1229 RegionMigrationTriggerReason::Manual,
1230 result,
1231 )
1232 .await
1233 .unwrap();
1234 assert!(!supervisor.failure_detector.contains(&detecting_region));
1235 assert!(supervisor.failover_counts.is_empty());
1236 }
1237
1238 #[tokio::test]
1239 async fn test_handle_submit_region_migration_task_result_region_not_found() {
1240 common_telemetry::init_default_ut_logging();
1241 let (mut supervisor, _) = new_test_supervisor();
1242 let region_id = RegionId::new(1, 1);
1243 let detecting_region = (1, region_id);
1244 supervisor
1245 .register_failure_detectors(vec![detecting_region])
1246 .await;
1247 supervisor.failover_counts.insert(detecting_region, 1);
1248 let result = SubmitRegionMigrationTaskResult {
1249 region_not_found: vec![region_id],
1250 ..Default::default()
1251 };
1252 supervisor
1253 .handle_submit_region_migration_task_result(
1254 1,
1255 2,
1256 Duration::from_millis(1000),
1257 RegionMigrationTriggerReason::Manual,
1258 result,
1259 )
1260 .await
1261 .unwrap();
1262 assert!(!supervisor.failure_detector.contains(&detecting_region));
1263 assert!(supervisor.failover_counts.is_empty());
1264 }
1265
1266 #[tokio::test]
1267 async fn test_handle_submit_region_migration_task_result_leader_changed() {
1268 common_telemetry::init_default_ut_logging();
1269 let (mut supervisor, _) = new_test_supervisor();
1270 let region_id = RegionId::new(1, 1);
1271 let detecting_region = (1, region_id);
1272 supervisor
1273 .register_failure_detectors(vec![detecting_region])
1274 .await;
1275 supervisor.failover_counts.insert(detecting_region, 1);
1276 let result = SubmitRegionMigrationTaskResult {
1277 leader_changed: vec![region_id],
1278 ..Default::default()
1279 };
1280 supervisor
1281 .handle_submit_region_migration_task_result(
1282 1,
1283 2,
1284 Duration::from_millis(1000),
1285 RegionMigrationTriggerReason::Manual,
1286 result,
1287 )
1288 .await
1289 .unwrap();
1290 assert!(!supervisor.failure_detector.contains(&detecting_region));
1291 assert!(supervisor.failover_counts.is_empty());
1292 }
1293
1294 #[tokio::test]
1295 async fn test_handle_submit_region_migration_task_result_peer_conflict() {
1296 common_telemetry::init_default_ut_logging();
1297 let (mut supervisor, _) = new_test_supervisor();
1298 let region_id = RegionId::new(1, 1);
1299 let detecting_region = (1, region_id);
1300 supervisor
1301 .register_failure_detectors(vec![detecting_region])
1302 .await;
1303 supervisor.failover_counts.insert(detecting_region, 1);
1304 let result = SubmitRegionMigrationTaskResult {
1305 peer_conflict: vec![region_id],
1306 ..Default::default()
1307 };
1308 supervisor
1309 .handle_submit_region_migration_task_result(
1310 1,
1311 2,
1312 Duration::from_millis(1000),
1313 RegionMigrationTriggerReason::Manual,
1314 result,
1315 )
1316 .await
1317 .unwrap();
1318 assert!(supervisor.failure_detector.contains(&detecting_region));
1319 assert!(supervisor.failover_counts.contains_key(&detecting_region));
1320 }
1321
1322 #[tokio::test]
1323 async fn test_handle_submit_region_migration_task_result_submitted() {
1324 common_telemetry::init_default_ut_logging();
1325 let (mut supervisor, _) = new_test_supervisor();
1326 let region_id = RegionId::new(1, 1);
1327 let detecting_region = (1, region_id);
1328 supervisor
1329 .register_failure_detectors(vec![detecting_region])
1330 .await;
1331 supervisor.failover_counts.insert(detecting_region, 1);
1332 let result = SubmitRegionMigrationTaskResult {
1333 submitted: vec![region_id],
1334 ..Default::default()
1335 };
1336 supervisor
1337 .handle_submit_region_migration_task_result(
1338 1,
1339 2,
1340 Duration::from_millis(1000),
1341 RegionMigrationTriggerReason::Manual,
1342 result,
1343 )
1344 .await
1345 .unwrap();
1346 assert!(supervisor.failure_detector.contains(&detecting_region));
1347 assert!(supervisor.failover_counts.contains_key(&detecting_region));
1348 }
1349}