Skip to main content

meta_srv/
failure_detector.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::time::Duration;
17
18use serde::{Deserialize, Serialize};
19
/// Assumed heartbeat inter-arrival time, in milliseconds, used to seed the heartbeat history
/// when the very first heartbeat is recorded and no real interval has been observed yet.
const FIRST_HEARTBEAT_ESTIMATE_MILLIS: i64 = 1_000;
21
22/// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)"
23/// under Apache License 2.0.
24///
25/// You can find it's document here:
26/// <https://doc.akka.io/docs/akka/2.6.21/typed/failure-detector.html>
27///
28/// Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their
29/// paper: <https://oneofus.la/have-emacs-will-hack/files/HDY04.pdf>
30///
31/// The suspicion level of failure is given by a value called φ (phi).
32/// The basic idea of the φ failure detector is to express the value of φ on a scale that
33/// is dynamically adjusted to reflect current network conditions. A configurable
34/// threshold is used to decide if φ is considered to be a failure.
35///
36/// The value of φ is calculated as:
37///
38/// φ = -log10(1 - F(timeSinceLastHeartbeat)
39///
40/// where F is the cumulative distribution function of a normal distribution with mean
41/// and standard deviation estimated from historical heartbeat inter-arrival times.
42#[cfg_attr(test, derive(Debug, Clone, PartialEq))]
43pub(crate) struct PhiAccrualFailureDetector {
44    /// A low threshold is prone to generate many wrong suspicions but ensures a quick detection
45    /// in the event of a real crash. Conversely, a high threshold generates fewer mistakes but
46    /// needs more time to detect actual crashes.
47    threshold: f32,
48
49    /// Minimum standard deviation to use for the normal distribution used when calculating phi.
50    /// Too low standard deviation might result in too much sensitivity for sudden, but normal,
51    /// deviations in heartbeat inter arrival times.
52    min_std_deviation_millis: f32,
53
54    /// Duration corresponding to number of potentially lost/delayed heartbeats that will be
55    /// accepted before considering it to be an anomaly.
56    /// This margin is important to be able to survive sudden, occasional, pauses in heartbeat
57    /// arrivals, due to for example network drop.
58    acceptable_heartbeat_pause_millis: u32,
59
60    heartbeat_history: HeartbeatHistory,
61    last_heartbeat_millis: Option<i64>,
62}
63
64#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
65#[serde(default)]
66pub struct PhiAccrualFailureDetectorOptions {
67    pub threshold: f32,
68    #[serde(with = "humantime_serde")]
69    pub min_std_deviation: Duration,
70    #[serde(with = "humantime_serde")]
71    pub acceptable_heartbeat_pause: Duration,
72}
73
74impl Default for PhiAccrualFailureDetectorOptions {
75    fn default() -> Self {
76        // default configuration is the same as of Akka:
77        // https://github.com/akka/akka/blob/v2.6.21/akka-cluster/src/main/resources/reference.conf#L181
78        Self {
79            threshold: 8_f32,
80            min_std_deviation: Duration::from_millis(100),
81            acceptable_heartbeat_pause: Duration::from_secs(10),
82        }
83    }
84}
85
86impl Default for PhiAccrualFailureDetector {
87    fn default() -> Self {
88        Self::from_options(Default::default())
89    }
90}
91
92impl PhiAccrualFailureDetector {
93    pub(crate) fn from_options(options: PhiAccrualFailureDetectorOptions) -> Self {
94        Self {
95            threshold: options.threshold,
96            min_std_deviation_millis: options.min_std_deviation.as_millis() as f32,
97            acceptable_heartbeat_pause_millis: options.acceptable_heartbeat_pause.as_millis()
98                as u32,
99            heartbeat_history: HeartbeatHistory::new(1000),
100            last_heartbeat_millis: None,
101        }
102    }
103
104    pub(crate) fn heartbeat(&mut self, ts_millis: i64) {
105        if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
106            if ts_millis < last_heartbeat_millis {
107                return;
108            }
109
110            if self.is_available(ts_millis) {
111                let interval = ts_millis - last_heartbeat_millis;
112                self.heartbeat_history.add(interval)
113            }
114        } else {
115            // guess statistics for first heartbeat,
116            // important so that connections with only one heartbeat becomes unavailable
117            // bootstrap with 2 entries with rather high standard deviation
118            let std_deviation = FIRST_HEARTBEAT_ESTIMATE_MILLIS / 4;
119            self.heartbeat_history
120                .add((FIRST_HEARTBEAT_ESTIMATE_MILLIS - std_deviation) as _);
121            self.heartbeat_history
122                .add((FIRST_HEARTBEAT_ESTIMATE_MILLIS + std_deviation) as _);
123        }
124        let _ = self.last_heartbeat_millis.insert(ts_millis);
125    }
126
127    pub(crate) fn is_available(&self, ts_millis: i64) -> bool {
128        self.phi(ts_millis) < self.threshold as _
129    }
130
131    /// The suspicion level of the accrual failure detector.
132    ///
133    /// If a connection does not have any records in failure detector then it is considered healthy.
134    pub(crate) fn phi(&self, ts_millis: i64) -> f64 {
135        if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
136            let time_diff = ts_millis - last_heartbeat_millis;
137            let mean = self.heartbeat_history.mean();
138            let std_deviation = self
139                .heartbeat_history
140                .std_deviation()
141                .max(self.min_std_deviation_millis as _);
142
143            phi(
144                time_diff,
145                mean + self.acceptable_heartbeat_pause_millis as f64,
146                std_deviation,
147            )
148        } else {
149            // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
150            0.0
151        }
152    }
153
154    #[cfg(test)]
155    pub(crate) fn threshold(&self) -> f32 {
156        self.threshold
157    }
158
159    #[cfg(test)]
160    pub(crate) fn acceptable_heartbeat_pause_millis(&self) -> u32 {
161        self.acceptable_heartbeat_pause_millis
162    }
163
164    #[cfg(test)]
165    pub(crate) fn last_heartbeat_millis(&self) -> Option<i64> {
166        self.last_heartbeat_millis
167    }
168}
169
/// Computes φ for an elapsed time of `time_diff`, assuming heartbeat inter-arrival times follow
/// a normal distribution N(`mean`, `std_deviation`).
///
/// The cumulative distribution function is approximated by the logistic formula
/// `1.0 / (1.0 + exp(-y * (1.5976 + 0.070566 * y * y)))` with
/// `y = (x - mean) / std_deviation`, as defined in β Mathematics Handbook (Logistic
/// approximation); its error is 0.00014 at +- 3.16. The returned value is equivalent to
/// `-log10(1 - CDF(y))`.
///
/// As a rule of thumb, phi = 1 means the likeliness of making a mistake is about 10%; it is
/// about 1% with phi = 2, 0.1% with phi = 3 and so on.
///
/// # Panics
///
/// Panics if `std_deviation` equals `0.0`.
fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
    assert_ne!(std_deviation, 0.0);

    let elapsed = time_diff as f64;
    let y = (elapsed - mean) / std_deviation;
    let e = (-y * (1.5976 + 0.070566 * y * y)).exp();

    // Both arms compute `1 - CDF(y)`; they are two algebraic forms of the same quantity, one
    // used above the mean and the other at or below it.
    let tail_probability = if elapsed > mean {
        e / (1.0 + e)
    } else {
        1.0 - 1.0 / (1.0 + e)
    };
    -tail_probability.log10()
}
192
/// Sliding window of heartbeat inter-arrival times together with running sums that allow the
/// statistics to be computed without iterating over the samples.
///
/// The window holds at most `max_sample_size` samples; once it is full, adding a sample evicts
/// the oldest one.
///
/// The stats (mean, variance, std_deviation) are not defined for an empty history.
#[derive(Debug, Clone, PartialEq)]
struct HeartbeatHistory {
    /// Maximum number of samples used to compute the mean and standard deviation of the
    /// inter-arrival times.
    max_sample_size: u32,

    /// The retained intervals, oldest first.
    intervals: VecDeque<i64>,
    /// Sum of all retained intervals.
    interval_sum: i64,
    /// Sum of the squares of all retained intervals.
    squared_interval_sum: i64,
}

impl HeartbeatHistory {
    /// Creates an empty history that retains up to `max_sample_size` intervals.
    fn new(max_sample_size: u32) -> Self {
        let intervals = VecDeque::with_capacity(max_sample_size as usize);
        Self {
            max_sample_size,
            intervals,
            interval_sum: 0,
            squared_interval_sum: 0,
        }
    }

    /// Number of retained samples, as a float for use in the statistics below.
    fn sample_count(&self) -> f64 {
        self.intervals.len() as f64
    }

    /// Arithmetic mean of the retained intervals.
    fn mean(&self) -> f64 {
        self.interval_sum as f64 / self.sample_count()
    }

    /// Population variance of the retained intervals, computed as `E[x^2] - E[x]^2`.
    fn variance(&self) -> f64 {
        let mean = self.mean();
        let mean_of_squares = self.squared_interval_sum as f64 / self.sample_count();
        mean_of_squares - mean * mean
    }

    /// Population standard deviation of the retained intervals.
    fn std_deviation(&self) -> f64 {
        let variance = self.variance();
        variance.sqrt()
    }

    /// Appends `interval`, evicting the oldest sample first when the window is already full.
    fn add(&mut self, interval: i64) {
        let is_full = self.intervals.len() as u32 >= self.max_sample_size;
        if is_full {
            self.drop_oldest();
        }

        self.interval_sum += interval;
        self.squared_interval_sum += interval * interval;
        self.intervals.push_back(interval);
    }

    /// Removes the oldest sample and subtracts its contribution from the running sums.
    ///
    /// Panics if the history is empty.
    fn drop_oldest(&mut self) {
        let evicted = self
            .intervals
            .pop_front()
            .expect("intervals must not be empty here");
        self.squared_interval_sum -= evicted * evicted;
        self.interval_sum -= evicted;
    }
}
249
#[cfg(test)]
mod tests {
    use common_time::util::current_time_millis;

    use super::*;

    /// Builds a detector with a 100 ms minimum standard deviation, an empty heartbeat history
    /// capped at `max_sample_size` samples, and no heartbeat recorded yet.
    fn new_detector(
        threshold: f32,
        acceptable_heartbeat_pause_millis: u32,
        max_sample_size: u32,
    ) -> PhiAccrualFailureDetector {
        PhiAccrualFailureDetector {
            threshold,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis,
            heartbeat_history: HeartbeatHistory::new(max_sample_size),
            last_heartbeat_millis: None,
        }
    }

    #[test]
    fn test_is_available() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // is available before first heartbeat
        assert!(fd.is_available(ts_millis));

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        // is available when heartbeat
        assert!(fd.is_available(ts_millis));
        // is available before heartbeat timeout
        assert!(fd.is_available(ts_millis + acceptable_heartbeat_pause_millis / 2));
        // is not available after heartbeat timeout
        assert!(!fd.is_available(ts_millis + acceptable_heartbeat_pause_millis * 2));
    }

    #[test]
    fn test_last_heartbeat() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // no heartbeat yet
        assert!(fd.last_heartbeat_millis.is_none());

        fd.heartbeat(ts_millis);
        assert_eq!(fd.last_heartbeat_millis, Some(ts_millis));
    }

    #[test]
    fn test_phi() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // phi == 0 before first heartbeat
        assert_eq!(fd.phi(ts_millis), 0.0);

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        // phi == 0 when heartbeat
        assert_eq!(fd.phi(ts_millis), 0.0);
        // phi < threshold before heartbeat timeout
        let now = ts_millis + acceptable_heartbeat_pause_millis / 2;
        assert!(fd.phi(now) < fd.threshold as _);
        // phi >= threshold after heartbeat timeout
        let now = ts_millis + acceptable_heartbeat_pause_millis * 2;
        assert!(fd.phi(now) >= fd.threshold as _);
    }

    // The following test cases are ported from Akka's tests under Apache License 2.0:
    // [AccrualFailureDetectorSpec.scala](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala).

    #[test]
    fn test_use_good_enough_cumulative_distribution_function() {
        // Inverts `phi = -log10(1 - CDF)` to recover the CDF value.
        fn cdf(phi: f64) -> f64 {
            1.0 - 10.0_f64.powf(-phi)
        }

        assert!((cdf(phi(0, 0.0, 10.0)) - 0.5).abs() < 0.001);
        assert!((cdf(phi(6, 0.0, 10.0)) - 0.7257).abs() < 0.001);
        assert!((cdf(phi(15, 0.0, 10.0)) - 0.9332).abs() < 0.001);
        assert!((cdf(phi(20, 0.0, 10.0)) - 0.97725).abs() < 0.001);
        assert!((cdf(phi(25, 0.0, 10.0)) - 0.99379).abs() < 0.001);
        assert!((cdf(phi(35, 0.0, 10.0)) - 0.99977).abs() < 0.001);
        assert!((cdf(phi(40, 0.0, 10.0)) - 0.99997).abs() < 0.0001);

        // phi must be strictly increasing in the elapsed time
        for w in (0..40).collect::<Vec<i64>>().windows(2) {
            assert!(phi(w[0], 0.0, 10.0) < phi(w[1], 0.0, 10.0));
        }

        assert!((cdf(phi(22, 20.0, 3.0)) - 0.7475).abs() < 0.001);
    }

    #[test]
    fn test_handle_outliers_without_losing_precision_or_hitting_exceptions() {
        assert!((phi(10, 0.0, 1.0) - 38.0).abs() < 1.0);
        assert_eq!(phi(-25, 0.0, 1.0), 0.0);
    }

    #[test]
    fn test_return_realistic_phi_values() {
        let test = [
            (0, 0.0),
            (500, 0.1),
            (1000, 0.3),
            (1200, 1.6),
            (1400, 4.7),
            (1600, 10.8),
            (1700, 15.3),
        ];
        for (time_diff, expected_phi) in test {
            assert!((phi(time_diff, 1000.0, 100.0) - expected_phi).abs() < 0.1);
        }

        // larger std_deviation results => lower phi
        assert!(phi(1100, 1000.0, 500.0) < phi(1100, 1000.0, 100.0));
    }

    #[test]
    fn test_return_phi_of_0_on_startup_when_no_heartbeats() {
        let fd = new_detector(8.0, 0, 1000);
        assert_eq!(fd.phi(current_time_millis()), 0.0);
        assert_eq!(fd.phi(current_time_millis()), 0.0);
    }

    #[test]
    fn test_return_phi_based_on_guess_when_only_one_heartbeat() {
        let mut fd = new_detector(8.0, 0, 1000);
        fd.heartbeat(0);
        // `abs()` must wrap the difference; applying it to phi alone would only check an upper
        // bound and let arbitrarily small values pass.
        assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS) - 0.3).abs() < 0.2);
        assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 2) - 4.5).abs() < 0.3);
        assert!(fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 3) > 15.0);
    }

    #[test]
    fn test_return_phi_using_first_interval_after_second_heartbeat() {
        let mut fd = new_detector(8.0, 0, 1000);
        fd.heartbeat(0);
        assert!(fd.phi(100) > 0.0);
        fd.heartbeat(200);
        assert!(fd.phi(300) > 0.0);
    }

    #[test]
    fn test_is_available_after_a_series_of_successful_heartbeats() {
        let mut fd = new_detector(8.0, 0, 1000);
        assert!(fd.last_heartbeat_millis.is_none());
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert_eq!(fd.last_heartbeat_millis, Some(1100));
        assert!(fd.is_available(1200));
    }

    #[test]
    fn test_is_not_available_if_heartbeat_are_missed() {
        let mut fd = new_detector(3.0, 0, 1000);
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert!(fd.is_available(1200));
        assert!(!fd.is_available(8200));
    }

    #[test]
    fn test_is_available_if_it_starts_heartbeat_again_after_being_marked_dead_due_to_detection_of_failure()
    {
        let mut fd = new_detector(8.0, 3000, 1000);

        // 1000 regular intervals, 5 minute pause, and then a short pause again that should trigger
        // unreachable again

        let mut now = 0;
        for _ in 0..1000 {
            fd.heartbeat(now);
            now += 1000;
        }
        now += 5 * 60 * 1000;
        assert!(!fd.is_available(now)); // after the long pause
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 7000;
        assert!(!fd.is_available(now)); // after the 7 seconds pause
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
    }

    #[test]
    fn test_accept_some_configured_missing_heartbeats() {
        let mut fd = new_detector(8.0, 3000, 1000);
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        assert!(fd.is_available(7000));
        fd.heartbeat(8000);
        assert!(fd.is_available(9000));
    }

    #[test]
    fn test_fail_after_configured_acceptable_missing_heartbeats() {
        let mut fd = new_detector(8.0, 3000, 1000);
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        fd.heartbeat(4000);
        fd.heartbeat(5000);
        assert!(fd.is_available(5500));
        fd.heartbeat(6000);
        assert!(!fd.is_available(11000));
    }

    #[test]
    fn test_use_max_sample_size_heartbeats() {
        let mut fd = new_detector(8.0, 0, 3);
        // 100 ms interval
        fd.heartbeat(0);
        fd.heartbeat(100);
        fd.heartbeat(200);
        fd.heartbeat(300);
        let phi1 = fd.phi(400);
        // 500 ms interval, should become same phi when 100 ms intervals have been dropped
        fd.heartbeat(1000);
        fd.heartbeat(1500);
        fd.heartbeat(2000);
        fd.heartbeat(2500);
        let phi2 = fd.phi(3000);
        assert_eq!(phi1, phi2);
    }

    #[test]
    fn test_heartbeat_history_calculate_correct_mean_and_variance() {
        let mut history = HeartbeatHistory::new(20);
        for i in [100, 200, 125, 340, 130] {
            history.add(i);
        }
        assert!((history.mean() - 179.0).abs() < 0.00001);
        assert!((history.variance() - 7584.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_have_0_variance_for_one_sample() {
        let mut history = HeartbeatHistory::new(600);
        history.add(1000);
        assert!((history.variance() - 0.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_be_capped_by_the_specified_max_sample_size() {
        let mut history = HeartbeatHistory::new(3);
        history.add(100);
        history.add(110);
        history.add(90);
        assert!((history.mean() - 100.0).abs() < 0.00001);
        assert!((history.variance() - 66.6666667).abs() < 0.00001);
        history.add(140);
        assert!((history.mean() - 113.333333).abs() < 0.00001);
        assert!((history.variance() - 422.222222).abs() < 0.00001);
        history.add(80);
        assert!((history.mean() - 103.333333).abs() < 0.00001);
        assert!((history.variance() - 688.88888889).abs() < 0.00001);
    }
}