meta_srv/
failure_detector.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::time::Duration;
17
18use serde::{Deserialize, Serialize};
19
/// Assumed heartbeat inter-arrival time, in milliseconds, used to seed the heartbeat history
/// when the very first heartbeat is recorded (no real interval is known yet at that point).
const FIRST_HEARTBEAT_ESTIMATE_MILLIS: i64 = 1000;
21
/// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)"
/// under Apache License 2.0.
///
/// You can find its documentation here:
/// <https://doc.akka.io/docs/akka/2.6.21/typed/failure-detector.html>
///
/// Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their
/// paper: <https://oneofus.la/have-emacs-will-hack/files/HDY04.pdf>
///
/// The suspicion level of failure is given by a value called φ (phi).
/// The basic idea of the φ failure detector is to express the value of φ on a scale that
/// is dynamically adjusted to reflect current network conditions. A configurable
/// threshold is used to decide if φ is considered to be a failure.
///
/// The value of φ is calculated as:
///
/// φ = -log10(1 - F(timeSinceLastHeartbeat))
///
/// where F is the cumulative distribution function of a normal distribution with mean
/// and standard deviation estimated from historical heartbeat inter-arrival times.
#[cfg_attr(test, derive(Debug, Clone, PartialEq))]
pub(crate) struct PhiAccrualFailureDetector {
    /// A low threshold is prone to generate many wrong suspicions but ensures a quick detection
    /// in the event of a real crash. Conversely, a high threshold generates fewer mistakes but
    /// needs more time to detect actual crashes.
    threshold: f32,

    /// Minimum standard deviation to use for the normal distribution used when calculating phi.
    /// Too low standard deviation might result in too much sensitivity for sudden, but normal,
    /// deviations in heartbeat inter arrival times.
    min_std_deviation_millis: f32,

    /// Duration corresponding to number of potentially lost/delayed heartbeats that will be
    /// accepted before considering it to be an anomaly.
    /// This margin is important to be able to survive sudden, occasional, pauses in heartbeat
    /// arrivals, due to for example network drop.
    acceptable_heartbeat_pause_millis: u32,

    /// Recent heartbeat inter-arrival intervals (in milliseconds) from which the mean and
    /// standard deviation used by the phi calculation are estimated.
    heartbeat_history: HeartbeatHistory,
    /// Timestamp, in milliseconds, of the most recently recorded heartbeat; `None` until the
    /// first heartbeat has been recorded.
    last_heartbeat_millis: Option<i64>,
}
63
/// User-facing configuration of a [`PhiAccrualFailureDetector`].
///
/// Because of `#[serde(default)]`, any field missing from the serialized form takes its value
/// from this type's [`Default`] implementation.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct PhiAccrualFailureDetectorOptions {
    /// Phi value at or above which the monitored peer is considered unavailable.
    pub threshold: f32,
    /// Lower bound applied to the standard deviation used when calculating phi.
    /// (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub min_std_deviation: Duration,
    /// Extra pause that is added to the mean heartbeat interval before a missing heartbeat
    /// starts to count as an anomaly. (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub acceptable_heartbeat_pause: Duration,
}
73
74impl Default for PhiAccrualFailureDetectorOptions {
75    fn default() -> Self {
76        // default configuration is the same as of Akka:
77        // https://github.com/akka/akka/blob/v2.6.21/akka-cluster/src/main/resources/reference.conf#L181
78        Self {
79            threshold: 8_f32,
80            min_std_deviation: Duration::from_millis(100),
81            acceptable_heartbeat_pause: Duration::from_secs(10),
82        }
83    }
84}
85
86impl Default for PhiAccrualFailureDetector {
87    fn default() -> Self {
88        Self::from_options(Default::default())
89    }
90}
91
92impl PhiAccrualFailureDetector {
93    pub(crate) fn from_options(options: PhiAccrualFailureDetectorOptions) -> Self {
94        Self {
95            threshold: options.threshold,
96            min_std_deviation_millis: options.min_std_deviation.as_millis() as f32,
97            acceptable_heartbeat_pause_millis: options.acceptable_heartbeat_pause.as_millis()
98                as u32,
99            heartbeat_history: HeartbeatHistory::new(1000),
100            last_heartbeat_millis: None,
101        }
102    }
103
104    pub(crate) fn heartbeat(&mut self, ts_millis: i64) {
105        if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
106            if ts_millis < last_heartbeat_millis {
107                return;
108            }
109
110            if self.is_available(ts_millis) {
111                let interval = ts_millis - last_heartbeat_millis;
112                self.heartbeat_history.add(interval)
113            }
114        } else {
115            // guess statistics for first heartbeat,
116            // important so that connections with only one heartbeat becomes unavailable
117            // bootstrap with 2 entries with rather high standard deviation
118            let std_deviation = FIRST_HEARTBEAT_ESTIMATE_MILLIS / 4;
119            self.heartbeat_history
120                .add((FIRST_HEARTBEAT_ESTIMATE_MILLIS - std_deviation) as _);
121            self.heartbeat_history
122                .add((FIRST_HEARTBEAT_ESTIMATE_MILLIS + std_deviation) as _);
123        }
124        let _ = self.last_heartbeat_millis.insert(ts_millis);
125    }
126
127    pub(crate) fn is_available(&self, ts_millis: i64) -> bool {
128        self.phi(ts_millis) < self.threshold as _
129    }
130
131    /// The suspicion level of the accrual failure detector.
132    ///
133    /// If a connection does not have any records in failure detector then it is considered healthy.
134    pub(crate) fn phi(&self, ts_millis: i64) -> f64 {
135        if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
136            let time_diff = ts_millis - last_heartbeat_millis;
137            let mean = self.heartbeat_history.mean();
138            let std_deviation = self
139                .heartbeat_history
140                .std_deviation()
141                .max(self.min_std_deviation_millis as _);
142
143            phi(
144                time_diff,
145                mean + self.acceptable_heartbeat_pause_millis as f64,
146                std_deviation,
147            )
148        } else {
149            // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
150            0.0
151        }
152    }
153
154    #[cfg(test)]
155    pub(crate) fn threshold(&self) -> f32 {
156        self.threshold
157    }
158
159    #[cfg(test)]
160    pub(crate) fn acceptable_heartbeat_pause_millis(&self) -> u32 {
161        self.acceptable_heartbeat_pause_millis
162    }
163}
164
/// Computes phi for an elapsed time of `time_diff` under a normal distribution
/// N(`mean`, `std_deviation`) of heartbeat inter-arrival times.
///
/// The cumulative distribution function is replaced by the logistic approximation
/// 1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
/// with y = (x - mean) / standard_deviation, as defined in β Mathematics Handbook.
/// Its error is 0.00014 at +- 3.16. The returned value is equivalent to -log10(1 - CDF(y)).
///
/// As a rule of thumb, phi = 1 corresponds to about a 10% chance of a wrong suspicion,
/// phi = 2 to about 1%, phi = 3 to about 0.1%, and so on.
///
/// # Panics
///
/// Panics if `std_deviation` is zero.
fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
    assert_ne!(std_deviation, 0.0);

    let elapsed = time_diff as f64;
    let y = (elapsed - mean) / std_deviation;
    let exponent = -y * (1.5976 + 0.070566 * y * y);
    let e = exponent.exp();

    // Probability mass remaining beyond `elapsed`. The two algebraically equivalent forms are
    // chosen per side of the mean, exactly as in the reference implementation, to keep
    // precision for outliers.
    let tail = if elapsed > mean {
        e / (1.0 + e)
    } else {
        1.0 - 1.0 / (1.0 + e)
    };
    -tail.log10()
}
187
/// Holds the heartbeat statistics.
/// It is capped by the number of samples specified in `max_sample_size`.
///
/// The stats (mean, variance, std_deviation) are not defined for empty HeartbeatHistory.
#[derive(Debug, Clone, PartialEq)]
struct HeartbeatHistory {
    /// Number of samples to use for calculation of mean and standard deviation of inter-arrival
    /// times.
    max_sample_size: u32,

    /// The retained intervals, oldest first.
    intervals: VecDeque<i64>,
    /// Running sum of all retained intervals.
    interval_sum: i64,
    /// Running sum of the squares of all retained intervals.
    ///
    /// Kept in `i128`: the square of an interval above roughly 3.04e9 ms (about 35 days)
    /// already exceeds `i64::MAX`.
    squared_interval_sum: i128,
}

impl HeartbeatHistory {
    /// Creates an empty history that retains at most `max_sample_size` intervals.
    fn new(max_sample_size: u32) -> Self {
        Self {
            max_sample_size,
            intervals: VecDeque::with_capacity(max_sample_size as usize),
            interval_sum: 0,
            squared_interval_sum: 0,
        }
    }

    /// Arithmetic mean of the retained intervals (NaN for an empty history).
    fn mean(&self) -> f64 {
        self.interval_sum as f64 / self.intervals.len() as f64
    }

    /// Population variance of the retained intervals, computed as E[x²] - E[x]².
    ///
    /// Floating-point cancellation can make that difference slightly negative when all
    /// samples are (nearly) identical; such results are clamped to zero so that
    /// [`Self::std_deviation`] never yields NaN for a non-empty history.
    fn variance(&self) -> f64 {
        let mean = self.mean();
        let variance = self.squared_interval_sum as f64 / self.intervals.len() as f64 - mean * mean;
        // An explicit comparison (rather than `f64::max`) keeps the NaN of an empty history
        // intact, where the statistics are documented as undefined.
        if variance < 0.0 {
            0.0
        } else {
            variance
        }
    }

    /// Population standard deviation of the retained intervals.
    fn std_deviation(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Appends `interval`, evicting the oldest sample first when the history is full.
    fn add(&mut self, interval: i64) {
        if self.intervals.len() >= self.max_sample_size as usize {
            self.drop_oldest();
        }
        self.intervals.push_back(interval);
        self.interval_sum += interval;
        self.squared_interval_sum += i128::from(interval) * i128::from(interval);
    }

    /// Removes the oldest sample and subtracts its contribution from the running sums.
    fn drop_oldest(&mut self) {
        let oldest = self
            .intervals
            .pop_front()
            .expect("intervals must not be empty here");
        self.interval_sum -= oldest;
        self.squared_interval_sum -= i128::from(oldest) * i128::from(oldest);
    }
}
244
#[cfg(test)]
mod tests {
    use common_time::util::current_time_millis;

    use super::*;

    #[test]
    fn test_is_available() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // is available before first heartbeat
        assert!(fd.is_available(ts_millis));

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        // is available when heartbeat
        assert!(fd.is_available(ts_millis));
        // is available before heartbeat timeout
        assert!(fd.is_available(ts_millis + acceptable_heartbeat_pause_millis / 2));
        // is not available after heartbeat timeout
        assert!(!fd.is_available(ts_millis + acceptable_heartbeat_pause_millis * 2));
    }

    #[test]
    fn test_last_heartbeat() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // no heartbeat yet
        assert!(fd.last_heartbeat_millis.is_none());

        fd.heartbeat(ts_millis);
        assert_eq!(fd.last_heartbeat_millis, Some(ts_millis));
    }

    #[test]
    fn test_phi() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // phi == 0 before first heartbeat
        assert_eq!(fd.phi(ts_millis), 0.0);

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        // phi == 0 when heartbeat
        assert_eq!(fd.phi(ts_millis), 0.0);
        // phi < threshold before heartbeat timeout
        let now = ts_millis + acceptable_heartbeat_pause_millis / 2;
        assert!(fd.phi(now) < fd.threshold as _);
        // phi >= threshold after heartbeat timeout
        let now = ts_millis + acceptable_heartbeat_pause_millis * 2;
        assert!(fd.phi(now) >= fd.threshold as _);
    }

    // The following test cases are ported from Akka's tests under Apache License 2.0:
    // [AccrualFailureDetectorSpec.scala](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala).

    #[test]
    fn test_use_good_enough_cumulative_distribution_function() {
        fn cdf(phi: f64) -> f64 {
            1.0 - 10.0_f64.powf(-phi)
        }

        assert!((cdf(phi(0, 0.0, 10.0)) - 0.5).abs() < 0.001);
        assert!((cdf(phi(6, 0.0, 10.0)) - 0.7257).abs() < 0.001);
        assert!((cdf(phi(15, 0.0, 10.0)) - 0.9332).abs() < 0.001);
        assert!((cdf(phi(20, 0.0, 10.0)) - 0.97725).abs() < 0.001);
        assert!((cdf(phi(25, 0.0, 10.0)) - 0.99379).abs() < 0.001);
        assert!((cdf(phi(35, 0.0, 10.0)) - 0.99977).abs() < 0.001);
        assert!((cdf(phi(40, 0.0, 10.0)) - 0.99997).abs() < 0.0001);

        for w in (0..40).collect::<Vec<i64>>().windows(2) {
            assert!(phi(w[0], 0.0, 10.0) < phi(w[1], 0.0, 10.0));
        }

        assert!((cdf(phi(22, 20.0, 3.0)) - 0.7475).abs() < 0.001);
    }

    #[test]
    fn test_handle_outliers_without_losing_precision_or_hitting_exceptions() {
        assert!((phi(10, 0.0, 1.0) - 38.0).abs() < 1.0);
        assert_eq!(phi(-25, 0.0, 1.0), 0.0);
    }

    #[test]
    fn test_return_realistic_phi_values() {
        let test = [
            (0, 0.0),
            (500, 0.1),
            (1000, 0.3),
            (1200, 1.6),
            (1400, 4.7),
            (1600, 10.8),
            (1700, 15.3),
        ];
        for (time_diff, expected_phi) in test {
            assert!((phi(time_diff, 1000.0, 100.0) - expected_phi).abs() < 0.1);
        }

        // larger std_deviation results => lower phi
        assert!(phi(1100, 1000.0, 500.0) < phi(1100, 1000.0, 100.0));
    }

    #[test]
    fn test_return_phi_of_0_on_startup_when_no_heartbeats() {
        let fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        assert_eq!(fd.phi(current_time_millis()), 0.0);
        assert_eq!(fd.phi(current_time_millis()), 0.0);
    }

    #[test]
    fn test_return_phi_based_on_guess_when_only_one_heartbeat() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        // Two-sided tolerance checks (Akka: `0.3 +- 0.2` and `4.5 +- 0.3`). Taking `abs()` of
        // phi alone and then subtracting would only verify an upper bound.
        assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS) - 0.3).abs() < 0.2);
        assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 2) - 4.5).abs() < 0.3);
        assert!(fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 3) > 15.0);
    }

    #[test]
    fn test_return_phi_using_first_interval_after_second_heartbeat() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        assert!(fd.phi(100) > 0.0);
        fd.heartbeat(200);
        assert!(fd.phi(300) > 0.0);
    }

    #[test]
    fn test_is_available_after_a_series_of_successful_heartbeats() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        assert!(fd.last_heartbeat_millis.is_none());
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        let _ = fd.last_heartbeat_millis.unwrap();
        assert!(fd.is_available(1200));
    }

    #[test]
    fn test_is_not_available_if_heartbeat_are_missed() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 3.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert!(fd.is_available(1200));
        assert!(!fd.is_available(8200));
    }

    #[test]
    fn test_is_available_if_it_starts_heartbeat_again_after_being_marked_dead_due_to_detection_of_failure()
     {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 3000,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };

        // 1000 regular intervals, 5 minute pause, and then a short pause again that should trigger
        // unreachable again

        let mut now = 0;
        for _ in 0..1000 {
            fd.heartbeat(now);
            now += 1000;
        }
        now += 5 * 60 * 1000;
        assert!(!fd.is_available(now)); // after the long pause
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 7000;
        assert!(!fd.is_available(now)); // after the 7 seconds pause
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
    }

    #[test]
    fn test_accept_some_configured_missing_heartbeats() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 3000,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        assert!(fd.is_available(7000));
        fd.heartbeat(8000);
        assert!(fd.is_available(9000));
    }

    #[test]
    fn test_fail_after_configured_acceptable_missing_heartbeats() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 3000,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        fd.heartbeat(4000);
        fd.heartbeat(5000);
        assert!(fd.is_available(5500));
        fd.heartbeat(6000);
        assert!(!fd.is_available(11000));
    }

    #[test]
    fn test_use_max_sample_size_heartbeats() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(3),
            last_heartbeat_millis: None,
        };
        // 100 ms interval
        fd.heartbeat(0);
        fd.heartbeat(100);
        fd.heartbeat(200);
        fd.heartbeat(300);
        let phi1 = fd.phi(400);
        // 500 ms interval, should become same phi when 100 ms intervals have been dropped
        fd.heartbeat(1000);
        fd.heartbeat(1500);
        fd.heartbeat(2000);
        fd.heartbeat(2500);
        let phi2 = fd.phi(3000);
        assert_eq!(phi1, phi2);
    }

    #[test]
    fn test_heartbeat_history_calculate_correct_mean_and_variance() {
        let mut history = HeartbeatHistory::new(20);
        for i in [100, 200, 125, 340, 130] {
            history.add(i);
        }
        assert!((history.mean() - 179.0).abs() < 0.00001);
        assert!((history.variance() - 7584.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_have_0_variance_for_one_sample() {
        let mut history = HeartbeatHistory::new(600);
        history.add(1000);
        assert!((history.variance() - 0.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_be_capped_by_the_specified_max_sample_size() {
        let mut history = HeartbeatHistory::new(3);
        history.add(100);
        history.add(110);
        history.add(90);
        assert!((history.mean() - 100.0).abs() < 0.00001);
        assert!((history.variance() - 66.6666667).abs() < 0.00001);
        history.add(140);
        assert!((history.mean() - 113.333333).abs() < 0.00001);
        assert!((history.variance() - 422.222222).abs() < 0.00001);
        history.add(80);
        assert!((history.mean() - 103.333333).abs() < 0.00001);
        assert!((history.variance() - 688.88888889).abs() < 0.00001);
    }
}