meta_srv/
failure_detector.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::time::Duration;
17
18use common_meta::distributed_time_constants;
19use serde::{Deserialize, Serialize};
20
/// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)"
/// under Apache License 2.0.
///
/// You can find its documentation here:
/// <https://doc.akka.io/docs/akka/2.6.21/typed/failure-detector.html>
///
/// Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their
/// paper: <https://oneofus.la/have-emacs-will-hack/files/HDY04.pdf>
///
/// The suspicion level of failure is given by a value called φ (phi).
/// The basic idea of the φ failure detector is to express the value of φ on a scale that
/// is dynamically adjusted to reflect current network conditions. A configurable
/// threshold is used to decide if φ is considered to be a failure.
///
/// The value of φ is calculated as:
///
/// φ = -log10(1 - F(timeSinceLastHeartbeat))
///
/// where F is the cumulative distribution function of a normal distribution with mean
/// and standard deviation estimated from historical heartbeat inter-arrival times.
#[cfg_attr(test, derive(Debug, Clone, PartialEq))]
pub(crate) struct PhiAccrualFailureDetector {
    /// The φ value at or above which the monitored peer is no longer considered available.
    ///
    /// A low threshold is prone to generate many wrong suspicions but ensures a quick detection
    /// in the event of a real crash. Conversely, a high threshold generates fewer mistakes but
    /// needs more time to detect actual crashes.
    threshold: f32,

    /// Minimum standard deviation to use for the normal distribution used when calculating phi.
    /// Too low standard deviation might result in too much sensitivity for sudden, but normal,
    /// deviations in heartbeat inter arrival times.
    min_std_deviation_millis: f32,

    /// Duration corresponding to number of potentially lost/delayed heartbeats that will be
    /// accepted before considering it to be an anomaly.
    /// This margin is important to be able to survive sudden, occasional, pauses in heartbeat
    /// arrivals, due to for example network drop.
    acceptable_heartbeat_pause_millis: u32,

    /// Bootstrap the stats with heartbeats that corresponds to this duration, with a rather high
    /// standard deviation (since environment is unknown in the beginning).
    first_heartbeat_estimate_millis: u32,

    /// Recorded heartbeat inter-arrival times (in milliseconds) from which the mean and the
    /// standard deviation are estimated.
    heartbeat_history: HeartbeatHistory,
    /// Timestamp (in milliseconds) of the most recently accepted heartbeat; `None` until the
    /// first heartbeat has been received.
    last_heartbeat_millis: Option<i64>,
}
66
/// User-facing configuration of a [`PhiAccrualFailureDetector`].
///
/// Because of `#[serde(default)]`, fields missing from the serialized form fall back to the
/// values of the `Default` implementation. The durations are (de)serialized through
/// `humantime_serde`.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct PhiAccrualFailureDetectorOptions {
    /// The φ value at or above which the monitored peer is considered unavailable.
    pub threshold: f32,
    /// Lower bound applied to the standard deviation estimated from the heartbeat history.
    #[serde(with = "humantime_serde")]
    pub min_std_deviation: Duration,
    /// Extra pause that is tolerated between heartbeats; it is added to the estimated mean
    /// inter-arrival time when computing φ.
    #[serde(with = "humantime_serde")]
    pub acceptable_heartbeat_pause: Duration,
    /// Guess of the heartbeat interval used to bootstrap the statistics on the first heartbeat.
    #[serde(with = "humantime_serde")]
    pub first_heartbeat_estimate: Duration,
}
78
79impl Default for PhiAccrualFailureDetectorOptions {
80    fn default() -> Self {
81        // default configuration is the same as of Akka:
82        // https://github.com/akka/akka/blob/v2.6.21/akka-cluster/src/main/resources/reference.conf#L181
83        Self {
84            threshold: 8_f32,
85            min_std_deviation: Duration::from_millis(100),
86            acceptable_heartbeat_pause: Duration::from_secs(
87                distributed_time_constants::DATANODE_LEASE_SECS,
88            ),
89            first_heartbeat_estimate: Duration::from_millis(1000),
90        }
91    }
92}
93
94impl Default for PhiAccrualFailureDetector {
95    fn default() -> Self {
96        Self::from_options(Default::default())
97    }
98}
99
100impl PhiAccrualFailureDetector {
101    pub(crate) fn from_options(options: PhiAccrualFailureDetectorOptions) -> Self {
102        Self {
103            threshold: options.threshold,
104            min_std_deviation_millis: options.min_std_deviation.as_millis() as f32,
105            acceptable_heartbeat_pause_millis: options.acceptable_heartbeat_pause.as_millis()
106                as u32,
107            first_heartbeat_estimate_millis: options.first_heartbeat_estimate.as_millis() as u32,
108            heartbeat_history: HeartbeatHistory::new(1000),
109            last_heartbeat_millis: None,
110        }
111    }
112
113    pub(crate) fn heartbeat(&mut self, ts_millis: i64) {
114        if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
115            if ts_millis < last_heartbeat_millis {
116                return;
117            }
118
119            if self.is_available(ts_millis) {
120                let interval = ts_millis - last_heartbeat_millis;
121                self.heartbeat_history.add(interval)
122            }
123        } else {
124            // guess statistics for first heartbeat,
125            // important so that connections with only one heartbeat becomes unavailable
126            // bootstrap with 2 entries with rather high standard deviation
127            let std_deviation = self.first_heartbeat_estimate_millis / 4;
128            self.heartbeat_history
129                .add((self.first_heartbeat_estimate_millis - std_deviation) as _);
130            self.heartbeat_history
131                .add((self.first_heartbeat_estimate_millis + std_deviation) as _);
132        }
133        let _ = self.last_heartbeat_millis.insert(ts_millis);
134    }
135
136    pub(crate) fn is_available(&self, ts_millis: i64) -> bool {
137        self.phi(ts_millis) < self.threshold as _
138    }
139
140    /// The suspicion level of the accrual failure detector.
141    ///
142    /// If a connection does not have any records in failure detector then it is considered healthy.
143    pub(crate) fn phi(&self, ts_millis: i64) -> f64 {
144        if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
145            let time_diff = ts_millis - last_heartbeat_millis;
146            let mean = self.heartbeat_history.mean();
147            let std_deviation = self
148                .heartbeat_history
149                .std_deviation()
150                .max(self.min_std_deviation_millis as _);
151
152            phi(
153                time_diff,
154                mean + self.acceptable_heartbeat_pause_millis as f64,
155                std_deviation,
156            )
157        } else {
158            // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
159            0.0
160        }
161    }
162
163    #[cfg(test)]
164    pub(crate) fn threshold(&self) -> f32 {
165        self.threshold
166    }
167
168    #[cfg(test)]
169    pub(crate) fn acceptable_heartbeat_pause_millis(&self) -> u32 {
170        self.acceptable_heartbeat_pause_millis
171    }
172}
173
/// Computes φ for an elapsed time of `time_diff`, given the `mean` and `std_deviation` of a
/// normal distribution N(mean, std_deviation).
///
/// The cumulative distribution function is approximated by the logistic formula
/// 1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
/// where y = (x - mean) / standard_deviation
/// (an approximation defined in β Mathematics Handbook; error is 0.00014 at +- 3.16).
/// The returned value is equivalent to -log10(1 - CDF(y)).
///
/// Usually phi = 1 means likeliness that we will make a mistake is about 10%.
/// The likeliness is about 1% with phi = 2, 0.1% with phi = 3 and so on.
///
/// # Panics
///
/// Panics if `std_deviation` is zero.
fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
    assert_ne!(std_deviation, 0.0);

    let elapsed = time_diff as f64;
    let y = (elapsed - mean) / std_deviation;
    let e = (-y * (1.5976 + 0.070566 * y * y)).exp();

    // Two algebraically equal forms of `1 - CDF(y)`; the form is chosen by the side of the
    // mean the elapsed time falls on.
    let survival = if elapsed > mean {
        e / (1.0 + e)
    } else {
        1.0 - 1.0 / (1.0 + e)
    };
    -survival.log10()
}
196
/// Holds the heartbeat statistics.
/// It is capped by the number of samples specified in `max_sample_size`.
///
/// The stats (mean, variance, std_deviation) are not defined for empty HeartbeatHistory.
#[derive(Debug, Clone, PartialEq)]
struct HeartbeatHistory {
    /// Number of samples to use for calculation of mean and standard deviation of inter-arrival
    /// times.
    max_sample_size: u32,

    /// The retained inter-arrival times, oldest first.
    intervals: VecDeque<i64>,
    /// Running sum of all values in `intervals`.
    interval_sum: i64,
    /// Running sum of the squares of all values in `intervals`.
    ///
    /// Kept as `i128` because the square of a single large interval (roughly 3.04e9 and above)
    /// does not fit into an `i64`.
    squared_interval_sum: i128,
}

impl HeartbeatHistory {
    /// Creates an empty history that retains at most `max_sample_size` intervals.
    fn new(max_sample_size: u32) -> Self {
        Self {
            max_sample_size,
            intervals: VecDeque::with_capacity(max_sample_size as usize),
            interval_sum: 0,
            squared_interval_sum: 0,
        }
    }

    /// Arithmetic mean of the retained intervals (NaN when the history is empty).
    fn mean(&self) -> f64 {
        self.interval_sum as f64 / self.intervals.len() as f64
    }

    /// Population variance of the retained intervals, computed as E[x²] - E[x]².
    fn variance(&self) -> f64 {
        let mean = self.mean();
        self.squared_interval_sum as f64 / self.intervals.len() as f64 - mean * mean
    }

    /// Square root of [`Self::variance`].
    fn std_deviation(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Appends `interval`, evicting the oldest sample first when the history is full.
    ///
    /// # Panics
    ///
    /// Panics if the history was created with a `max_sample_size` of zero.
    fn add(&mut self, interval: i64) {
        if self.intervals.len() >= self.max_sample_size as usize {
            self.drop_oldest();
        }
        self.intervals.push_back(interval);
        self.interval_sum += interval;
        self.squared_interval_sum += Self::square(interval);
    }

    /// Removes the oldest sample and subtracts its contribution from the running sums.
    fn drop_oldest(&mut self) {
        let oldest = self
            .intervals
            .pop_front()
            .expect("intervals must not be empty here");
        self.interval_sum -= oldest;
        self.squared_interval_sum -= Self::square(oldest);
    }

    /// Squares `value` in 128-bit arithmetic, which cannot overflow for any `i64` input.
    fn square(value: i64) -> i128 {
        let wide = i128::from(value);
        wide * wide
    }
}
253
254#[cfg(test)]
255mod tests {
256    use common_time::util::current_time_millis;
257
258    use super::*;
259
260    #[test]
261    fn test_is_available() {
262        let ts_millis = current_time_millis();
263
264        let mut fd = PhiAccrualFailureDetector::default();
265
266        // is available before first heartbeat
267        assert!(fd.is_available(ts_millis));
268
269        fd.heartbeat(ts_millis);
270
271        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
272        // is available when heartbeat
273        assert!(fd.is_available(ts_millis));
274        // is available before heartbeat timeout
275        assert!(fd.is_available(ts_millis + acceptable_heartbeat_pause_millis / 2));
276        // is not available after heartbeat timeout
277        assert!(!fd.is_available(ts_millis + acceptable_heartbeat_pause_millis * 2));
278    }
279
280    #[test]
281    fn test_last_heartbeat() {
282        let ts_millis = current_time_millis();
283
284        let mut fd = PhiAccrualFailureDetector::default();
285
286        // no heartbeat yet
287        assert!(fd.last_heartbeat_millis.is_none());
288
289        fd.heartbeat(ts_millis);
290        assert_eq!(fd.last_heartbeat_millis, Some(ts_millis));
291    }
292
293    #[test]
294    fn test_phi() {
295        let ts_millis = current_time_millis();
296
297        let mut fd = PhiAccrualFailureDetector::default();
298
299        // phi == 0 before first heartbeat
300        assert_eq!(fd.phi(ts_millis), 0.0);
301
302        fd.heartbeat(ts_millis);
303
304        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
305        // phi == 0 when heartbeat
306        assert_eq!(fd.phi(ts_millis), 0.0);
307        // phi < threshold before heartbeat timeout
308        let now = ts_millis + acceptable_heartbeat_pause_millis / 2;
309        assert!(fd.phi(now) < fd.threshold as _);
310        // phi >= threshold after heartbeat timeout
311        let now = ts_millis + acceptable_heartbeat_pause_millis * 2;
312        assert!(fd.phi(now) >= fd.threshold as _);
313    }
314
    // The following test cases are ported from Akka's tests under Apache License 2.0:
    // [AccrualFailureDetectorSpec.scala](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala).
317
318    #[test]
319    fn test_use_good_enough_cumulative_distribution_function() {
320        fn cdf(phi: f64) -> f64 {
321            1.0 - 10.0_f64.powf(-phi)
322        }
323
324        assert!((cdf(phi(0, 0.0, 10.0)) - 0.5).abs() < 0.001);
325        assert!((cdf(phi(6, 0.0, 10.0)) - 0.7257).abs() < 0.001);
326        assert!((cdf(phi(15, 0.0, 10.0)) - 0.9332).abs() < 0.001);
327        assert!((cdf(phi(20, 0.0, 10.0)) - 0.97725).abs() < 0.001);
328        assert!((cdf(phi(25, 0.0, 10.0)) - 0.99379).abs() < 0.001);
329        assert!((cdf(phi(35, 0.0, 10.0)) - 0.99977).abs() < 0.001);
330        assert!((cdf(phi(40, 0.0, 10.0)) - 0.99997).abs() < 0.0001);
331
332        for w in (0..40).collect::<Vec<i64>>().windows(2) {
333            assert!(phi(w[0], 0.0, 10.0) < phi(w[1], 0.0, 10.0));
334        }
335
336        assert!((cdf(phi(22, 20.0, 3.0)) - 0.7475).abs() < 0.001);
337    }
338
339    #[test]
340    fn test_handle_outliers_without_losing_precision_or_hitting_exceptions() {
341        assert!((phi(10, 0.0, 1.0) - 38.0).abs() < 1.0);
342        assert_eq!(phi(-25, 0.0, 1.0), 0.0);
343    }
344
345    #[test]
346    fn test_return_realistic_phi_values() {
347        let test = vec![
348            (0, 0.0),
349            (500, 0.1),
350            (1000, 0.3),
351            (1200, 1.6),
352            (1400, 4.7),
353            (1600, 10.8),
354            (1700, 15.3),
355        ];
356        for (time_diff, expected_phi) in test {
357            assert!((phi(time_diff, 1000.0, 100.0) - expected_phi).abs() < 0.1);
358        }
359
360        // larger std_deviation results => lower phi
361        assert!(phi(1100, 1000.0, 500.0) < phi(1100, 1000.0, 100.0));
362    }
363
364    #[test]
365    fn test_return_phi_of_0_on_startup_when_no_heartbeats() {
366        let fd = PhiAccrualFailureDetector {
367            threshold: 8.0,
368            min_std_deviation_millis: 100.0,
369            acceptable_heartbeat_pause_millis: 0,
370            first_heartbeat_estimate_millis: 1000,
371            heartbeat_history: HeartbeatHistory::new(1000),
372            last_heartbeat_millis: None,
373        };
374        assert_eq!(fd.phi(current_time_millis()), 0.0);
375        assert_eq!(fd.phi(current_time_millis()), 0.0);
376    }
377
378    #[test]
379    fn test_return_phi_based_on_guess_when_only_one_heartbeat() {
380        let mut fd = PhiAccrualFailureDetector {
381            threshold: 8.0,
382            min_std_deviation_millis: 100.0,
383            acceptable_heartbeat_pause_millis: 0,
384            first_heartbeat_estimate_millis: 1000,
385            heartbeat_history: HeartbeatHistory::new(1000),
386            last_heartbeat_millis: None,
387        };
388        fd.heartbeat(0);
389        assert!((fd.phi(1000)).abs() - 0.3 < 0.2);
390        assert!((fd.phi(2000)).abs() - 4.5 < 0.3);
391        assert!((fd.phi(3000)).abs() > 15.0);
392    }
393
394    #[test]
395    fn test_return_phi_using_first_interval_after_second_heartbeat() {
396        let mut fd = PhiAccrualFailureDetector {
397            threshold: 8.0,
398            min_std_deviation_millis: 100.0,
399            acceptable_heartbeat_pause_millis: 0,
400            first_heartbeat_estimate_millis: 1000,
401            heartbeat_history: HeartbeatHistory::new(1000),
402            last_heartbeat_millis: None,
403        };
404        fd.heartbeat(0);
405        assert!(fd.phi(100) > 0.0);
406        fd.heartbeat(200);
407        assert!(fd.phi(300) > 0.0);
408    }
409
410    #[test]
411    fn test_is_available_after_a_series_of_successful_heartbeats() {
412        let mut fd = PhiAccrualFailureDetector {
413            threshold: 8.0,
414            min_std_deviation_millis: 100.0,
415            acceptable_heartbeat_pause_millis: 0,
416            first_heartbeat_estimate_millis: 1000,
417            heartbeat_history: HeartbeatHistory::new(1000),
418            last_heartbeat_millis: None,
419        };
420        assert!(fd.last_heartbeat_millis.is_none());
421        fd.heartbeat(0);
422        fd.heartbeat(1000);
423        fd.heartbeat(1100);
424        let _ = fd.last_heartbeat_millis.unwrap();
425        assert!(fd.is_available(1200));
426    }
427
428    #[test]
429    fn test_is_not_available_if_heartbeat_are_missed() {
430        let mut fd = PhiAccrualFailureDetector {
431            threshold: 3.0,
432            min_std_deviation_millis: 100.0,
433            acceptable_heartbeat_pause_millis: 0,
434            first_heartbeat_estimate_millis: 1000,
435            heartbeat_history: HeartbeatHistory::new(1000),
436            last_heartbeat_millis: None,
437        };
438        fd.heartbeat(0);
439        fd.heartbeat(1000);
440        fd.heartbeat(1100);
441        assert!(fd.is_available(1200));
442        assert!(!fd.is_available(8200));
443    }
444
445    #[test]
446    fn test_is_available_if_it_starts_heartbeat_again_after_being_marked_dead_due_to_detection_of_failure(
447    ) {
448        let mut fd = PhiAccrualFailureDetector {
449            threshold: 8.0,
450            min_std_deviation_millis: 100.0,
451            acceptable_heartbeat_pause_millis: 3000,
452            first_heartbeat_estimate_millis: 1000,
453            heartbeat_history: HeartbeatHistory::new(1000),
454            last_heartbeat_millis: None,
455        };
456
457        // 1000 regular intervals, 5 minute pause, and then a short pause again that should trigger
458        // unreachable again
459
460        let mut now = 0;
461        for _ in 0..1000 {
462            fd.heartbeat(now);
463            now += 1000;
464        }
465        now += 5 * 60 * 1000;
466        assert!(!fd.is_available(now)); // after the long pause
467        now += 100;
468        fd.heartbeat(now);
469        now += 900;
470        assert!(fd.is_available(now));
471        now += 100;
472        fd.heartbeat(now);
473        now += 7000;
474        assert!(!fd.is_available(now)); // after the 7 seconds pause
475        now += 100;
476        fd.heartbeat(now);
477        now += 900;
478        assert!(fd.is_available(now));
479        now += 100;
480        fd.heartbeat(now);
481        now += 900;
482        assert!(fd.is_available(now));
483    }
484
485    #[test]
486    fn test_accept_some_configured_missing_heartbeats() {
487        let mut fd = PhiAccrualFailureDetector {
488            threshold: 8.0,
489            min_std_deviation_millis: 100.0,
490            acceptable_heartbeat_pause_millis: 3000,
491            first_heartbeat_estimate_millis: 1000,
492            heartbeat_history: HeartbeatHistory::new(1000),
493            last_heartbeat_millis: None,
494        };
495        fd.heartbeat(0);
496        fd.heartbeat(1000);
497        fd.heartbeat(2000);
498        fd.heartbeat(3000);
499        assert!(fd.is_available(7000));
500        fd.heartbeat(8000);
501        assert!(fd.is_available(9000));
502    }
503
504    #[test]
505    fn test_fail_after_configured_acceptable_missing_heartbeats() {
506        let mut fd = PhiAccrualFailureDetector {
507            threshold: 8.0,
508            min_std_deviation_millis: 100.0,
509            acceptable_heartbeat_pause_millis: 3000,
510            first_heartbeat_estimate_millis: 1000,
511            heartbeat_history: HeartbeatHistory::new(1000),
512            last_heartbeat_millis: None,
513        };
514        fd.heartbeat(0);
515        fd.heartbeat(1000);
516        fd.heartbeat(2000);
517        fd.heartbeat(3000);
518        fd.heartbeat(4000);
519        fd.heartbeat(5000);
520        assert!(fd.is_available(5500));
521        fd.heartbeat(6000);
522        assert!(!fd.is_available(11000));
523    }
524
525    #[test]
526    fn test_use_max_sample_size_heartbeats() {
527        let mut fd = PhiAccrualFailureDetector {
528            threshold: 8.0,
529            min_std_deviation_millis: 100.0,
530            acceptable_heartbeat_pause_millis: 0,
531            first_heartbeat_estimate_millis: 1000,
532            heartbeat_history: HeartbeatHistory::new(3),
533            last_heartbeat_millis: None,
534        };
535        // 100 ms interval
536        fd.heartbeat(0);
537        fd.heartbeat(100);
538        fd.heartbeat(200);
539        fd.heartbeat(300);
540        let phi1 = fd.phi(400);
541        // 500 ms interval, should become same phi when 100 ms intervals have been dropped
542        fd.heartbeat(1000);
543        fd.heartbeat(1500);
544        fd.heartbeat(2000);
545        fd.heartbeat(2500);
546        let phi2 = fd.phi(3000);
547        assert_eq!(phi1, phi2);
548    }
549
550    #[test]
551    fn test_heartbeat_history_calculate_correct_mean_and_variance() {
552        let mut history = HeartbeatHistory::new(20);
553        for i in [100, 200, 125, 340, 130] {
554            history.add(i);
555        }
556        assert!((history.mean() - 179.0).abs() < 0.00001);
557        assert!((history.variance() - 7584.0).abs() < 0.00001);
558    }
559
560    #[test]
561    fn test_heartbeat_history_have_0_variance_for_one_sample() {
562        let mut history = HeartbeatHistory::new(600);
563        history.add(1000);
564        assert!((history.variance() - 0.0).abs() < 0.00001);
565    }
566
567    #[test]
568    fn test_heartbeat_history_be_capped_by_the_specified_max_sample_size() {
569        let mut history = HeartbeatHistory::new(3);
570        history.add(100);
571        history.add(110);
572        history.add(90);
573        assert!((history.mean() - 100.0).abs() < 0.00001);
574        assert!((history.variance() - 66.6666667).abs() < 0.00001);
575        history.add(140);
576        assert!((history.mean() - 113.333333).abs() < 0.00001);
577        assert!((history.variance() - 422.222222).abs() < 0.00001);
578        history.add(80);
579        assert!((history.mean() - 103.333333).abs() < 0.00001);
580        assert!((history.variance() - 688.88888889).abs() < 0.00001);
581    }
582}