use std::collections::{HashMap, HashSet};
use std::fmt::Debug;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use async_trait::async_trait;
use common_meta::datanode::Stat;
use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
use common_meta::key::maintenance::MaintenanceModeManagerRef;
use common_meta::leadership_notifier::LeadershipChangeListener;
use common_meta::peer::{Peer, PeerLookupServiceRef};
use common_meta::DatanodeId;
use common_runtime::JoinHandle;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use error::Error::{LeaderPeerChanged, MigrationRunning, TableRouteNotFound};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::time::{interval, MissedTickBehavior};

use crate::error::{self, Result};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::region_migration::{
    RegionMigrationProcedureTask, DEFAULT_REGION_MIGRATION_TIMEOUT,
};
use crate::region::failure_detector::RegionFailureDetector;
use crate::selector::SelectorOptions;

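/// A heartbeat reported by a datanode, carrying the datanode id, the regions it
/// currently hosts, and the timestamp (in milliseconds) at which the heartbeat was sent.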
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
    datanode_id: DatanodeId,
    regions: Vec<RegionId>,
    timestamp: i64,
}

impl From<&Stat> for DatanodeHeartbeat {
    fn from(value: &Stat) -> Self {
        DatanodeHeartbeat {
            datanode_id: value.id,
            regions: value.region_stats.iter().map(|x| x.id).collect(),
            timestamp: value.timestamp_millis,
        }
    }
}

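/// The events consumed by [`RegionSupervisor::run`]: `Tick` triggers failure detection,
/// `RegisterFailureDetectors`/`DeregisterFailureDetectors` manage detectors for specific
/// regions, `HeartbeatArrived` reports a datanode heartbeat, and `Clear` resets all detectors.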
pub(crate) enum Event {
    Tick,
    RegisterFailureDetectors(Vec<DetectingRegion>),
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    HeartbeatArrived(DatanodeHeartbeat),
    Clear,
    #[cfg(test)]
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}

#[cfg(test)]
impl Event {
    pub(crate) fn into_region_failure_detectors(self) -> Vec<DetectingRegion> {
        match self {
            Self::RegisterFailureDetectors(detecting_regions) => detecting_regions,
            _ => unreachable!(),
        }
    }
}

impl Debug for Event {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Tick => write!(f, "Tick"),
            Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
            Self::Clear => write!(f, "Clear"),
            Self::RegisterFailureDetectors(arg0) => f
                .debug_tuple("RegisterFailureDetectors")
                .field(arg0)
                .finish(),
            Self::DeregisterFailureDetectors(arg0) => f
                .debug_tuple("DeregisterFailureDetectors")
                .field(arg0)
                .finish(),
            #[cfg(test)]
            Self::Dump(_) => f.debug_struct("Dump").finish(),
        }
    }
}

pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;

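/// A ticker that periodically sends [`Event::Tick`] to the [`RegionSupervisor`].
/// It is started when this node becomes the leader and stopped when it steps down
/// (see the [`LeadershipChangeListener`] implementation below).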
#[derive(Debug)]
pub struct RegionSupervisorTicker {
    /// The handle of the background tick task.
    tick_handle: Mutex<Option<JoinHandle<()>>>,

    /// The interval between two ticks.
    tick_interval: Duration,

    /// Sends [`Event`]s to the [`RegionSupervisor`].
    sender: Sender<Event>,
}

#[async_trait]
impl LeadershipChangeListener for RegionSupervisorTicker {
    fn name(&self) -> &'static str {
        "RegionSupervisorTicker"
    }

    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
        self.start();
        Ok(())
    }

    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
        self.stop();
        Ok(())
    }
}

impl RegionSupervisorTicker {
    pub(crate) fn new(tick_interval: Duration, sender: Sender<Event>) -> Self {
        Self {
            tick_handle: Mutex::new(None),
            tick_interval,
            sender,
        }
    }

    /// Starts the ticker. It is a no-op if the ticker is already started.
    ///
    /// An [`Event::Clear`] is sent first to reset any stale failure detectors,
    /// then an [`Event::Tick`] is sent on every interval.
    pub fn start(&self) {
        let mut handle = self.tick_handle.lock().unwrap();
        if handle.is_none() {
            let sender = self.sender.clone();
            let tick_interval = self.tick_interval;
            let ticker_loop = tokio::spawn(async move {
                let mut interval = interval(tick_interval);
                interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
                if let Err(err) = sender.send(Event::Clear).await {
                    warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
                    return;
                }
                loop {
                    interval.tick().await;
                    if sender.send(Event::Tick).await.is_err() {
                        info!("EventReceiver is dropped, tick loop is stopped");
                        break;
                    }
                }
            });
            *handle = Some(ticker_loop);
        }
    }

    /// Stops the ticker by aborting the background tick task, if any.
    pub fn stop(&self) {
        let handle = self.tick_handle.lock().unwrap().take();
        if let Some(handle) = handle {
            handle.abort();
            info!("The tick loop is stopped.");
        }
    }
}

impl Drop for RegionSupervisorTicker {
    fn drop(&mut self) {
        self.stop();
    }
}

pub type RegionSupervisorRef = Arc<RegionSupervisor>;

/// The default tick interval of the [`RegionSupervisorTicker`].
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);

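/// The selector strategy used by the [`RegionSupervisor`] to pick target peers for
/// failover: either a plain [`SelectorRef`] or a region-stat-aware selector.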
pub enum RegionSupervisorSelector {
    NaiveSelector(SelectorRef),
    RegionStatAwareSelector(RegionStatAwareSelectorRef),
}

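/// `RegionSupervisor` keeps a [`RegionFailureDetector`] up to date from datanode
/// heartbeats, detects region failures on every tick, and triggers region failover
/// (via region migration procedures) unless maintenance mode is enabled.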
pub struct RegionSupervisor {
    /// Detects region failures from datanode heartbeats.
    failure_detector: RegionFailureDetector,
    /// Tracks how many times failover has been triggered for each detecting region.
    failover_counts: HashMap<DetectingRegion, u32>,
    /// Receives [`Event`]s from the ticker, the heartbeat acceptor, and the failure detector controller.
    receiver: Receiver<Event>,
    /// The context used by the selectors.
    selector_context: SelectorContext,
    /// Selects the target datanodes for failover.
    selector: RegionSupervisorSelector,
    /// Submits region migration (failover) procedures.
    region_migration_manager: RegionMigrationManagerRef,
    /// Checks whether maintenance mode is enabled.
    maintenance_mode_manager: MaintenanceModeManagerRef,
    /// Looks up peers by datanode id.
    peer_lookup: PeerLookupServiceRef,
}

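/// A [`RegionFailureDetectorController`] implementation that registers and deregisters
/// failure detectors by sending [`Event`]s to the [`RegionSupervisor`].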
#[derive(Debug, Clone)]
pub struct RegionFailureDetectorControl {
    sender: Sender<Event>,
}

impl RegionFailureDetectorControl {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }
}

#[async_trait::async_trait]
impl RegionFailureDetectorController for RegionFailureDetectorControl {
    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::RegisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }

    async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        if let Err(err) = self
            .sender
            .send(Event::DeregisterFailureDetectors(detecting_regions))
            .await
        {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}

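/// Forwards [`DatanodeHeartbeat`]s to the [`RegionSupervisor`] over its event channel.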
#[derive(Clone)]
pub(crate) struct HeartbeatAcceptor {
    sender: Sender<Event>,
}

impl HeartbeatAcceptor {
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }

    /// Accepts a heartbeat from a datanode and forwards it to the supervisor.
    pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
        if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
            error!(err; "RegionSupervisor has stopped receiving heartbeats.");
        }
    }
}

impl RegionSupervisor {
    /// Returns an mpsc channel with a buffer capacity of 1024 for sending and receiving [`Event`]s.
    pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(1024)
    }

    pub(crate) fn new(
        event_receiver: Receiver<Event>,
        options: PhiAccrualFailureDetectorOptions,
        selector_context: SelectorContext,
        selector: RegionSupervisorSelector,
        region_migration_manager: RegionMigrationManagerRef,
        maintenance_mode_manager: MaintenanceModeManagerRef,
        peer_lookup: PeerLookupServiceRef,
    ) -> Self {
        Self {
            failure_detector: RegionFailureDetector::new(options),
            failover_counts: HashMap::new(),
            receiver: event_receiver,
            selector_context,
            selector,
            region_migration_manager,
            maintenance_mode_manager,
            peer_lookup,
        }
    }

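    /// Runs the supervisor's event loop, handling incoming [`Event`]s until the
    /// event channel is closed.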
    pub(crate) async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::Tick => {
                    let regions = self.detect_region_failure();
                    self.handle_region_failures(regions).await;
                }
                Event::RegisterFailureDetectors(detecting_regions) => {
                    self.register_failure_detectors(detecting_regions).await
                }
                Event::DeregisterFailureDetectors(detecting_regions) => {
                    self.deregister_failure_detectors(detecting_regions).await
                }
                Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
                Event::Clear => self.clear(),
                #[cfg(test)]
                Event::Dump(sender) => {
                    let _ = sender.send(self.failure_detector.dump());
                }
            }
        }
        info!("RegionSupervisor is stopped!");
    }

    async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
        let ts_millis = current_time_millis();
        for region in detecting_regions {
            self.failure_detector
                .maybe_init_region_failure_detector(region, ts_millis);
        }
    }

    async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
        for region in detecting_regions {
            self.failure_detector.remove(&region);
            self.failover_counts.remove(&region);
        }
    }

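    /// Handles the detected region failures: skips failover if maintenance mode is
    /// enabled or a migration for the region is already running, then groups the
    /// remaining failed regions by datanode and submits failover tasks for them.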
    async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
        if regions.is_empty() {
            return;
        }
        match self.is_maintenance_mode_enabled().await {
            Ok(false) => {}
            Ok(true) => {
                warn!(
                    "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
                    regions
                );
                return;
            }
            Err(err) => {
                error!(err; "Failed to check maintenance mode");
                return;
            }
        }

        let migrating_regions = regions
            .extract_if(.., |(_, region_id)| {
                self.region_migration_manager.tracker().contains(*region_id)
            })
            .collect::<Vec<_>>();

        for (datanode_id, region_id) in migrating_regions {
            debug!(
                "Skipping failover for region: {region_id}, datanode: {datanode_id} because it's already migrating"
            );
        }

        if regions.is_empty() {
            return;
        }

        let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
            HashMap::with_capacity(regions.len());
        for (datanode_id, region_id) in regions {
            grouped_regions
                .entry(datanode_id)
                .or_default()
                .push(region_id);
        }

        for (datanode_id, regions) in grouped_regions {
            warn!(
                "Detected region failures on datanode: {}, regions: {:?}",
                datanode_id, regions
            );
            let failed_datanodes = [datanode_id];
            match self
                .generate_failover_tasks(datanode_id, &regions, &failed_datanodes)
                .await
            {
                Ok(tasks) => {
                    for (task, count) in tasks {
                        let region_id = task.region_id;
                        let datanode_id = task.from_peer.id;
                        if let Err(err) = self.do_failover(task, count).await {
                            error!(err; "Failed to execute region failover for region: {}, datanode: {}", region_id, datanode_id);
                        }
                    }
                }
                Err(err) => error!(err; "Failed to generate failover tasks"),
            }
        }
    }

    pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
        self.maintenance_mode_manager
            .maintenance_mode()
            .await
            .context(error::MaintenanceModeManagerSnafu)
    }

    async fn select_peers(
        &self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failure_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionId, Peer)>> {
        let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
        match &self.selector {
            RegionSupervisorSelector::NaiveSelector(selector) => {
                let opt = SelectorOptions {
                    min_required_items: regions.len(),
                    allow_duplication: true,
                    exclude_peer_ids,
                };
                let peers = selector.select(&self.selector_context, opt).await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );
                let region_peers = regions
                    .iter()
                    .zip(peers)
                    .map(|(region_id, peer)| (*region_id, peer))
                    .collect::<Vec<_>>();

                Ok(region_peers)
            }
            RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
                let peers = selector
                    .select(
                        &self.selector_context,
                        from_peer_id,
                        regions,
                        exclude_peer_ids,
                    )
                    .await?;
                ensure!(
                    peers.len() == regions.len(),
                    error::NoEnoughAvailableNodeSnafu {
                        required: regions.len(),
                        available: peers.len(),
                        select_target: SelectTarget::Datanode,
                    }
                );

                Ok(peers)
            }
        }
    }

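    /// Builds the failover tasks for the given failed regions: looks up the failed
    /// peer, selects a target peer for each region, and bumps the per-region failover
    /// count, which is used to scale the migration timeout.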
    async fn generate_failover_tasks(
        &mut self,
        from_peer_id: DatanodeId,
        regions: &[RegionId],
        failed_datanodes: &[DatanodeId],
    ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
        let mut tasks = Vec::with_capacity(regions.len());
        let from_peer = self
            .peer_lookup
            .datanode(from_peer_id)
            .await
            .context(error::LookupPeerSnafu {
                peer_id: from_peer_id,
            })?
            .context(error::PeerUnavailableSnafu {
                peer_id: from_peer_id,
            })?;
        let region_peers = self
            .select_peers(from_peer_id, regions, failed_datanodes)
            .await?;

        for (region_id, peer) in region_peers {
            let count = *self
                .failover_counts
                .entry((from_peer_id, region_id))
                .and_modify(|count| *count += 1)
                .or_insert(1);
            let task = RegionMigrationProcedureTask {
                region_id,
                from_peer: from_peer.clone(),
                to_peer: peer,
                timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
            };
            tasks.push((task, count));
        }

        Ok(tasks)
    }

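    /// Submits a region migration procedure for the failover task. Errors that
    /// indicate the failover is unnecessary (a migration is already running, the
    /// table has been dropped, or the leader peer has changed) are swallowed after
    /// cleaning up the corresponding failure detector where appropriate.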
    async fn do_failover(&mut self, task: RegionMigrationProcedureTask, count: u32) -> Result<()> {
        let from_peer_id = task.from_peer.id;
        let region_id = task.region_id;

        info!(
            "Failover for region: {}, from_peer: {}, to_peer: {}, timeout: {:?}, tries: {}",
            task.region_id, task.from_peer, task.to_peer, task.timeout, count
        );

        if let Err(err) = self.region_migration_manager.submit_procedure(task).await {
            return match err {
                MigrationRunning { .. } => {
                    info!(
                        "Another region migration is running, skip failover for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                TableRouteNotFound { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Table route is not found (the table has been dropped), removed the failure detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                LeaderPeerChanged { .. } => {
                    self.deregister_failure_detectors(vec![(from_peer_id, region_id)])
                        .await;
                    info!(
                        "Region's leader peer has changed, removed the failure detector for region: {}, datanode: {}",
                        region_id, from_peer_id
                    );
                    Ok(())
                }
                err => Err(err),
            };
        };

        Ok(())
    }

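    /// Returns the regions whose failure detectors report them as unavailable at
    /// the current time, together with the datanode that hosts them.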
    fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
        self.failure_detector
            .iter()
            .filter_map(|e| {
                if !e.failure_detector().is_available(current_time_millis()) {
                    Some(*e.region_ident())
                } else {
                    None
                }
            })
            .collect::<Vec<_>>()
    }

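    /// Updates the failure detector of every region reported by the heartbeat with
    /// the heartbeat's timestamp.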
    fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
        for region_id in heartbeat.regions {
            let detecting_region = (heartbeat.datanode_id, region_id);
            let mut detector = self
                .failure_detector
                .region_failure_detector(detecting_region);
            detector.heartbeat(heartbeat.timestamp);
        }
    }

    fn clear(&self) {
        self.failure_detector.clear();
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use common_meta::ddl::RegionFailureDetectorController;
    use common_meta::key::maintenance;
    use common_meta::peer::Peer;
    use common_meta::test_util::NoopPeerLookupService;
    use common_time::util::current_time_millis;
    use rand::Rng;
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::Sender;
    use tokio::sync::oneshot;
    use tokio::time::sleep;

    use super::RegionSupervisorSelector;
    use crate::procedure::region_migration::manager::RegionMigrationManager;
    use crate::procedure::region_migration::test_util::TestingEnv;
    use crate::region::supervisor::{
        DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
        RegionSupervisorTicker,
    };
    use crate::selector::test_utils::{new_test_selector_context, RandomNodeSelector};

    pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
        let env = TestingEnv::new();
        let selector_context = new_test_selector_context();
        let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
        let context_factory = env.context_factory();
        let region_migration_manager = Arc::new(RegionMigrationManager::new(
            env.procedure_manager().clone(),
            context_factory,
        ));
        let maintenance_mode_manager =
            Arc::new(maintenance::MaintenanceModeManager::new(env.kv_backend()));
        let peer_lookup = Arc::new(NoopPeerLookupService);
        let (tx, rx) = RegionSupervisor::channel();

        (
            RegionSupervisor::new(
                rx,
                Default::default(),
                selector_context,
                RegionSupervisorSelector::NaiveSelector(selector),
                region_migration_manager,
                maintenance_mode_manager,
                peer_lookup,
            ),
            tx,
        )
    }

    #[tokio::test]
    async fn test_heartbeat() {
        let (mut supervisor, sender) = new_test_supervisor();
        tokio::spawn(async move { supervisor.run().await });

        sender
            .send(Event::HeartbeatArrived(DatanodeHeartbeat {
                datanode_id: 0,
                regions: vec![RegionId::new(1, 1)],
                timestamp: 100,
            }))
            .await
            .unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert!(detector.contains(&(0, RegionId::new(1, 1))));

        sender.send(Event::Clear).await.unwrap();
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());

        fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
            let mut rng = rand::rng();
            let start = current_time_millis();
            (0..2000)
                .map(|i| DatanodeHeartbeat {
                    timestamp: start + i * 1000 + rng.random_range(0..100),
                    datanode_id,
                    regions: region_ids
                        .iter()
                        .map(|number| RegionId::new(0, *number))
                        .collect(),
                })
                .collect::<Vec<_>>()
        }

        let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
        let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
        for heartbeat in heartbeats {
            sender
                .send(Event::HeartbeatArrived(heartbeat))
                .await
                .unwrap();
        }

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        assert_eq!(detector.len(), 3);

        for e in detector.iter() {
            let fd = e.failure_detector();
            let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
            let start = last_heartbeat_time;

            // Within the acceptable heartbeat pause, phi stays at zero.
            for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
                let now = start + i * 1000;
                assert_eq!(fd.phi(now), 0.0);
            }

            // Once the pause is exceeded, phi grows and eventually crosses the threshold.
            let now = start + acceptable_heartbeat_pause_millis + 1000;
            assert!(fd.phi(now) < fd.threshold() as _);
            let now = start + acceptable_heartbeat_pause_millis + 2000;
            assert!(fd.phi(now) > fd.threshold() as _);
        }
    }

    #[tokio::test]
    async fn test_supervisor_ticker() {
        let (tx, mut rx) = tokio::sync::mpsc::channel(128);
        let ticker = RegionSupervisorTicker {
            tick_handle: Mutex::new(None),
            tick_interval: Duration::from_millis(10),
            sender: tx,
        };
        // Start and stop the ticker twice to verify it can be restarted.
        for _ in 0..2 {
            ticker.start();
            sleep(Duration::from_millis(100)).await;
            ticker.stop();
            assert!(!rx.is_empty());
            while let Ok(event) = rx.try_recv() {
                assert_matches!(event, Event::Tick | Event::Clear);
            }
        }
    }

    #[tokio::test]
    async fn test_region_failure_detector_controller() {
        let (mut supervisor, sender) = new_test_supervisor();
        let controller = RegionFailureDetectorControl::new(sender.clone());
        tokio::spawn(async move { supervisor.run().await });
        let detecting_region = (1, RegionId::new(1, 1));
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;

        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let region_detector = detector.region_failure_detector(detecting_region).clone();

        // Registering the same detecting region again must not reset the existing detector.
        controller
            .register_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        let detector = rx.await.unwrap();
        let got = detector.region_failure_detector(detecting_region).clone();
        assert_eq!(region_detector, got);

        controller
            .deregister_failure_detectors(vec![detecting_region])
            .await;
        let (tx, rx) = oneshot::channel();
        sender.send(Event::Dump(tx)).await.unwrap();
        assert!(rx.await.unwrap().is_empty());
    }
}