Skip to main content

meta_srv/region/failure_detector.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::DerefMut;
16
17use common_meta::ddl::DetectingRegion;
18use dashmap::DashMap;
19use dashmap::mapref::multiple::RefMulti;
20
21use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions};
22
23/// Detects the region failures.
24pub(crate) struct RegionFailureDetector {
25    options: PhiAccrualFailureDetectorOptions,
26    detectors: DashMap<DetectingRegion, PhiAccrualFailureDetector>,
27}
28
29pub(crate) struct FailureDetectorEntry<'a> {
30    e: RefMulti<'a, DetectingRegion, PhiAccrualFailureDetector>,
31}
32
33impl FailureDetectorEntry<'_> {
34    pub(crate) fn region_ident(&self) -> &DetectingRegion {
35        self.e.key()
36    }
37
38    pub(crate) fn failure_detector(&self) -> &PhiAccrualFailureDetector {
39        self.e.value()
40    }
41}
42
43impl RegionFailureDetector {
44    pub(crate) fn new(options: PhiAccrualFailureDetectorOptions) -> Self {
45        Self {
46            options,
47            detectors: DashMap::new(),
48        }
49    }
50
51    /// Returns [`PhiAccrualFailureDetector`] of the specific [`DetectingRegion`].
52    pub(crate) fn region_failure_detector(
53        &self,
54        detecting_region: DetectingRegion,
55    ) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
56        self.detectors
57            .entry(detecting_region)
58            .or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options))
59    }
60
61    /// Returns A mutable reference to the [`PhiAccrualFailureDetector`] for the specified [`DetectingRegion`].
62    /// If a detector already exists for the region, it is returned. Otherwise, a new
63    /// detector is created and initialized with the provided timestamp.
64    pub(crate) fn maybe_init_region_failure_detector(
65        &self,
66        detecting_region: DetectingRegion,
67        ts_millis: i64,
68    ) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
69        self.detectors.entry(detecting_region).or_insert_with(|| {
70            let mut detector = PhiAccrualFailureDetector::from_options(self.options);
71            detector.heartbeat(ts_millis);
72            detector
73        })
74    }
75
76    /// Resets the [`PhiAccrualFailureDetector`] for the specified [`DetectingRegion`] with the provided timestamp.
77    /// If a detector already exists for the region, it is reset. Otherwise, a new
78    /// detector is created and initialized with the provided timestamp.
79    pub(crate) fn reset_region_failure_detector(
80        &self,
81        detecting_region: DetectingRegion,
82        ts_millis: i64,
83    ) {
84        let mut detector = PhiAccrualFailureDetector::from_options(self.options);
85        detector.heartbeat(ts_millis);
86        self.detectors.insert(detecting_region, detector);
87    }
88
89    /// Returns a [FailureDetectorEntry] iterator.
90    pub(crate) fn iter(&self) -> impl Iterator<Item = FailureDetectorEntry<'_>> + '_ {
91        self.detectors
92            .iter()
93            .map(move |e| FailureDetectorEntry { e })
94    }
95
96    /// Removes the specific [PhiAccrualFailureDetector] if exists.
97    pub(crate) fn remove(&self, region: &DetectingRegion) {
98        self.detectors.remove(region);
99    }
100
101    /// Removes all [PhiAccrualFailureDetector]s.
102    pub(crate) fn clear(&self) {
103        self.detectors.clear()
104    }
105
106    /// Returns true if the specific [`DetectingRegion`] exists.
107    #[cfg(test)]
108    pub(crate) fn contains(&self, region: &DetectingRegion) -> bool {
109        self.detectors.contains_key(region)
110    }
111
112    /// Returns the length
113    #[cfg(test)]
114    pub(crate) fn len(&self) -> usize {
115        self.detectors.len()
116    }
117
118    /// Returns true if it's empty
119    #[cfg(test)]
120    pub(crate) fn is_empty(&self) -> bool {
121        self.detectors.is_empty()
122    }
123
124    #[cfg(test)]
125    pub(crate) fn dump(&self) -> RegionFailureDetector {
126        let mut m = DashMap::with_capacity(self.detectors.len());
127        m.extend(self.detectors.iter().map(|x| (*x.key(), x.value().clone())));
128        Self {
129            detectors: m,
130            options: self.options,
131        }
132    }
133}
134
#[cfg(test)]
mod tests {
    use store_api::storage::RegionId;

    use super::*;

    #[test]
    fn test_default_failure_detector_container() {
        let detectors = RegionFailureDetector::new(Default::default());
        let region = (2, RegionId::new(1, 1));

        // Looking the region up lazily creates its detector; the guard is released at once.
        drop(detectors.region_failure_detector(region));
        assert!(detectors.contains(&region));

        // Exactly one entry is visible through the iterator.
        assert_eq!(detectors.iter().count(), 1);

        detectors.clear();
        assert!(detectors.is_empty());
    }

    #[test]
    fn test_reset_region_failure_detector() {
        let detectors = RegionFailureDetector::new(Default::default());
        let region = (2, RegionId::new(1, 1));
        let first_heartbeat = 1_000;
        let reset_at = 30_000;

        {
            let mut detector = detectors.region_failure_detector(region);
            detector.heartbeat(first_heartbeat);
            assert_eq!(Some(first_heartbeat), detector.last_heartbeat_millis());
        }

        // Resetting a known region discards its history.
        detectors.reset_region_failure_detector(region, reset_at);
        assert_eq!(
            Some(reset_at),
            detectors.region_failure_detector(region).last_heartbeat_millis()
        );

        // Resetting an unknown region inserts a detector seeded with the timestamp.
        let unseen_region = (3, RegionId::new(1, 1));
        assert!(!detectors.contains(&unseen_region));

        detectors.reset_region_failure_detector(unseen_region, reset_at);

        assert!(detectors.contains(&unseen_region));
        assert_eq!(
            Some(reset_at),
            detectors.region_failure_detector(unseen_region).last_heartbeat_millis()
        );
    }
}