// meta_srv/region/failure_detector.rs

use std::ops::DerefMut;

use common_meta::ddl::DetectingRegion;
use dashmap::DashMap;
use dashmap::mapref::multiple::RefMulti;

use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions};

23pub(crate) struct RegionFailureDetector {
25 options: PhiAccrualFailureDetectorOptions,
26 detectors: DashMap<DetectingRegion, PhiAccrualFailureDetector>,
27}
28
29pub(crate) struct FailureDetectorEntry<'a> {
30 e: RefMulti<'a, DetectingRegion, PhiAccrualFailureDetector>,
31}
32
33impl FailureDetectorEntry<'_> {
34 pub(crate) fn region_ident(&self) -> &DetectingRegion {
35 self.e.key()
36 }
37
38 pub(crate) fn failure_detector(&self) -> &PhiAccrualFailureDetector {
39 self.e.value()
40 }
41}
42
43impl RegionFailureDetector {
44 pub(crate) fn new(options: PhiAccrualFailureDetectorOptions) -> Self {
45 Self {
46 options,
47 detectors: DashMap::new(),
48 }
49 }
50
51 pub(crate) fn region_failure_detector(
53 &self,
54 detecting_region: DetectingRegion,
55 ) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
56 self.detectors
57 .entry(detecting_region)
58 .or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options))
59 }
60
61 pub(crate) fn maybe_init_region_failure_detector(
65 &self,
66 detecting_region: DetectingRegion,
67 ts_millis: i64,
68 ) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
69 self.detectors.entry(detecting_region).or_insert_with(|| {
70 let mut detector = PhiAccrualFailureDetector::from_options(self.options);
71 detector.heartbeat(ts_millis);
72 detector
73 })
74 }
75
76 pub(crate) fn reset_region_failure_detector(
80 &self,
81 detecting_region: DetectingRegion,
82 ts_millis: i64,
83 ) {
84 let mut detector = PhiAccrualFailureDetector::from_options(self.options);
85 detector.heartbeat(ts_millis);
86 self.detectors.insert(detecting_region, detector);
87 }
88
89 pub(crate) fn iter(&self) -> impl Iterator<Item = FailureDetectorEntry<'_>> + '_ {
91 self.detectors
92 .iter()
93 .map(move |e| FailureDetectorEntry { e })
94 }
95
96 pub(crate) fn remove(&self, region: &DetectingRegion) {
98 self.detectors.remove(region);
99 }
100
101 pub(crate) fn clear(&self) {
103 self.detectors.clear()
104 }
105
106 #[cfg(test)]
108 pub(crate) fn contains(&self, region: &DetectingRegion) -> bool {
109 self.detectors.contains_key(region)
110 }
111
112 #[cfg(test)]
114 pub(crate) fn len(&self) -> usize {
115 self.detectors.len()
116 }
117
118 #[cfg(test)]
120 pub(crate) fn is_empty(&self) -> bool {
121 self.detectors.is_empty()
122 }
123
124 #[cfg(test)]
125 pub(crate) fn dump(&self) -> RegionFailureDetector {
126 let mut m = DashMap::with_capacity(self.detectors.len());
127 m.extend(self.detectors.iter().map(|x| (*x.key(), x.value().clone())));
128 Self {
129 detectors: m,
130 options: self.options,
131 }
132 }
133}
134
#[cfg(test)]
mod tests {

    use store_api::storage::RegionId;

    use super::*;

    /// Looking up an unknown region creates its detector; `clear` empties the
    /// container again.
    #[test]
    fn test_default_failure_detector_container() {
        let container = RegionFailureDetector::new(Default::default());
        let detecting_region = (2, RegionId::new(1, 1));
        // `let _ =` drops the returned guard immediately, before the container
        // is queried again.
        let _ = container.region_failure_detector(detecting_region);
        assert!(container.contains(&detecting_region));

        {
            // Scope the iterator so its borrow of the map ends before `clear`.
            let mut iter = container.iter();
            let _ = iter.next().unwrap();
            assert!(iter.next().is_none());
        }

        container.clear();
        assert!(container.is_empty());
    }

    /// Resetting replaces an existing detector and creates a missing one; in
    /// both cases the detector's last heartbeat is the reset timestamp.
    #[test]
    fn test_reset_region_failure_detector() {
        let container = RegionFailureDetector::new(Default::default());
        let detecting_region = (2, RegionId::new(1, 1));
        let first_heartbeat = 1_000;

        {
            let mut detector = container.region_failure_detector(detecting_region);
            detector.heartbeat(first_heartbeat);
            assert_eq!(Some(first_heartbeat), detector.last_heartbeat_millis());
        }

        let reset_at = 30_000;
        container.reset_region_failure_detector(detecting_region, reset_at);

        {
            // The earlier heartbeat is gone: only the reset timestamp remains.
            let detector = container.region_failure_detector(detecting_region);
            assert_eq!(Some(reset_at), detector.last_heartbeat_millis());
        }

        let new_detecting_region = (3, RegionId::new(1, 1));
        assert!(!container.contains(&new_detecting_region));

        container.reset_region_failure_detector(new_detecting_region, reset_at);

        assert!(container.contains(&new_detecting_region));
        {
            let detector = container.region_failure_detector(new_detecting_region);
            assert_eq!(Some(reset_at), detector.last_heartbeat_millis());
        }
    }
}