1use std::collections::{HashMap, HashSet};
16use std::fmt::Debug;
17use std::sync::{Arc, Mutex};
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use common_meta::DatanodeId;
22use common_meta::datanode::Stat;
23use common_meta::ddl::{DetectingRegion, RegionFailureDetectorController};
24use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
25use common_meta::key::table_route::{TableRouteKey, TableRouteValue};
26use common_meta::key::{MetadataKey, MetadataValue};
27use common_meta::kv_backend::KvBackendRef;
28use common_meta::leadership_notifier::LeadershipChangeListener;
29use common_meta::peer::{Peer, PeerResolverRef};
30use common_meta::range_stream::{DEFAULT_PAGE_SIZE, PaginationStream};
31use common_meta::rpc::store::RangeRequest;
32use common_runtime::JoinHandle;
33use common_telemetry::{debug, error, info, warn};
34use common_time::util::current_time_millis;
35use futures::{StreamExt, TryStreamExt};
36use snafu::{ResultExt, ensure};
37use store_api::storage::RegionId;
38use tokio::sync::mpsc::{Receiver, Sender};
39use tokio::sync::oneshot;
40use tokio::time::{MissedTickBehavior, interval, interval_at};
41
42use crate::discovery::utils::accept_ingest_workload;
43use crate::error::{self, Result};
44use crate::failure_detector::PhiAccrualFailureDetectorOptions;
45use crate::metasrv::{RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef};
46use crate::procedure::region_migration::manager::{
47 RegionMigrationManagerRef, RegionMigrationTriggerReason, SubmitRegionMigrationTaskResult,
48};
49use crate::procedure::region_migration::utils::RegionMigrationTaskBatch;
50use crate::procedure::region_migration::{
51 DEFAULT_REGION_MIGRATION_TIMEOUT, RegionMigrationProcedureTask,
52};
53use crate::region::failure_detector::RegionFailureDetector;
54use crate::selector::SelectorOptions;
55use crate::state::StateRef;
56
/// The subset of a datanode heartbeat that the region failure detectors need.
#[derive(Debug)]
pub(crate) struct DatanodeHeartbeat {
    /// Id of the datanode that sent the heartbeat.
    datanode_id: DatanodeId,
    /// Ids of the regions reported in the heartbeat.
    regions: Vec<RegionId>,
    /// Heartbeat timestamp in milliseconds (copied from `Stat::timestamp_millis`).
    timestamp: i64,
}
67
68impl From<&Stat> for DatanodeHeartbeat {
69 fn from(value: &Stat) -> Self {
70 DatanodeHeartbeat {
71 datanode_id: value.id,
72 regions: value.region_stats.iter().map(|x| x.id).collect(),
73 timestamp: value.timestamp_millis,
74 }
75 }
76}
77
/// Events consumed by the region supervisor's event loop (`RegionSupervisor::run`).
pub(crate) enum Event {
    /// Periodic tick: detect failed regions and trigger failover for them.
    Tick,
    /// Register failure detectors for all regions found in the stored table routes.
    /// The sender is answered only when initialization succeeds; otherwise it is
    /// dropped without a reply.
    InitializeAllRegions(tokio::sync::oneshot::Sender<()>),
    /// Register failure detectors for the given regions; existing detectors are kept.
    RegisterFailureDetectors(Vec<DetectingRegion>),
    /// Remove the failure detectors and failover counters of the given regions.
    DeregisterFailureDetectors(Vec<DetectingRegion>),
    /// Reset the failure detectors of the given regions.
    ResetFailureDetectors(Vec<DetectingRegion>),
    /// A datanode heartbeat arrived; feeds the detectors of every reported region.
    HeartbeatArrived(DatanodeHeartbeat),
    /// Remove all failure detectors.
    Clear,
    /// Test-only: send a snapshot of the failure detectors back through the sender.
    #[cfg(test)]
    Dump(tokio::sync::oneshot::Sender<RegionFailureDetector>),
}
105
106impl Debug for Event {
107 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
108 match self {
109 Self::Tick => write!(f, "Tick"),
110 Self::HeartbeatArrived(arg0) => f.debug_tuple("HeartbeatArrived").field(arg0).finish(),
111 Self::Clear => write!(f, "Clear"),
112 Self::InitializeAllRegions(_) => write!(f, "InspectAndRegisterRegions"),
113 Self::RegisterFailureDetectors(arg0) => f
114 .debug_tuple("RegisterFailureDetectors")
115 .field(arg0)
116 .finish(),
117 Self::ResetFailureDetectors(arg0) => {
118 f.debug_tuple("ResetFailureDetectors").field(arg0).finish()
119 }
120 Self::DeregisterFailureDetectors(arg0) => f
121 .debug_tuple("DeregisterFailureDetectors")
122 .field(arg0)
123 .finish(),
124 #[cfg(test)]
125 Self::Dump(_) => f.debug_struct("Dump").finish(),
126 }
127 }
128}
129
/// Shared reference to a [`RegionSupervisorTicker`].
pub type RegionSupervisorTickerRef = Arc<RegionSupervisorTicker>;

/// Drives the region supervisor with two background loops.
///
/// - A tick loop sends [`Event::Clear`] once, then [`Event::Tick`] periodically.
/// - An initialization loop sends [`Event::InitializeAllRegions`] until it is acknowledged.
#[derive(Debug)]
pub struct RegionSupervisorTicker {
    /// Handle of the tick loop; `Some` while the ticker is started.
    tick_handle: Mutex<Option<JoinHandle<()>>>,

    /// Handle of the region failure detectors initialization loop.
    initialization_handle: Mutex<Option<JoinHandle<()>>>,

    /// Interval between two [`Event::Tick`]s.
    tick_interval: Duration,

    /// Delay before the first [`Event::InitializeAllRegions`] is sent.
    initialization_delay: Duration,

    /// Period between [`Event::InitializeAllRegions`] retries.
    initialization_retry_period: Duration,

    /// Channel used to send events to the region supervisor.
    sender: Sender<Event>,
}
153
/// Keeps the ticker running only while this metasrv holds leadership.
#[async_trait]
impl LeadershipChangeListener for RegionSupervisorTicker {
    fn name(&self) -> &'static str {
        "RegionSupervisorTicker"
    }

    /// Starts the tick and initialization loops (no-op if already started).
    async fn on_leader_start(&self) -> common_meta::error::Result<()> {
        self.start();
        Ok(())
    }

    /// Aborts the tick and initialization loops.
    async fn on_leader_stop(&self) -> common_meta::error::Result<()> {
        self.stop();
        Ok(())
    }
}
170
171impl RegionSupervisorTicker {
172 pub(crate) fn new(
173 tick_interval: Duration,
174 initialization_delay: Duration,
175 initialization_retry_period: Duration,
176 sender: Sender<Event>,
177 ) -> Self {
178 info!(
179 "RegionSupervisorTicker is created, tick_interval: {:?}, initialization_delay: {:?}, initialization_retry_period: {:?}",
180 tick_interval, initialization_delay, initialization_retry_period
181 );
182 Self {
183 tick_handle: Mutex::new(None),
184 initialization_handle: Mutex::new(None),
185 tick_interval,
186 initialization_delay,
187 initialization_retry_period,
188 sender,
189 }
190 }
191
192 pub fn start(&self) {
194 let mut handle = self.tick_handle.lock().unwrap();
195 if handle.is_none() {
196 let sender = self.sender.clone();
197 let tick_interval = self.tick_interval;
198 let initialization_delay = self.initialization_delay;
199
200 let mut initialization_interval = interval_at(
201 tokio::time::Instant::now() + initialization_delay,
202 self.initialization_retry_period,
203 );
204 initialization_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
205 let initialization_handler = common_runtime::spawn_global(async move {
206 loop {
207 initialization_interval.tick().await;
208 let (tx, rx) = oneshot::channel();
209 if sender.send(Event::InitializeAllRegions(tx)).await.is_err() {
210 info!(
211 "EventReceiver is dropped, region failure detectors initialization loop is stopped"
212 );
213 break;
214 }
215 if rx.await.is_ok() {
216 info!("All region failure detectors are initialized.");
217 break;
218 }
219 }
220 });
221 *self.initialization_handle.lock().unwrap() = Some(initialization_handler);
222
223 let sender = self.sender.clone();
224 let ticker_loop = tokio::spawn(async move {
225 let mut tick_interval = interval(tick_interval);
226 tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
227
228 if let Err(err) = sender.send(Event::Clear).await {
229 warn!(err; "EventReceiver is dropped, failed to send Event::Clear");
230 return;
231 }
232 loop {
233 tick_interval.tick().await;
234 if sender.send(Event::Tick).await.is_err() {
235 info!("EventReceiver is dropped, tick loop is stopped");
236 break;
237 }
238 }
239 });
240 *handle = Some(ticker_loop);
241 }
242 }
243
244 pub fn stop(&self) {
246 let handle = self.tick_handle.lock().unwrap().take();
247 if let Some(handle) = handle {
248 handle.abort();
249 info!("The tick loop is stopped.");
250 }
251 let initialization_handler = self.initialization_handle.lock().unwrap().take();
252 if let Some(initialization_handler) = initialization_handler {
253 initialization_handler.abort();
254 info!("The initialization loop is stopped.");
255 }
256 }
257}
258
/// Aborts the background loops when the ticker is dropped.
impl Drop for RegionSupervisorTicker {
    fn drop(&mut self) {
        self.stop();
    }
}
264
/// Shared reference to a [`RegionSupervisor`].
pub type RegionSupervisorRef = Arc<RegionSupervisor>;

/// Default interval between two ticks (1 second).
pub const DEFAULT_TICK_INTERVAL: Duration = Duration::from_secs(1);
/// Default period between region failure detectors initialization retries (60 seconds).
pub const DEFAULT_INITIALIZATION_RETRY_PERIOD: Duration = Duration::from_secs(60);
271
/// Selector used to choose target peers for region failover.
pub enum RegionSupervisorSelector {
    /// Generic selector, called with `SelectorOptions` (one peer per failed region,
    /// duplicates allowed, failed datanodes excluded).
    NaiveSelector(SelectorRef),
    /// Selector that receives the source peer id and the failed regions, and returns
    /// a target peer per region.
    RegionStatAwareSelector(RegionStatAwareSelectorRef),
}
277
/// Detects region failures from datanode heartbeats and triggers region failover
/// (region migration) for failed regions.
///
/// The supervisor is driven by [`Event`]s received in `RegionSupervisor::run`.
pub struct RegionSupervisor {
    /// Failure detectors keyed by `(datanode_id, region_id)`.
    failure_detector: RegionFailureDetector,
    /// Failover attempts per detecting region. Used to scale the migration timeout.
    failover_counts: HashMap<DetectingRegion, u32>,
    /// Receiving side of the event channel.
    receiver: Receiver<Event>,
    /// Context passed to the selector.
    selector_context: SelectorContext,
    /// Chooses target peers for failover.
    selector: RegionSupervisorSelector,
    /// Submits region migration tasks and tracks regions being migrated.
    region_migration_manager: RegionMigrationManagerRef,
    /// Provides runtime switches (e.g. maintenance mode).
    runtime_switch_manager: RuntimeSwitchManagerRef,
    /// Resolves datanode ids to peers.
    peer_resolver: PeerResolverRef,
    /// Kv backend scanned for table routes during initialization.
    kv_backend: KvBackendRef,
    /// Metasrv state. When set, events are ignored unless this metasrv is the leader.
    state: Option<StateRef>,
}
302
/// Forwards failure-detector register/reset/deregister requests to the region
/// supervisor as [`Event`]s.
#[derive(Debug, Clone)]
pub struct RegionFailureDetectorControl {
    sender: Sender<Event>,
}

impl RegionFailureDetectorControl {
    /// Creates a controller that sends its events through `sender`.
    pub(crate) fn new(sender: Sender<Event>) -> Self {
        Self { sender }
    }
}
314
315#[async_trait::async_trait]
316impl RegionFailureDetectorController for RegionFailureDetectorControl {
317 async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
318 if let Err(err) = self
319 .sender
320 .send(Event::RegisterFailureDetectors(detecting_regions))
321 .await
322 {
323 error!(err; "RegionSupervisor has stop receiving heartbeat.");
324 }
325 }
326
327 async fn reset_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
328 if let Err(err) = self
329 .sender
330 .send(Event::ResetFailureDetectors(detecting_regions))
331 .await
332 {
333 error!(err; "RegionSupervisor has stop receiving heartbeat.");
334 }
335 }
336
337 async fn deregister_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
338 if let Err(err) = self
339 .sender
340 .send(Event::DeregisterFailureDetectors(detecting_regions))
341 .await
342 {
343 error!(err; "RegionSupervisor has stop receiving heartbeat.");
344 }
345 }
346}
347
348#[derive(Clone)]
350pub(crate) struct HeartbeatAcceptor {
351 sender: Sender<Event>,
352}
353
354impl HeartbeatAcceptor {
355 pub(crate) fn new(sender: Sender<Event>) -> Self {
356 Self { sender }
357 }
358
359 pub(crate) async fn accept(&self, heartbeat: DatanodeHeartbeat) {
361 if let Err(err) = self.sender.send(Event::HeartbeatArrived(heartbeat)).await {
362 error!(err; "RegionSupervisor has stop receiving heartbeat.");
363 }
364 }
365}
366
367impl RegionSupervisor {
368 pub(crate) fn channel() -> (Sender<Event>, Receiver<Event>) {
370 tokio::sync::mpsc::channel(1024)
371 }
372
373 #[allow(clippy::too_many_arguments)]
374 pub(crate) fn new(
375 event_receiver: Receiver<Event>,
376 options: PhiAccrualFailureDetectorOptions,
377 selector_context: SelectorContext,
378 selector: RegionSupervisorSelector,
379 region_migration_manager: RegionMigrationManagerRef,
380 runtime_switch_manager: RuntimeSwitchManagerRef,
381 peer_resolver: PeerResolverRef,
382 kv_backend: KvBackendRef,
383 ) -> Self {
384 Self {
385 failure_detector: RegionFailureDetector::new(options),
386 failover_counts: HashMap::new(),
387 receiver: event_receiver,
388 selector_context,
389 selector,
390 region_migration_manager,
391 runtime_switch_manager,
392 peer_resolver,
393 kv_backend,
394 state: None,
395 }
396 }
397
398 pub(crate) fn with_state(mut self, state: StateRef) -> Self {
400 self.state = Some(state);
401 self
402 }
403
404 pub(crate) async fn run(&mut self) {
406 while let Some(event) = self.receiver.recv().await {
407 if let Some(state) = self.state.as_ref()
408 && !state.read().unwrap().is_leader()
409 {
410 warn!(
411 "The current metasrv is not the leader, ignore {:?} event",
412 event
413 );
414 continue;
415 }
416
417 match event {
418 Event::InitializeAllRegions(sender) => {
419 match self.is_maintenance_mode_enabled().await {
420 Ok(false) => {}
421 Ok(true) => {
422 warn!(
423 "Skipping initialize all regions since maintenance mode is enabled."
424 );
425 continue;
426 }
427 Err(err) => {
428 error!(err; "Failed to check maintenance mode during initialize all regions.");
429 continue;
430 }
431 }
432
433 if let Err(err) = self.initialize_all().await {
434 error!(err; "Failed to initialize all regions.");
435 } else {
436 let _ = sender.send(());
438 }
439 }
440 Event::Tick => {
441 let regions = self.detect_region_failure();
442 self.handle_region_failures(regions).await;
443 }
444 Event::RegisterFailureDetectors(detecting_regions) => {
445 self.register_failure_detectors(detecting_regions).await
446 }
447 Event::ResetFailureDetectors(detecting_regions) => {
448 self.reset_failure_detectors(detecting_regions).await
449 }
450 Event::DeregisterFailureDetectors(detecting_regions) => {
451 self.deregister_failure_detectors(detecting_regions).await
452 }
453 Event::HeartbeatArrived(heartbeat) => self.on_heartbeat_arrived(heartbeat),
454 Event::Clear => {
455 self.clear();
456 info!("Region supervisor is initialized.");
457 }
458 #[cfg(test)]
459 Event::Dump(sender) => {
460 let _ = sender.send(self.failure_detector.dump());
461 }
462 }
463 }
464 info!("RegionSupervisor is stopped!");
465 }
466
467 async fn initialize_all(&self) -> Result<()> {
468 let now = Instant::now();
469 let regions = self.regions();
470 let req = RangeRequest::new().with_prefix(TableRouteKey::range_prefix());
471 let stream = PaginationStream::new(self.kv_backend.clone(), req, DEFAULT_PAGE_SIZE, |kv| {
472 TableRouteKey::from_bytes(&kv.key).map(|v| (v.table_id, kv.value))
473 })
474 .into_stream();
475
476 let mut stream = stream
477 .map_ok(|(_, value)| {
478 TableRouteValue::try_from_raw_value(&value)
479 .context(error::TableMetadataManagerSnafu)
480 })
481 .boxed();
482 let mut detecting_regions = Vec::new();
483 while let Some(route) = stream
484 .try_next()
485 .await
486 .context(error::TableMetadataManagerSnafu)?
487 {
488 let route = route?;
489 if !route.is_physical() {
490 continue;
491 }
492
493 let physical_table_route = route.into_physical_table_route();
494 physical_table_route
495 .region_routes
496 .iter()
497 .for_each(|region_route| {
498 if !regions.contains(®ion_route.region.id)
499 && let Some(leader_peer) = ®ion_route.leader_peer
500 {
501 detecting_regions.push((leader_peer.id, region_route.region.id));
502 }
503 });
504 }
505
506 let num_detecting_regions = detecting_regions.len();
507 if !detecting_regions.is_empty() {
508 self.register_failure_detectors(detecting_regions).await;
509 }
510
511 info!(
512 "Initialize {} region failure detectors, elapsed: {:?}",
513 num_detecting_regions,
514 now.elapsed()
515 );
516
517 Ok(())
518 }
519
520 async fn register_failure_detectors(&self, detecting_regions: Vec<DetectingRegion>) {
521 let ts_millis = current_time_millis();
522 for region in detecting_regions {
523 self.failure_detector
525 .maybe_init_region_failure_detector(region, ts_millis);
526 }
527 }
528
529 async fn reset_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
530 let ts_millis = current_time_millis();
531 for region in detecting_regions {
532 self.failure_detector
533 .reset_region_failure_detector(region, ts_millis);
534 }
535 }
536
537 async fn deregister_failure_detectors(&mut self, detecting_regions: Vec<DetectingRegion>) {
538 for region in detecting_regions {
539 self.failure_detector.remove(®ion);
540 self.failover_counts.remove(®ion);
541 }
542 }
543
544 async fn handle_region_failures(&mut self, mut regions: Vec<(DatanodeId, RegionId)>) {
545 if regions.is_empty() {
546 return;
547 }
548 match self.is_maintenance_mode_enabled().await {
549 Ok(false) => {}
550 Ok(true) => {
551 warn!(
552 "Skipping failover since maintenance mode is enabled. Detected region failures: {:?}",
553 regions
554 );
555 return;
556 }
557 Err(err) => {
558 error!(err; "Failed to check maintenance mode");
559 return;
560 }
561 }
562
563 let migrating_regions = regions
565 .extract_if(.., |(_, region_id)| {
566 self.region_migration_manager.tracker().contains(*region_id)
567 })
568 .collect::<Vec<_>>();
569
570 for (datanode_id, region_id) in migrating_regions {
571 debug!(
572 "Removed region failover for region: {region_id}, datanode: {datanode_id} because it's migrating"
573 );
574 }
575
576 if regions.is_empty() {
577 return;
579 }
580
581 let mut grouped_regions: HashMap<u64, Vec<RegionId>> =
582 HashMap::with_capacity(regions.len());
583 for (datanode_id, region_id) in regions {
584 grouped_regions
585 .entry(datanode_id)
586 .or_default()
587 .push(region_id);
588 }
589
590 for (datanode_id, regions) in grouped_regions {
591 warn!(
592 "Detects region failures on datanode: {}, regions: {:?}",
593 datanode_id, regions
594 );
595 let failed_datanodes = [datanode_id];
599 match self
600 .generate_failover_tasks(datanode_id, ®ions, &failed_datanodes)
601 .await
602 {
603 Ok(tasks) => {
604 let mut grouped_tasks: HashMap<(u64, u64), Vec<_>> = HashMap::new();
605 for (task, count) in tasks {
606 grouped_tasks
607 .entry((task.from_peer.id, task.to_peer.id))
608 .or_default()
609 .push((task, count));
610 }
611
612 for ((from_peer_id, to_peer_id), tasks) in grouped_tasks {
613 if tasks.is_empty() {
614 continue;
615 }
616 let task = RegionMigrationTaskBatch::from_tasks(tasks);
617 let region_ids = task.region_ids.clone();
618 if let Err(err) = self.do_failover_tasks(task).await {
619 error!(err; "Failed to execute region failover for regions: {:?}, from_peer: {}, to_peer: {}", region_ids, from_peer_id, to_peer_id);
620 }
621 }
622 }
623 Err(err) => error!(err; "Failed to generate failover tasks"),
624 }
625 }
626 }
627
628 pub(crate) async fn is_maintenance_mode_enabled(&self) -> Result<bool> {
629 self.runtime_switch_manager
630 .maintenance_mode()
631 .await
632 .context(error::RuntimeSwitchManagerSnafu)
633 }
634
635 async fn select_peers(
636 &self,
637 from_peer_id: DatanodeId,
638 regions: &[RegionId],
639 failure_datanodes: &[DatanodeId],
640 ) -> Result<Vec<(RegionId, Peer)>> {
641 let exclude_peer_ids = HashSet::from_iter(failure_datanodes.iter().cloned());
642 match &self.selector {
643 RegionSupervisorSelector::NaiveSelector(selector) => {
644 let opt = SelectorOptions {
645 min_required_items: regions.len(),
646 allow_duplication: true,
647 exclude_peer_ids,
648 workload_filter: Some(accept_ingest_workload),
649 extensions: Default::default(),
650 };
651 let peers = selector.select(&self.selector_context, opt).await?;
652 ensure!(
653 peers.len() == regions.len(),
654 error::NoEnoughAvailableNodeSnafu {
655 required: regions.len(),
656 available: peers.len(),
657 select_target: SelectTarget::Datanode,
658 }
659 );
660 let region_peers = regions
661 .iter()
662 .zip(peers)
663 .map(|(region_id, peer)| (*region_id, peer))
664 .collect::<Vec<_>>();
665
666 Ok(region_peers)
667 }
668 RegionSupervisorSelector::RegionStatAwareSelector(selector) => {
669 let peers = selector
670 .select(
671 &self.selector_context,
672 from_peer_id,
673 regions,
674 exclude_peer_ids,
675 )
676 .await?;
677 ensure!(
678 peers.len() == regions.len(),
679 error::NoEnoughAvailableNodeSnafu {
680 required: regions.len(),
681 available: peers.len(),
682 select_target: SelectTarget::Datanode,
683 }
684 );
685
686 Ok(peers)
687 }
688 }
689 }
690
691 async fn generate_failover_tasks(
692 &mut self,
693 from_peer_id: DatanodeId,
694 regions: &[RegionId],
695 failed_datanodes: &[DatanodeId],
696 ) -> Result<Vec<(RegionMigrationProcedureTask, u32)>> {
697 let mut tasks = Vec::with_capacity(regions.len());
698 let from_peer = self
699 .peer_resolver
700 .datanode(from_peer_id)
701 .await
702 .ok()
703 .flatten()
704 .unwrap_or_else(|| Peer::empty(from_peer_id));
705
706 let region_peers = self
707 .select_peers(from_peer_id, regions, failed_datanodes)
708 .await?;
709
710 for (region_id, peer) in region_peers {
711 let count = *self
712 .failover_counts
713 .entry((from_peer_id, region_id))
714 .and_modify(|count| *count += 1)
715 .or_insert(1);
716 let task = RegionMigrationProcedureTask {
717 region_id,
718 from_peer: from_peer.clone(),
719 to_peer: peer,
720 timeout: DEFAULT_REGION_MIGRATION_TIMEOUT * count,
721 trigger_reason: RegionMigrationTriggerReason::Failover,
722 };
723 tasks.push((task, count));
724 }
725
726 Ok(tasks)
727 }
728
729 async fn do_failover_tasks(&mut self, task: RegionMigrationTaskBatch) -> Result<()> {
730 let from_peer_id = task.from_peer.id;
731 let to_peer_id = task.to_peer.id;
732 let timeout = task.timeout;
733 let trigger_reason = task.trigger_reason;
734 let result = self
735 .region_migration_manager
736 .submit_region_migration_task(task)
737 .await?;
738 self.handle_submit_region_migration_task_result(
739 from_peer_id,
740 to_peer_id,
741 timeout,
742 trigger_reason,
743 result,
744 )
745 .await
746 }
747
748 async fn handle_submit_region_migration_task_result(
749 &mut self,
750 from_peer_id: DatanodeId,
751 to_peer_id: DatanodeId,
752 timeout: Duration,
753 trigger_reason: RegionMigrationTriggerReason,
754 result: SubmitRegionMigrationTaskResult,
755 ) -> Result<()> {
756 if !result.migrated.is_empty() {
757 let detecting_regions = result
758 .migrated
759 .iter()
760 .map(|region_id| (from_peer_id, *region_id))
761 .collect::<Vec<_>>();
762 self.deregister_failure_detectors(detecting_regions).await;
763 info!(
764 "Region has been migrated to target peer: {}, removed failover detectors for regions: {:?}",
765 to_peer_id, result.migrated,
766 )
767 }
768 if !result.migrating.is_empty() {
769 info!(
770 "Region is still migrating, skipping failover for regions: {:?}",
771 result.migrating
772 );
773 }
774 if !result.region_not_found.is_empty() {
775 let detecting_regions = result
776 .region_not_found
777 .iter()
778 .map(|region_id| (from_peer_id, *region_id))
779 .collect::<Vec<_>>();
780 self.deregister_failure_detectors(detecting_regions).await;
781 info!(
782 "Region route not found, removed failover detectors for regions: {:?}",
783 result.region_not_found
784 );
785 }
786 if !result.table_not_found.is_empty() {
787 let detecting_regions = result
788 .table_not_found
789 .iter()
790 .map(|region_id| (from_peer_id, *region_id))
791 .collect::<Vec<_>>();
792 self.deregister_failure_detectors(detecting_regions).await;
793 info!(
794 "Table is not found, removed failover detectors for regions: {:?}",
795 result.table_not_found
796 );
797 }
798 if !result.leader_changed.is_empty() {
799 let detecting_regions = result
800 .leader_changed
801 .iter()
802 .map(|region_id| (from_peer_id, *region_id))
803 .collect::<Vec<_>>();
804 self.deregister_failure_detectors(detecting_regions).await;
805 info!(
806 "Region's leader peer changed, removed failover detectors for regions: {:?}",
807 result.leader_changed
808 );
809 }
810 if !result.peer_conflict.is_empty() {
811 info!(
812 "Region has peer conflict, ignore failover for regions: {:?}",
813 result.peer_conflict
814 );
815 }
816 if !result.submitted.is_empty() {
817 info!(
818 "Failover for regions: {:?}, from_peer: {}, to_peer: {}, procedure_id: {:?}, timeout: {:?}, trigger_reason: {:?}",
819 result.submitted,
820 from_peer_id,
821 to_peer_id,
822 result.procedure_id,
823 timeout,
824 trigger_reason,
825 );
826 }
827
828 Ok(())
829 }
830
831 fn detect_region_failure(&self) -> Vec<(DatanodeId, RegionId)> {
833 self.failure_detector
834 .iter()
835 .filter_map(|e| {
836 if !e.failure_detector().is_available(current_time_millis()) {
843 Some(*e.region_ident())
844 } else {
845 None
846 }
847 })
848 .collect::<Vec<_>>()
849 }
850
851 fn regions(&self) -> HashSet<RegionId> {
853 self.failure_detector
854 .iter()
855 .map(|e| e.region_ident().1)
856 .collect::<HashSet<_>>()
857 }
858
859 fn on_heartbeat_arrived(&self, heartbeat: DatanodeHeartbeat) {
861 for region_id in heartbeat.regions {
862 let detecting_region = (heartbeat.datanode_id, region_id);
863 let mut detector = self
864 .failure_detector
865 .region_failure_detector(detecting_region);
866 detector.heartbeat(heartbeat.timestamp);
867 }
868 }
869
870 fn clear(&self) {
871 self.failure_detector.clear();
872 }
873}
874
875#[cfg(test)]
876pub(crate) mod tests {
877 use std::assert_matches;
878 use std::collections::HashMap;
879 use std::sync::{Arc, Mutex};
880 use std::time::Duration;
881
882 use common_meta::ddl::RegionFailureDetectorController;
883 use common_meta::ddl::test_util::{
884 test_create_logical_table_task, test_create_physical_table_task,
885 };
886 use common_meta::key::table_route::{
887 LogicalTableRouteValue, PhysicalTableRouteValue, TableRouteValue,
888 };
889 use common_meta::key::{TableMetadataManager, runtime_switch};
890 use common_meta::peer::Peer;
891 use common_meta::rpc::router::{Region, RegionRoute};
892 use common_meta::test_util::NoopPeerResolver;
893 use common_telemetry::info;
894 use common_time::util::current_time_millis;
895 use rand::Rng;
896 use store_api::storage::RegionId;
897 use tokio::sync::mpsc::Sender;
898 use tokio::sync::oneshot;
899 use tokio::time::sleep;
900
901 use super::RegionSupervisorSelector;
902 use crate::procedure::region_migration::RegionMigrationTriggerReason;
903 use crate::procedure::region_migration::manager::{
904 RegionMigrationManager, SubmitRegionMigrationTaskResult,
905 };
906 use crate::procedure::region_migration::test_util::TestingEnv;
907 use crate::region::supervisor::{
908 DatanodeHeartbeat, Event, RegionFailureDetectorControl, RegionSupervisor,
909 RegionSupervisorTicker,
910 };
911 use crate::selector::test_utils::{RandomNodeSelector, new_test_selector_context};
912
913 pub(crate) fn new_test_supervisor() -> (RegionSupervisor, Sender<Event>) {
914 let env = TestingEnv::new();
915 let selector_context = new_test_selector_context();
916 let selector = Arc::new(RandomNodeSelector::new(vec![Peer::empty(1)]));
917 let context_factory = env.context_factory();
918 let region_migration_manager = Arc::new(RegionMigrationManager::new(
919 env.procedure_manager().clone(),
920 context_factory,
921 ));
922 let runtime_switch_manager =
923 Arc::new(runtime_switch::RuntimeSwitchManager::new(env.kv_backend()));
924 let peer_resolver = Arc::new(NoopPeerResolver);
925 let (tx, rx) = RegionSupervisor::channel();
926 let kv_backend = env.kv_backend();
927
928 (
929 RegionSupervisor::new(
930 rx,
931 Default::default(),
932 selector_context,
933 RegionSupervisorSelector::NaiveSelector(selector),
934 region_migration_manager,
935 runtime_switch_manager,
936 peer_resolver,
937 kv_backend,
938 ),
939 tx,
940 )
941 }
942
/// Covers three behaviors of heartbeats and the failure detectors:
/// - a heartbeat creates a detector for the reported region;
/// - `Event::Clear` removes all detectors;
/// - after a steady stream of ~1s heartbeats, phi stays 0 during the acceptable
///   pause and exceeds the threshold shortly after it.
#[tokio::test]
async fn test_heartbeat() {
    let (mut supervisor, sender) = new_test_supervisor();
    tokio::spawn(async move { supervisor.run().await });

    sender
        .send(Event::HeartbeatArrived(DatanodeHeartbeat {
            datanode_id: 0,
            regions: vec![RegionId::new(1, 1)],
            timestamp: 100,
        }))
        .await
        .unwrap();
    let (tx, rx) = oneshot::channel();
    sender.send(Event::Dump(tx)).await.unwrap();
    let detector = rx.await.unwrap();
    assert!(detector.contains(&(0, RegionId::new(1, 1))));

    sender.send(Event::Clear).await.unwrap();
    // Events are handled in order, so this dump observes the state after `Clear`.
    let (tx, rx) = oneshot::channel();
    sender.send(Event::Dump(tx)).await.unwrap();
    assert!(rx.await.unwrap().is_empty());

    // Generates 2000 heartbeats for regions `region_ids` of table 0 on `datanode_id`,
    // spaced 1000ms apart plus 0-99ms of random jitter.
    fn generate_heartbeats(datanode_id: u64, region_ids: Vec<u32>) -> Vec<DatanodeHeartbeat> {
        let mut rng = rand::rng();
        let start = current_time_millis();
        (0..2000)
            .map(|i| DatanodeHeartbeat {
                timestamp: start + i * 1000 + rng.random_range(0..100),
                datanode_id,
                regions: region_ids
                    .iter()
                    .map(|number| RegionId::new(0, *number))
                    .collect(),
            })
            .collect::<Vec<_>>()
    }

    let heartbeats = generate_heartbeats(100, vec![1, 2, 3]);
    let last_heartbeat_time = heartbeats.last().unwrap().timestamp;
    for heartbeat in heartbeats {
        sender
            .send(Event::HeartbeatArrived(heartbeat))
            .await
            .unwrap();
    }

    let (tx, rx) = oneshot::channel();
    sender.send(Event::Dump(tx)).await.unwrap();
    let detector = rx.await.unwrap();
    assert_eq!(detector.len(), 3);

    for e in detector.iter() {
        let fd = e.failure_detector();
        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis() as i64;
        let start = last_heartbeat_time;

        // Within the acceptable heartbeat pause, phi stays at zero.
        for i in 1..=acceptable_heartbeat_pause_millis / 1000 {
            let now = start + i * 1000;
            assert_eq!(fd.phi(now), 0.0);
        }

        // One second past the acceptable pause, phi is still below the threshold...
        let now = start + acceptable_heartbeat_pause_millis + 1000;
        assert!(fd.phi(now) < fd.threshold() as _);
        // ...and one more second later it is above the threshold.
        let now = start + acceptable_heartbeat_pause_millis + 2000;
        assert!(fd.phi(now) > fd.threshold() as _);
    }
}
1015
/// Starts and stops the ticker twice. Each run emits only
/// `Tick`/`Clear`/`InitializeAllRegions` events, and `stop` clears both handles.
#[tokio::test]
async fn test_supervisor_ticker() {
    let (tx, mut rx) = tokio::sync::mpsc::channel(128);
    let ticker = RegionSupervisorTicker {
        tick_handle: Mutex::new(None),
        initialization_handle: Mutex::new(None),
        tick_interval: Duration::from_millis(10),
        initialization_delay: Duration::from_millis(100),
        initialization_retry_period: Duration::from_millis(100),
        sender: tx,
    };
    // Two rounds: the ticker must be restartable after `stop`.
    for _ in 0..2 {
        ticker.start();
        sleep(Duration::from_millis(100)).await;
        ticker.stop();
        assert!(!rx.is_empty());
        while let Ok(event) = rx.try_recv() {
            assert_matches!(
                event,
                Event::Tick | Event::Clear | Event::InitializeAllRegions(_)
            );
        }
        assert!(ticker.initialization_handle.lock().unwrap().is_none());
        assert!(ticker.tick_handle.lock().unwrap().is_none());
    }
}
1043
/// The initialization loop retries until an `InitializeAllRegions` event is
/// acknowledged, then stops sending.
#[tokio::test]
async fn test_initialize_all_regions_event_handling() {
    common_telemetry::init_default_ut_logging();
    let (tx, mut rx) = tokio::sync::mpsc::channel(128);
    let ticker = RegionSupervisorTicker {
        tick_handle: Mutex::new(None),
        initialization_handle: Mutex::new(None),
        // Long tick interval so ticks do not show up in the observation window below.
        tick_interval: Duration::from_millis(1000),
        initialization_delay: Duration::from_millis(50),
        initialization_retry_period: Duration::from_millis(50),
        sender: tx,
    };
    ticker.start();
    sleep(Duration::from_millis(60)).await;
    let handle = tokio::spawn(async move {
        let mut counter = 0;
        while let Some(event) = rx.recv().await {
            if let Event::InitializeAllRegions(tx) = event {
                if counter == 0 {
                    counter += 1;
                    // Drop the first request unanswered, forcing the ticker to retry.
                    continue;
                }
                tx.send(()).unwrap();
                info!("Responded initialize all regions event");
                break;
            }
        }
        rx
    });

    let rx = handle.await.unwrap();
    // After the acknowledgement, no further events should arrive.
    for _ in 0..3 {
        sleep(Duration::from_millis(100)).await;
        assert!(rx.is_empty());
    }
}
1081
/// `InitializeAllRegions` registers a detector for the physical table's region
/// on its leader peer. The logical table adds nothing.
#[tokio::test]
async fn test_initialize_all_regions() {
    common_telemetry::init_default_ut_logging();
    let (mut supervisor, sender) = new_test_supervisor();
    let table_metadata_manager = TableMetadataManager::new(supervisor.kv_backend.clone());

    let table_id = 1024;
    // Physical table with a single region whose leader is peer 1.
    let mut create_physical_table_task = test_create_physical_table_task("my_physical_table");
    create_physical_table_task.set_table_id(table_id);
    let table_info = create_physical_table_task.table_info;
    let table_route = PhysicalTableRouteValue::new(vec![RegionRoute {
        region: Region {
            id: RegionId::new(table_id, 0),
            ..Default::default()
        },
        leader_peer: Some(Peer::empty(1)),
        ..Default::default()
    }]);
    let table_route_value = TableRouteValue::Physical(table_route);
    table_metadata_manager
        .create_table_metadata(table_info, table_route_value, HashMap::new())
        .await
        .unwrap();

    let logical_table_id = 1025;
    // Logical table whose route refers to table 1024 (presumably its physical table).
    let mut test_create_logical_table_task = test_create_logical_table_task("my_logical_table");
    test_create_logical_table_task.set_table_id(logical_table_id);
    let table_info = test_create_logical_table_task.table_info;
    let table_route = LogicalTableRouteValue::new(1024);
    let table_route_value = TableRouteValue::Logical(table_route);
    table_metadata_manager
        .create_table_metadata(table_info, table_route_value, HashMap::new())
        .await
        .unwrap();
    tokio::spawn(async move { supervisor.run().await });
    let (tx, rx) = oneshot::channel();
    sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
    assert!(rx.await.is_ok());

    // Exactly one detector: region (1024, 0) on leader peer 1.
    let (tx, rx) = oneshot::channel();
    sender.send(Event::Dump(tx)).await.unwrap();
    let detector = rx.await.unwrap();
    assert_eq!(detector.len(), 1);
    assert!(detector.contains(&(1, RegionId::new(1024, 0))));
}
1129
/// With maintenance mode enabled, `InitializeAllRegions` is skipped. The reply
/// sender is dropped, so the receiver observes an error.
#[tokio::test]
async fn test_initialize_all_regions_with_maintenance_mode() {
    common_telemetry::init_default_ut_logging();
    let (mut supervisor, sender) = new_test_supervisor();

    supervisor
        .runtime_switch_manager
        .set_maintenance_mode()
        .await
        .unwrap();
    tokio::spawn(async move { supervisor.run().await });
    let (tx, rx) = oneshot::channel();
    sender.send(Event::InitializeAllRegions(tx)).await.unwrap();
    // No acknowledgement: the supervisor dropped the sender without replying.
    assert!(rx.await.is_err());
}
1146
1147 #[tokio::test]
1148 async fn test_region_failure_detector_controller() {
1149 let (mut supervisor, sender) = new_test_supervisor();
1150 let controller = RegionFailureDetectorControl::new(sender.clone());
1151 tokio::spawn(async move { supervisor.run().await });
1152 let detecting_region = (1, RegionId::new(1, 1));
1153 controller
1154 .register_failure_detectors(vec![detecting_region])
1155 .await;
1156
1157 let (tx, rx) = oneshot::channel();
1158 sender.send(Event::Dump(tx)).await.unwrap();
1159 let detector = rx.await.unwrap();
1160 let region_detector = detector.region_failure_detector(detecting_region).clone();
1161
1162 controller
1164 .register_failure_detectors(vec![detecting_region])
1165 .await;
1166 let (tx, rx) = oneshot::channel();
1167 sender.send(Event::Dump(tx)).await.unwrap();
1168 let detector = rx.await.unwrap();
1169 let got = detector.region_failure_detector(detecting_region).clone();
1170 assert_eq!(region_detector, got);
1171
1172 controller
1173 .deregister_failure_detectors(vec![detecting_region])
1174 .await;
1175 let (tx, rx) = oneshot::channel();
1176 sender.send(Event::Dump(tx)).await.unwrap();
1177 assert!(rx.await.unwrap().is_empty());
1178 }
1179
1180 #[tokio::test]
1181 async fn test_handle_submit_region_migration_task_result_migrated() {
1182 common_telemetry::init_default_ut_logging();
1183 let (mut supervisor, _) = new_test_supervisor();
1184 let region_id = RegionId::new(1, 1);
1185 let detecting_region = (1, region_id);
1186 supervisor
1187 .register_failure_detectors(vec![detecting_region])
1188 .await;
1189 supervisor.failover_counts.insert(detecting_region, 1);
1190 let result = SubmitRegionMigrationTaskResult {
1191 migrated: vec![region_id],
1192 ..Default::default()
1193 };
1194 supervisor
1195 .handle_submit_region_migration_task_result(
1196 1,
1197 2,
1198 Duration::from_millis(1000),
1199 RegionMigrationTriggerReason::Manual,
1200 result,
1201 )
1202 .await
1203 .unwrap();
1204 assert!(!supervisor.failure_detector.contains(&detecting_region));
1205 assert!(supervisor.failover_counts.is_empty());
1206 }
1207
1208 #[tokio::test]
1209 async fn test_handle_submit_region_migration_task_result_migrating() {
1210 common_telemetry::init_default_ut_logging();
1211 let (mut supervisor, _) = new_test_supervisor();
1212 let region_id = RegionId::new(1, 1);
1213 let detecting_region = (1, region_id);
1214 supervisor
1215 .register_failure_detectors(vec![detecting_region])
1216 .await;
1217 supervisor.failover_counts.insert(detecting_region, 1);
1218 let result = SubmitRegionMigrationTaskResult {
1219 migrating: vec![region_id],
1220 ..Default::default()
1221 };
1222 supervisor
1223 .handle_submit_region_migration_task_result(
1224 1,
1225 2,
1226 Duration::from_millis(1000),
1227 RegionMigrationTriggerReason::Manual,
1228 result,
1229 )
1230 .await
1231 .unwrap();
1232 assert!(supervisor.failure_detector.contains(&detecting_region));
1233 assert!(supervisor.failover_counts.contains_key(&detecting_region));
1234 }
1235
1236 #[tokio::test]
1237 async fn test_handle_submit_region_migration_task_result_table_not_found() {
1238 common_telemetry::init_default_ut_logging();
1239 let (mut supervisor, _) = new_test_supervisor();
1240 let region_id = RegionId::new(1, 1);
1241 let detecting_region = (1, region_id);
1242 supervisor
1243 .register_failure_detectors(vec![detecting_region])
1244 .await;
1245 supervisor.failover_counts.insert(detecting_region, 1);
1246 let result = SubmitRegionMigrationTaskResult {
1247 table_not_found: vec![region_id],
1248 ..Default::default()
1249 };
1250 supervisor
1251 .handle_submit_region_migration_task_result(
1252 1,
1253 2,
1254 Duration::from_millis(1000),
1255 RegionMigrationTriggerReason::Manual,
1256 result,
1257 )
1258 .await
1259 .unwrap();
1260 assert!(!supervisor.failure_detector.contains(&detecting_region));
1261 assert!(supervisor.failover_counts.is_empty());
1262 }
1263
1264 #[tokio::test]
1265 async fn test_handle_submit_region_migration_task_result_region_not_found() {
1266 common_telemetry::init_default_ut_logging();
1267 let (mut supervisor, _) = new_test_supervisor();
1268 let region_id = RegionId::new(1, 1);
1269 let detecting_region = (1, region_id);
1270 supervisor
1271 .register_failure_detectors(vec![detecting_region])
1272 .await;
1273 supervisor.failover_counts.insert(detecting_region, 1);
1274 let result = SubmitRegionMigrationTaskResult {
1275 region_not_found: vec![region_id],
1276 ..Default::default()
1277 };
1278 supervisor
1279 .handle_submit_region_migration_task_result(
1280 1,
1281 2,
1282 Duration::from_millis(1000),
1283 RegionMigrationTriggerReason::Manual,
1284 result,
1285 )
1286 .await
1287 .unwrap();
1288 assert!(!supervisor.failure_detector.contains(&detecting_region));
1289 assert!(supervisor.failover_counts.is_empty());
1290 }
1291
1292 #[tokio::test]
1293 async fn test_handle_submit_region_migration_task_result_leader_changed() {
1294 common_telemetry::init_default_ut_logging();
1295 let (mut supervisor, _) = new_test_supervisor();
1296 let region_id = RegionId::new(1, 1);
1297 let detecting_region = (1, region_id);
1298 supervisor
1299 .register_failure_detectors(vec![detecting_region])
1300 .await;
1301 supervisor.failover_counts.insert(detecting_region, 1);
1302 let result = SubmitRegionMigrationTaskResult {
1303 leader_changed: vec![region_id],
1304 ..Default::default()
1305 };
1306 supervisor
1307 .handle_submit_region_migration_task_result(
1308 1,
1309 2,
1310 Duration::from_millis(1000),
1311 RegionMigrationTriggerReason::Manual,
1312 result,
1313 )
1314 .await
1315 .unwrap();
1316 assert!(!supervisor.failure_detector.contains(&detecting_region));
1317 assert!(supervisor.failover_counts.is_empty());
1318 }
1319
1320 #[tokio::test]
1321 async fn test_handle_submit_region_migration_task_result_peer_conflict() {
1322 common_telemetry::init_default_ut_logging();
1323 let (mut supervisor, _) = new_test_supervisor();
1324 let region_id = RegionId::new(1, 1);
1325 let detecting_region = (1, region_id);
1326 supervisor
1327 .register_failure_detectors(vec![detecting_region])
1328 .await;
1329 supervisor.failover_counts.insert(detecting_region, 1);
1330 let result = SubmitRegionMigrationTaskResult {
1331 peer_conflict: vec![region_id],
1332 ..Default::default()
1333 };
1334 supervisor
1335 .handle_submit_region_migration_task_result(
1336 1,
1337 2,
1338 Duration::from_millis(1000),
1339 RegionMigrationTriggerReason::Manual,
1340 result,
1341 )
1342 .await
1343 .unwrap();
1344 assert!(supervisor.failure_detector.contains(&detecting_region));
1345 assert!(supervisor.failover_counts.contains_key(&detecting_region));
1346 }
1347
1348 #[tokio::test]
1349 async fn test_handle_submit_region_migration_task_result_submitted() {
1350 common_telemetry::init_default_ut_logging();
1351 let (mut supervisor, _) = new_test_supervisor();
1352 let region_id = RegionId::new(1, 1);
1353 let detecting_region = (1, region_id);
1354 supervisor
1355 .register_failure_detectors(vec![detecting_region])
1356 .await;
1357 supervisor.failover_counts.insert(detecting_region, 1);
1358 let result = SubmitRegionMigrationTaskResult {
1359 submitted: vec![region_id],
1360 ..Default::default()
1361 };
1362 supervisor
1363 .handle_submit_region_migration_task_result(
1364 1,
1365 2,
1366 Duration::from_millis(1000),
1367 RegionMigrationTriggerReason::Manual,
1368 result,
1369 )
1370 .await
1371 .unwrap();
1372 assert!(supervisor.failure_detector.contains(&detecting_region));
1373 assert!(supervisor.failover_counts.contains_key(&detecting_region));
1374 }
1375}