meta_srv/region/
failure_detector.rs1use std::ops::DerefMut;
16
17use common_meta::ddl::DetectingRegion;
18use dashmap::mapref::multiple::RefMulti;
19use dashmap::DashMap;
20
21use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions};
22
23pub(crate) struct RegionFailureDetector {
25 options: PhiAccrualFailureDetectorOptions,
26 detectors: DashMap<DetectingRegion, PhiAccrualFailureDetector>,
27}
28
29pub(crate) struct FailureDetectorEntry<'a> {
30 e: RefMulti<'a, DetectingRegion, PhiAccrualFailureDetector>,
31}
32
33impl FailureDetectorEntry<'_> {
34 pub(crate) fn region_ident(&self) -> &DetectingRegion {
35 self.e.key()
36 }
37
38 pub(crate) fn failure_detector(&self) -> &PhiAccrualFailureDetector {
39 self.e.value()
40 }
41}
42
43impl RegionFailureDetector {
44 pub(crate) fn new(options: PhiAccrualFailureDetectorOptions) -> Self {
45 Self {
46 options,
47 detectors: DashMap::new(),
48 }
49 }
50
51 pub(crate) fn region_failure_detector(
53 &self,
54 detecting_region: DetectingRegion,
55 ) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
56 self.detectors
57 .entry(detecting_region)
58 .or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options))
59 }
60
61 pub(crate) fn maybe_init_region_failure_detector(
65 &self,
66 detecting_region: DetectingRegion,
67 ts_millis: i64,
68 ) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
69 self.detectors.entry(detecting_region).or_insert_with(|| {
70 let mut detector = PhiAccrualFailureDetector::from_options(self.options);
71 detector.heartbeat(ts_millis);
72 detector
73 })
74 }
75
76 pub(crate) fn iter(&self) -> impl Iterator<Item = FailureDetectorEntry> + '_ {
78 self.detectors
79 .iter()
80 .map(move |e| FailureDetectorEntry { e })
81 }
82
83 pub(crate) fn remove(&self, region: &DetectingRegion) {
85 self.detectors.remove(region);
86 }
87
88 pub(crate) fn clear(&self) {
90 self.detectors.clear()
91 }
92
93 #[cfg(test)]
95 pub(crate) fn contains(&self, region: &DetectingRegion) -> bool {
96 self.detectors.contains_key(region)
97 }
98
99 #[cfg(test)]
101 pub(crate) fn len(&self) -> usize {
102 self.detectors.len()
103 }
104
105 #[cfg(test)]
107 pub(crate) fn is_empty(&self) -> bool {
108 self.detectors.is_empty()
109 }
110
111 #[cfg(test)]
112 pub(crate) fn dump(&self) -> RegionFailureDetector {
113 let mut m = DashMap::with_capacity(self.detectors.len());
114 m.extend(self.detectors.iter().map(|x| (*x.key(), x.value().clone())));
115 Self {
116 detectors: m,
117 options: self.options,
118 }
119 }
120}
121
122#[cfg(test)]
123mod tests {
124
125 use store_api::storage::RegionId;
126
127 use super::*;
128
129 #[test]
130 fn test_default_failure_detector_container() {
131 let container = RegionFailureDetector::new(Default::default());
132 let detecting_region = (2, RegionId::new(1, 1));
133 let _ = container.region_failure_detector(detecting_region);
134 assert!(container.contains(&detecting_region));
135
136 {
137 let mut iter = container.iter();
138 let _ = iter.next().unwrap();
139 assert!(iter.next().is_none());
140 }
141
142 container.clear();
143 assert!(container.is_empty());
144 }
145}