meta_srv/region/
failure_detector.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::DerefMut;
16
17use common_meta::ddl::DetectingRegion;
18use dashmap::mapref::multiple::RefMulti;
19use dashmap::DashMap;
20
21use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions};
22
23/// Detects the region failures.
24pub(crate) struct RegionFailureDetector {
25    options: PhiAccrualFailureDetectorOptions,
26    detectors: DashMap<DetectingRegion, PhiAccrualFailureDetector>,
27}
28
29pub(crate) struct FailureDetectorEntry<'a> {
30    e: RefMulti<'a, DetectingRegion, PhiAccrualFailureDetector>,
31}
32
33impl FailureDetectorEntry<'_> {
34    pub(crate) fn region_ident(&self) -> &DetectingRegion {
35        self.e.key()
36    }
37
38    pub(crate) fn failure_detector(&self) -> &PhiAccrualFailureDetector {
39        self.e.value()
40    }
41}
42
43impl RegionFailureDetector {
44    pub(crate) fn new(options: PhiAccrualFailureDetectorOptions) -> Self {
45        Self {
46            options,
47            detectors: DashMap::new(),
48        }
49    }
50
51    /// Returns [`PhiAccrualFailureDetector`] of the specific [`DetectingRegion`].
52    pub(crate) fn region_failure_detector(
53        &self,
54        detecting_region: DetectingRegion,
55    ) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
56        self.detectors
57            .entry(detecting_region)
58            .or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options))
59    }
60
61    /// Returns A mutable reference to the [`PhiAccrualFailureDetector`] for the specified [`DetectingRegion`].
62    /// If a detector already exists for the region, it is returned. Otherwise, a new
63    /// detector is created and initialized with the provided timestamp.
64    pub(crate) fn maybe_init_region_failure_detector(
65        &self,
66        detecting_region: DetectingRegion,
67        ts_millis: i64,
68    ) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
69        self.detectors.entry(detecting_region).or_insert_with(|| {
70            let mut detector = PhiAccrualFailureDetector::from_options(self.options);
71            detector.heartbeat(ts_millis);
72            detector
73        })
74    }
75
76    /// Returns a [FailureDetectorEntry] iterator.
77    pub(crate) fn iter(&self) -> impl Iterator<Item = FailureDetectorEntry> + '_ {
78        self.detectors
79            .iter()
80            .map(move |e| FailureDetectorEntry { e })
81    }
82
83    /// Removes the specific [PhiAccrualFailureDetector] if exists.
84    pub(crate) fn remove(&self, region: &DetectingRegion) {
85        self.detectors.remove(region);
86    }
87
88    /// Removes all [PhiAccrualFailureDetector]s.
89    pub(crate) fn clear(&self) {
90        self.detectors.clear()
91    }
92
93    /// Returns true if the specific [`DetectingRegion`] exists.
94    #[cfg(test)]
95    pub(crate) fn contains(&self, region: &DetectingRegion) -> bool {
96        self.detectors.contains_key(region)
97    }
98
99    /// Returns the length
100    #[cfg(test)]
101    pub(crate) fn len(&self) -> usize {
102        self.detectors.len()
103    }
104
105    /// Returns true if it's empty
106    #[cfg(test)]
107    pub(crate) fn is_empty(&self) -> bool {
108        self.detectors.is_empty()
109    }
110
111    #[cfg(test)]
112    pub(crate) fn dump(&self) -> RegionFailureDetector {
113        let mut m = DashMap::with_capacity(self.detectors.len());
114        m.extend(self.detectors.iter().map(|x| (*x.key(), x.value().clone())));
115        Self {
116            detectors: m,
117            options: self.options,
118        }
119    }
120}
121
#[cfg(test)]
mod tests {
    use store_api::storage::RegionId;

    use super::*;

    /// A container built from default options registers a region on first lookup,
    /// exposes exactly that entry through `iter`, and is empty again after `clear`.
    #[test]
    fn test_default_failure_detector_container() {
        let detectors = RegionFailureDetector::new(Default::default());
        let region = (2, RegionId::new(1, 1));

        // Release the returned guard right away, before querying the container again.
        drop(detectors.region_failure_detector(region));
        assert!(detectors.contains(&region));

        // Exactly one entry must be visible through the iterator.
        assert_eq!(detectors.iter().count(), 1);

        detectors.clear();
        assert!(detectors.is_empty());
    }
}