meta_srv/
failure_detector.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::VecDeque;
16use std::time::Duration;
17
18use common_meta::distributed_time_constants;
19use serde::{Deserialize, Serialize};
20
/// Inter-arrival time, in milliseconds, assumed when the very first heartbeat is
/// received and no real interval has been observed yet.
const FIRST_HEARTBEAT_ESTIMATE_MILLIS: i64 = 1_000;
22
23/// This is our port of Akka's "[PhiAccrualFailureDetector](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala)"
24/// under Apache License 2.0.
25///
26/// You can find it's document here:
27/// <https://doc.akka.io/docs/akka/2.6.21/typed/failure-detector.html>
28///
29/// Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their
30/// paper: <https://oneofus.la/have-emacs-will-hack/files/HDY04.pdf>
31///
32/// The suspicion level of failure is given by a value called φ (phi).
33/// The basic idea of the φ failure detector is to express the value of φ on a scale that
34/// is dynamically adjusted to reflect current network conditions. A configurable
35/// threshold is used to decide if φ is considered to be a failure.
36///
37/// The value of φ is calculated as:
38///
39/// φ = -log10(1 - F(timeSinceLastHeartbeat)
40///
41/// where F is the cumulative distribution function of a normal distribution with mean
42/// and standard deviation estimated from historical heartbeat inter-arrival times.
43#[cfg_attr(test, derive(Debug, Clone, PartialEq))]
44pub(crate) struct PhiAccrualFailureDetector {
45    /// A low threshold is prone to generate many wrong suspicions but ensures a quick detection
46    /// in the event of a real crash. Conversely, a high threshold generates fewer mistakes but
47    /// needs more time to detect actual crashes.
48    threshold: f32,
49
50    /// Minimum standard deviation to use for the normal distribution used when calculating phi.
51    /// Too low standard deviation might result in too much sensitivity for sudden, but normal,
52    /// deviations in heartbeat inter arrival times.
53    min_std_deviation_millis: f32,
54
55    /// Duration corresponding to number of potentially lost/delayed heartbeats that will be
56    /// accepted before considering it to be an anomaly.
57    /// This margin is important to be able to survive sudden, occasional, pauses in heartbeat
58    /// arrivals, due to for example network drop.
59    acceptable_heartbeat_pause_millis: u32,
60
61    heartbeat_history: HeartbeatHistory,
62    last_heartbeat_millis: Option<i64>,
63}
64
65#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
66#[serde(default)]
67pub struct PhiAccrualFailureDetectorOptions {
68    pub threshold: f32,
69    #[serde(with = "humantime_serde")]
70    pub min_std_deviation: Duration,
71    #[serde(with = "humantime_serde")]
72    pub acceptable_heartbeat_pause: Duration,
73}
74
75impl Default for PhiAccrualFailureDetectorOptions {
76    fn default() -> Self {
77        // default configuration is the same as of Akka:
78        // https://github.com/akka/akka/blob/v2.6.21/akka-cluster/src/main/resources/reference.conf#L181
79        Self {
80            threshold: 8_f32,
81            min_std_deviation: Duration::from_millis(100),
82            acceptable_heartbeat_pause: Duration::from_secs(
83                distributed_time_constants::DATANODE_LEASE_SECS,
84            ),
85        }
86    }
87}
88
89impl Default for PhiAccrualFailureDetector {
90    fn default() -> Self {
91        Self::from_options(Default::default())
92    }
93}
94
95impl PhiAccrualFailureDetector {
96    pub(crate) fn from_options(options: PhiAccrualFailureDetectorOptions) -> Self {
97        Self {
98            threshold: options.threshold,
99            min_std_deviation_millis: options.min_std_deviation.as_millis() as f32,
100            acceptable_heartbeat_pause_millis: options.acceptable_heartbeat_pause.as_millis()
101                as u32,
102            heartbeat_history: HeartbeatHistory::new(1000),
103            last_heartbeat_millis: None,
104        }
105    }
106
107    pub(crate) fn heartbeat(&mut self, ts_millis: i64) {
108        if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
109            if ts_millis < last_heartbeat_millis {
110                return;
111            }
112
113            if self.is_available(ts_millis) {
114                let interval = ts_millis - last_heartbeat_millis;
115                self.heartbeat_history.add(interval)
116            }
117        } else {
118            // guess statistics for first heartbeat,
119            // important so that connections with only one heartbeat becomes unavailable
120            // bootstrap with 2 entries with rather high standard deviation
121            let std_deviation = FIRST_HEARTBEAT_ESTIMATE_MILLIS / 4;
122            self.heartbeat_history
123                .add((FIRST_HEARTBEAT_ESTIMATE_MILLIS - std_deviation) as _);
124            self.heartbeat_history
125                .add((FIRST_HEARTBEAT_ESTIMATE_MILLIS + std_deviation) as _);
126        }
127        let _ = self.last_heartbeat_millis.insert(ts_millis);
128    }
129
130    pub(crate) fn is_available(&self, ts_millis: i64) -> bool {
131        self.phi(ts_millis) < self.threshold as _
132    }
133
134    /// The suspicion level of the accrual failure detector.
135    ///
136    /// If a connection does not have any records in failure detector then it is considered healthy.
137    pub(crate) fn phi(&self, ts_millis: i64) -> f64 {
138        if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
139            let time_diff = ts_millis - last_heartbeat_millis;
140            let mean = self.heartbeat_history.mean();
141            let std_deviation = self
142                .heartbeat_history
143                .std_deviation()
144                .max(self.min_std_deviation_millis as _);
145
146            phi(
147                time_diff,
148                mean + self.acceptable_heartbeat_pause_millis as f64,
149                std_deviation,
150            )
151        } else {
152            // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
153            0.0
154        }
155    }
156
157    #[cfg(test)]
158    pub(crate) fn threshold(&self) -> f32 {
159        self.threshold
160    }
161
162    #[cfg(test)]
163    pub(crate) fn acceptable_heartbeat_pause_millis(&self) -> u32 {
164        self.acceptable_heartbeat_pause_millis
165    }
166}
167
/// Computes φ for `time_diff` milliseconds of silence, given the `mean` and `std_deviation`
/// of the heartbeat inter-arrival distribution.
///
/// The value is derived from the cumulative distribution function of the
/// N(mean, std_deviation) normal distribution, approximated by
/// 1.0 / (1.0 + math.exp(-y * (1.5976 + 0.070566 * y * y)))
/// where y = (x - mean) / standard_deviation
/// This is an approximation defined in β Mathematics Handbook (Logistic approximation).
/// Error is 0.00014 at +- 3.16
/// The calculated value is equivalent to -log10(1 - CDF(y))
///
/// Usually phi = 1 means the likeliness that we will make a mistake is about 10%.
/// The likeliness is about 1% with phi = 2, 0.1% with phi = 3 and so on.
///
/// # Panics
///
/// Panics if `std_deviation` is zero.
fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
    assert_ne!(std_deviation, 0.0);

    let elapsed = time_diff as f64;
    let y = (elapsed - mean) / std_deviation;
    let e = (-y * (1.5976 + 0.070566 * y * y)).exp();

    // Both forms express 1 - CDF(y); each side of the mean uses its own form, exactly as
    // in the Akka implementation this is ported from.
    let tail_probability = if elapsed > mean {
        e / (1.0 + e)
    } else {
        1.0 - 1.0 / (1.0 + e)
    };
    -tail_probability.log10()
}
190
/// Holds the heartbeat statistics.
/// It is capped by the number of samples specified in `max_sample_size`.
///
/// The stats (mean, variance, std_deviation) are not defined for an empty HeartbeatHistory;
/// they evaluate to NaN in that case.
///
/// The running sums are kept in `i128`, so that squaring a large interval (anything from
/// about 3.04e9 ms upwards no longer fits `i64` once squared) cannot overflow.
#[derive(Debug, Clone, PartialEq)]
struct HeartbeatHistory {
    /// Number of samples to use for calculation of mean and standard deviation of
    /// inter-arrival times.
    max_sample_size: u32,

    /// The retained intervals, oldest first.
    intervals: VecDeque<i64>,
    /// Sum of all retained intervals.
    interval_sum: i128,
    /// Sum of the squares of all retained intervals.
    squared_interval_sum: i128,
}

impl HeartbeatHistory {
    /// Creates an empty history that retains at most `max_sample_size` intervals.
    fn new(max_sample_size: u32) -> Self {
        Self {
            max_sample_size,
            intervals: VecDeque::with_capacity(max_sample_size as usize),
            interval_sum: 0,
            squared_interval_sum: 0,
        }
    }

    /// Arithmetic mean of the retained intervals.
    fn mean(&self) -> f64 {
        self.interval_sum as f64 / self.intervals.len() as f64
    }

    /// Population variance of the retained intervals, computed as E[x²] - E[x]².
    fn variance(&self) -> f64 {
        let mean = self.mean();
        self.squared_interval_sum as f64 / self.intervals.len() as f64 - mean * mean
    }

    /// Square root of [`Self::variance`].
    fn std_deviation(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Appends `interval`, evicting the oldest sample first when the history is full.
    ///
    /// # Panics
    ///
    /// Panics if the history was created with a `max_sample_size` of zero, because there is
    /// then no sample to evict.
    fn add(&mut self, interval: i64) {
        if self.intervals.len() >= self.max_sample_size as usize {
            self.drop_oldest();
        }
        self.intervals.push_back(interval);
        self.interval_sum += i128::from(interval);
        self.squared_interval_sum += Self::squared(interval);
    }

    /// Removes the oldest sample and subtracts its contribution from the running sums.
    fn drop_oldest(&mut self) {
        let oldest = self
            .intervals
            .pop_front()
            .expect("intervals must not be empty here");
        self.interval_sum -= i128::from(oldest);
        self.squared_interval_sum -= Self::squared(oldest);
    }

    /// Squares `interval` in `i128`; the square of any `i64` fits without overflow.
    fn squared(interval: i64) -> i128 {
        let wide = i128::from(interval);
        wide * wide
    }
}
247
#[cfg(test)]
mod tests {
    use common_time::util::current_time_millis;

    use super::*;

    /// Builds a detector with no heartbeat recorded yet and the 100 ms minimum standard
    /// deviation shared by all the ported Akka test cases.
    fn new_detector(
        threshold: f32,
        acceptable_heartbeat_pause_millis: u32,
        max_sample_size: u32,
    ) -> PhiAccrualFailureDetector {
        PhiAccrualFailureDetector {
            threshold,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis,
            heartbeat_history: HeartbeatHistory::new(max_sample_size),
            last_heartbeat_millis: None,
        }
    }

    #[test]
    fn test_is_available() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // is available before first heartbeat
        assert!(fd.is_available(ts_millis));

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        // is available when heartbeat
        assert!(fd.is_available(ts_millis));
        // is available before heartbeat timeout
        assert!(fd.is_available(ts_millis + acceptable_heartbeat_pause_millis / 2));
        // is not available after heartbeat timeout
        assert!(!fd.is_available(ts_millis + acceptable_heartbeat_pause_millis * 2));
    }

    #[test]
    fn test_last_heartbeat() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // no heartbeat yet
        assert!(fd.last_heartbeat_millis.is_none());

        fd.heartbeat(ts_millis);
        assert_eq!(fd.last_heartbeat_millis, Some(ts_millis));
    }

    #[test]
    fn test_phi() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // phi == 0 before first heartbeat
        assert_eq!(fd.phi(ts_millis), 0.0);

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        // phi == 0 when heartbeat
        assert_eq!(fd.phi(ts_millis), 0.0);
        // phi < threshold before heartbeat timeout
        let now = ts_millis + acceptable_heartbeat_pause_millis / 2;
        assert!(fd.phi(now) < fd.threshold as _);
        // phi >= threshold after heartbeat timeout
        let now = ts_millis + acceptable_heartbeat_pause_millis * 2;
        assert!(fd.phi(now) >= fd.threshold as _);
    }

    // The following test cases are ported from Akka's tests under Apache License 2.0:
    // [AccrualFailureDetectorSpec.scala](https://github.com/akka/akka/blob/v2.6.21/akka-remote/src/test/scala/akka/remote/AccrualFailureDetectorSpec.scala).

    #[test]
    fn test_use_good_enough_cumulative_distribution_function() {
        fn cdf(phi: f64) -> f64 {
            1.0 - 10.0_f64.powf(-phi)
        }

        assert!((cdf(phi(0, 0.0, 10.0)) - 0.5).abs() < 0.001);
        assert!((cdf(phi(6, 0.0, 10.0)) - 0.7257).abs() < 0.001);
        assert!((cdf(phi(15, 0.0, 10.0)) - 0.9332).abs() < 0.001);
        assert!((cdf(phi(20, 0.0, 10.0)) - 0.97725).abs() < 0.001);
        assert!((cdf(phi(25, 0.0, 10.0)) - 0.99379).abs() < 0.001);
        assert!((cdf(phi(35, 0.0, 10.0)) - 0.99977).abs() < 0.001);
        assert!((cdf(phi(40, 0.0, 10.0)) - 0.99997).abs() < 0.0001);

        // phi must be strictly increasing over consecutive time differences in 0..40
        for time_diff in 0..39_i64 {
            assert!(phi(time_diff, 0.0, 10.0) < phi(time_diff + 1, 0.0, 10.0));
        }

        assert!((cdf(phi(22, 20.0, 3.0)) - 0.7475).abs() < 0.001);
    }

    #[test]
    fn test_handle_outliers_without_losing_precision_or_hitting_exceptions() {
        assert!((phi(10, 0.0, 1.0) - 38.0).abs() < 1.0);
        assert_eq!(phi(-25, 0.0, 1.0), 0.0);
    }

    #[test]
    fn test_return_realistic_phi_values() {
        let cases = [
            (0, 0.0),
            (500, 0.1),
            (1000, 0.3),
            (1200, 1.6),
            (1400, 4.7),
            (1600, 10.8),
            (1700, 15.3),
        ];
        for (time_diff, expected_phi) in cases {
            assert!((phi(time_diff, 1000.0, 100.0) - expected_phi).abs() < 0.1);
        }

        // larger std_deviation results => lower phi
        assert!(phi(1100, 1000.0, 500.0) < phi(1100, 1000.0, 100.0));
    }

    #[test]
    fn test_return_phi_of_0_on_startup_when_no_heartbeats() {
        let fd = new_detector(8.0, 0, 1000);
        assert_eq!(fd.phi(current_time_millis()), 0.0);
        assert_eq!(fd.phi(current_time_millis()), 0.0);
    }

    #[test]
    fn test_return_phi_based_on_guess_when_only_one_heartbeat() {
        let mut fd = new_detector(8.0, 0, 1000);
        fd.heartbeat(0);
        // Compare the distance to the expected value; taking `abs()` of phi alone and then
        // subtracting would only check an upper bound.
        assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS) - 0.3).abs() < 0.2);
        assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 2) - 4.5).abs() < 0.3);
        assert!(fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 3) > 15.0);
    }

    #[test]
    fn test_return_phi_using_first_interval_after_second_heartbeat() {
        let mut fd = new_detector(8.0, 0, 1000);
        fd.heartbeat(0);
        assert!(fd.phi(100) > 0.0);
        fd.heartbeat(200);
        assert!(fd.phi(300) > 0.0);
    }

    #[test]
    fn test_is_available_after_a_series_of_successful_heartbeats() {
        let mut fd = new_detector(8.0, 0, 1000);
        assert!(fd.last_heartbeat_millis.is_none());
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert!(fd.last_heartbeat_millis.is_some());
        assert!(fd.is_available(1200));
    }

    #[test]
    fn test_is_not_available_if_heartbeat_are_missed() {
        let mut fd = new_detector(3.0, 0, 1000);
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert!(fd.is_available(1200));
        assert!(!fd.is_available(8200));
    }

    #[test]
    fn test_is_available_if_it_starts_heartbeat_again_after_being_marked_dead_due_to_detection_of_failure()
    {
        let mut fd = new_detector(8.0, 3000, 1000);

        // 1000 regular intervals, 5 minute pause, and then a short pause again that should
        // trigger unreachable again

        let mut now = 0;
        for _ in 0..1000 {
            fd.heartbeat(now);
            now += 1000;
        }
        now += 5 * 60 * 1000;
        assert!(!fd.is_available(now)); // after the long pause
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 7000;
        assert!(!fd.is_available(now)); // after the 7 seconds pause
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
    }

    #[test]
    fn test_accept_some_configured_missing_heartbeats() {
        let mut fd = new_detector(8.0, 3000, 1000);
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        assert!(fd.is_available(7000));
        fd.heartbeat(8000);
        assert!(fd.is_available(9000));
    }

    #[test]
    fn test_fail_after_configured_acceptable_missing_heartbeats() {
        let mut fd = new_detector(8.0, 3000, 1000);
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        fd.heartbeat(4000);
        fd.heartbeat(5000);
        assert!(fd.is_available(5500));
        fd.heartbeat(6000);
        assert!(!fd.is_available(11000));
    }

    #[test]
    fn test_use_max_sample_size_heartbeats() {
        let mut fd = new_detector(8.0, 0, 3);
        // 100 ms interval
        fd.heartbeat(0);
        fd.heartbeat(100);
        fd.heartbeat(200);
        fd.heartbeat(300);
        let phi1 = fd.phi(400);
        // 500 ms interval, should become same phi when 100 ms intervals have been dropped
        fd.heartbeat(1000);
        fd.heartbeat(1500);
        fd.heartbeat(2000);
        fd.heartbeat(2500);
        let phi2 = fd.phi(3000);
        assert_eq!(phi1, phi2);
    }

    #[test]
    fn test_heartbeat_history_calculate_correct_mean_and_variance() {
        let mut history = HeartbeatHistory::new(20);
        for i in [100, 200, 125, 340, 130] {
            history.add(i);
        }
        assert!((history.mean() - 179.0).abs() < 0.00001);
        assert!((history.variance() - 7584.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_have_0_variance_for_one_sample() {
        let mut history = HeartbeatHistory::new(600);
        history.add(1000);
        assert!((history.variance() - 0.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_be_capped_by_the_specified_max_sample_size() {
        let mut history = HeartbeatHistory::new(3);
        history.add(100);
        history.add(110);
        history.add(90);
        assert!((history.mean() - 100.0).abs() < 0.00001);
        assert!((history.variance() - 66.6666667).abs() < 0.00001);
        history.add(140);
        assert!((history.mean() - 113.333333).abs() < 0.00001);
        assert!((history.variance() - 422.222222).abs() < 0.00001);
        history.add(80);
        assert!((history.mean() - 103.333333).abs() < 0.00001);
        assert!((history.variance() - 688.88888889).abs() < 0.00001);
    }
}
567}