meta_srv/region/
failure_detector.rsuse std::ops::DerefMut;
use common_meta::ddl::DetectingRegion;
use dashmap::mapref::multiple::RefMulti;
use dashmap::DashMap;
use crate::failure_detector::{PhiAccrualFailureDetector, PhiAccrualFailureDetectorOptions};
pub(crate) struct RegionFailureDetector {
options: PhiAccrualFailureDetectorOptions,
detectors: DashMap<DetectingRegion, PhiAccrualFailureDetector>,
}
pub(crate) struct FailureDetectorEntry<'a> {
e: RefMulti<'a, DetectingRegion, PhiAccrualFailureDetector>,
}
impl FailureDetectorEntry<'_> {
pub(crate) fn region_ident(&self) -> &DetectingRegion {
self.e.key()
}
pub(crate) fn failure_detector(&self) -> &PhiAccrualFailureDetector {
self.e.value()
}
}
impl RegionFailureDetector {
pub(crate) fn new(options: PhiAccrualFailureDetectorOptions) -> Self {
Self {
options,
detectors: DashMap::new(),
}
}
pub(crate) fn region_failure_detector(
&self,
detecting_region: DetectingRegion,
) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
self.detectors
.entry(detecting_region)
.or_insert_with(|| PhiAccrualFailureDetector::from_options(self.options))
}
pub(crate) fn maybe_init_region_failure_detector(
&self,
detecting_region: DetectingRegion,
ts_millis: i64,
) -> impl DerefMut<Target = PhiAccrualFailureDetector> + '_ {
self.detectors.entry(detecting_region).or_insert_with(|| {
let mut detector = PhiAccrualFailureDetector::from_options(self.options);
detector.heartbeat(ts_millis);
detector
})
}
pub(crate) fn iter(&self) -> impl Iterator<Item = FailureDetectorEntry> + '_ {
self.detectors
.iter()
.map(move |e| FailureDetectorEntry { e })
}
pub(crate) fn remove(&self, region: &DetectingRegion) {
self.detectors.remove(region);
}
pub(crate) fn clear(&self) {
self.detectors.clear()
}
#[cfg(test)]
pub(crate) fn contains(&self, region: &DetectingRegion) -> bool {
self.detectors.contains_key(region)
}
#[cfg(test)]
pub(crate) fn len(&self) -> usize {
self.detectors.len()
}
#[cfg(test)]
pub(crate) fn is_empty(&self) -> bool {
self.detectors.is_empty()
}
#[cfg(test)]
pub(crate) fn dump(&self) -> RegionFailureDetector {
let mut m = DashMap::with_capacity(self.detectors.len());
m.extend(self.detectors.iter().map(|x| (*x.key(), x.value().clone())));
Self {
detectors: m,
options: self.options,
}
}
}
#[cfg(test)]
mod tests {
use store_api::storage::RegionId;
use super::*;
#[test]
fn test_default_failure_detector_container() {
let container = RegionFailureDetector::new(Default::default());
let detecting_region = (2, RegionId::new(1, 1));
let _ = container.region_failure_detector(detecting_region);
assert!(container.contains(&detecting_region));
{
let mut iter = container.iter();
let _ = iter.next().unwrap();
assert!(iter.next().is_none());
}
container.clear();
assert!(container.is_empty());
}
}