1use std::collections::VecDeque;
16use std::time::Duration;
17
18use serde::{Deserialize, Serialize};
19
/// Assumed heartbeat interval, in milliseconds, used to seed the interval history when the
/// very first heartbeat arrives and no real interval has been observed yet.
const FIRST_HEARTBEAT_ESTIMATE_MILLIS: i64 = 1000;
21
/// Failure detector that converts the time elapsed since the last heartbeat into a suspicion
/// level ("phi"), using the mean and standard deviation of recently observed heartbeat
/// intervals.
#[cfg_attr(test, derive(Debug, Clone, PartialEq))]
pub(crate) struct PhiAccrualFailureDetector {
    /// Phi values at or above this threshold are reported as "not available".
    threshold: f32,

    /// Lower bound, in milliseconds, applied to the standard deviation of the recorded
    /// intervals before phi is computed.
    min_std_deviation_millis: f32,

    /// Extra time, in milliseconds, added to the mean interval before phi is computed, so
    /// that pauses of roughly this length do not raise suspicion.
    acceptable_heartbeat_pause_millis: u32,

    /// Sliding window of recently observed heartbeat intervals.
    heartbeat_history: HeartbeatHistory,
    /// Timestamp, in milliseconds, of the latest accepted heartbeat; `None` until the first
    /// heartbeat has been received.
    last_heartbeat_millis: Option<i64>,
}
63
/// User-facing configuration from which a `PhiAccrualFailureDetector` is built.
///
/// Because of `#[serde(default)]`, fields missing from the serialized form are taken from
/// this type's `Default` implementation.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct PhiAccrualFailureDetectorOptions {
    /// Phi value at or above which the monitored peer is considered unavailable.
    pub threshold: f32,
    /// Lower bound for the standard deviation of heartbeat intervals used when computing phi.
    /// (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub min_std_deviation: Duration,
    /// Pause between heartbeats that is tolerated on top of the mean interval.
    /// (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub acceptable_heartbeat_pause: Duration,
}
73
74impl Default for PhiAccrualFailureDetectorOptions {
75 fn default() -> Self {
76 Self {
79 threshold: 8_f32,
80 min_std_deviation: Duration::from_millis(100),
81 acceptable_heartbeat_pause: Duration::from_secs(10),
82 }
83 }
84}
85
86impl Default for PhiAccrualFailureDetector {
87 fn default() -> Self {
88 Self::from_options(Default::default())
89 }
90}
91
92impl PhiAccrualFailureDetector {
93 pub(crate) fn from_options(options: PhiAccrualFailureDetectorOptions) -> Self {
94 Self {
95 threshold: options.threshold,
96 min_std_deviation_millis: options.min_std_deviation.as_millis() as f32,
97 acceptable_heartbeat_pause_millis: options.acceptable_heartbeat_pause.as_millis()
98 as u32,
99 heartbeat_history: HeartbeatHistory::new(1000),
100 last_heartbeat_millis: None,
101 }
102 }
103
104 pub(crate) fn heartbeat(&mut self, ts_millis: i64) {
105 if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
106 if ts_millis < last_heartbeat_millis {
107 return;
108 }
109
110 if self.is_available(ts_millis) {
111 let interval = ts_millis - last_heartbeat_millis;
112 self.heartbeat_history.add(interval)
113 }
114 } else {
115 let std_deviation = FIRST_HEARTBEAT_ESTIMATE_MILLIS / 4;
119 self.heartbeat_history
120 .add((FIRST_HEARTBEAT_ESTIMATE_MILLIS - std_deviation) as _);
121 self.heartbeat_history
122 .add((FIRST_HEARTBEAT_ESTIMATE_MILLIS + std_deviation) as _);
123 }
124 let _ = self.last_heartbeat_millis.insert(ts_millis);
125 }
126
127 pub(crate) fn is_available(&self, ts_millis: i64) -> bool {
128 self.phi(ts_millis) < self.threshold as _
129 }
130
131 pub(crate) fn phi(&self, ts_millis: i64) -> f64 {
135 if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
136 let time_diff = ts_millis - last_heartbeat_millis;
137 let mean = self.heartbeat_history.mean();
138 let std_deviation = self
139 .heartbeat_history
140 .std_deviation()
141 .max(self.min_std_deviation_millis as _);
142
143 phi(
144 time_diff,
145 mean + self.acceptable_heartbeat_pause_millis as f64,
146 std_deviation,
147 )
148 } else {
149 0.0
151 }
152 }
153
154 #[cfg(test)]
155 pub(crate) fn threshold(&self) -> f32 {
156 self.threshold
157 }
158
159 #[cfg(test)]
160 pub(crate) fn acceptable_heartbeat_pause_millis(&self) -> u32 {
161 self.acceptable_heartbeat_pause_millis
162 }
163
164 #[cfg(test)]
165 pub(crate) fn last_heartbeat_millis(&self) -> Option<i64> {
166 self.last_heartbeat_millis
167 }
168}
169
/// Computes the phi value for an elapsed time `time_diff` given the `mean` and
/// `std_deviation` of the heartbeat intervals (all in the same time unit).
///
/// The result is `-log10(p)`, where `p` is a logistic approximation of the probability that
/// an interval drawn from a normal distribution with the given parameters is longer than
/// `time_diff`.
///
/// # Panics
///
/// Panics if `std_deviation` is exactly `0.0`.
fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
    assert_ne!(std_deviation, 0.0);

    let elapsed = time_diff as f64;
    let y = (elapsed - mean) / std_deviation;
    let e = (-y * (1.5976 + 0.070566 * y * y)).exp();

    // Both forms are algebraically `e / (1 + e)`; the original keeps a separate form for each
    // side of the mean (presumably for numerical reasons), so the split is preserved as is.
    let tail_probability = if elapsed > mean {
        e / (1.0 + e)
    } else {
        1.0 - 1.0 / (1.0 + e)
    };

    -tail_probability.log10()
}
192
/// Bounded sliding window of heartbeat intervals that maintains running sums, so mean and
/// variance are available without re-scanning the samples.
///
/// The sums are kept in `i128`: a single interval near the upper end of what a `u32`
/// millisecond pause allows already has a square that exceeds `i64::MAX`.
#[derive(Debug, Clone, PartialEq)]
struct HeartbeatHistory {
    /// Maximum number of intervals kept; the oldest one is evicted once this is reached.
    max_sample_size: u32,

    /// Recorded intervals, oldest first.
    intervals: VecDeque<i64>,
    /// Sum of all intervals currently in `intervals`.
    interval_sum: i128,
    /// Sum of the squares of all intervals currently in `intervals`.
    squared_interval_sum: i128,
}

impl HeartbeatHistory {
    /// Creates an empty history that keeps at most `max_sample_size` intervals.
    ///
    /// Note that with `max_sample_size == 0` the first call to [`HeartbeatHistory::add`]
    /// panics, because it tries to evict from an empty window.
    fn new(max_sample_size: u32) -> Self {
        Self {
            max_sample_size,
            intervals: VecDeque::with_capacity(max_sample_size as usize),
            interval_sum: 0,
            squared_interval_sum: 0,
        }
    }

    /// Arithmetic mean of the recorded intervals (`NaN` while the history is empty).
    fn mean(&self) -> f64 {
        self.interval_sum as f64 / self.intervals.len() as f64
    }

    /// Population variance of the recorded intervals, computed as `E[x^2] - E[x]^2`.
    fn variance(&self) -> f64 {
        let mean = self.mean();
        self.squared_interval_sum as f64 / self.intervals.len() as f64 - mean * mean
    }

    /// Standard deviation of the recorded intervals.
    ///
    /// Floating-point cancellation can make the computed variance marginally negative; it is
    /// clamped to zero so the square root never yields `NaN` for that reason.
    fn std_deviation(&self) -> f64 {
        self.variance().max(0.0).sqrt()
    }

    /// Appends `interval`, evicting the oldest sample first if the window is already full.
    fn add(&mut self, interval: i64) {
        if self.intervals.len() >= self.max_sample_size as usize {
            self.drop_oldest();
        }
        let widened = i128::from(interval);
        self.intervals.push_back(interval);
        self.interval_sum += widened;
        self.squared_interval_sum += widened * widened;
    }

    /// Removes the oldest interval and subtracts its contribution from the running sums.
    fn drop_oldest(&mut self) {
        let oldest = self
            .intervals
            .pop_front()
            .expect("intervals must not be empty here");
        let widened = i128::from(oldest);
        self.interval_sum -= widened;
        self.squared_interval_sum -= widened * widened;
    }
}
249
#[cfg(test)]
mod tests {
    use common_time::util::current_time_millis;

    use super::*;

    /// A fresh detector is available; after one heartbeat it stays available for half the
    /// acceptable pause and becomes unavailable after twice the acceptable pause.
    #[test]
    fn test_is_available() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // No heartbeat yet: phi is 0, so the peer counts as available.
        assert!(fd.is_available(ts_millis));

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        assert!(fd.is_available(ts_millis));
        assert!(fd.is_available(ts_millis + acceptable_heartbeat_pause_millis / 2));
        assert!(!fd.is_available(ts_millis + acceptable_heartbeat_pause_millis * 2));
    }

    /// The last heartbeat timestamp is `None` initially and tracks the latest heartbeat.
    #[test]
    fn test_last_heartbeat() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        assert!(fd.last_heartbeat_millis.is_none());

        fd.heartbeat(ts_millis);
        assert_eq!(fd.last_heartbeat_millis, Some(ts_millis));
    }

    /// Phi is 0 before and right at the first heartbeat, below the threshold after half the
    /// acceptable pause, and at or above it after twice the acceptable pause.
    #[test]
    fn test_phi() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        assert_eq!(fd.phi(ts_millis), 0.0);

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        assert_eq!(fd.phi(ts_millis), 0.0);
        let now = ts_millis + acceptable_heartbeat_pause_millis / 2;
        assert!(fd.phi(now) < fd.threshold as _);
        let now = ts_millis + acceptable_heartbeat_pause_millis * 2;
        assert!(fd.phi(now) >= fd.threshold as _);
    }

    /// The logistic approximation used by `phi` stays close to the normal CDF.
    #[test]
    fn test_use_good_enough_cumulative_distribution_function() {
        fn cdf(phi: f64) -> f64 {
            1.0 - 10.0_f64.powf(-phi)
        }

        assert!((cdf(phi(0, 0.0, 10.0)) - 0.5).abs() < 0.001);
        assert!((cdf(phi(6, 0.0, 10.0)) - 0.7257).abs() < 0.001);
        assert!((cdf(phi(15, 0.0, 10.0)) - 0.9332).abs() < 0.001);
        assert!((cdf(phi(20, 0.0, 10.0)) - 0.97725).abs() < 0.001);
        assert!((cdf(phi(25, 0.0, 10.0)) - 0.99379).abs() < 0.001);
        assert!((cdf(phi(35, 0.0, 10.0)) - 0.99977).abs() < 0.001);
        assert!((cdf(phi(40, 0.0, 10.0)) - 0.99997).abs() < 0.0001);

        // Phi must grow strictly with the elapsed time.
        for w in (0..40).collect::<Vec<i64>>().windows(2) {
            assert!(phi(w[0], 0.0, 10.0) < phi(w[1], 0.0, 10.0));
        }

        assert!((cdf(phi(22, 20.0, 3.0)) - 0.7475).abs() < 0.001);
    }

    /// Extreme inputs neither lose precision nor produce non-finite garbage.
    #[test]
    fn test_handle_outliers_without_losing_precision_or_hitting_exceptions() {
        assert!((phi(10, 0.0, 1.0) - 38.0).abs() < 1.0);
        assert_eq!(phi(-25, 0.0, 1.0), 0.0);
    }

    /// Phi values for a 1000ms mean / 100ms deviation follow the expected curve.
    #[test]
    fn test_return_realistic_phi_values() {
        let test = [
            (0, 0.0),
            (500, 0.1),
            (1000, 0.3),
            (1200, 1.6),
            (1400, 4.7),
            (1600, 10.8),
            (1700, 15.3),
        ];
        for (time_diff, expected_phi) in test {
            assert!((phi(time_diff, 1000.0, 100.0) - expected_phi).abs() < 0.1);
        }

        // A larger standard deviation must yield a lower phi for the same delay.
        assert!(phi(1100, 1000.0, 500.0) < phi(1100, 1000.0, 100.0));
    }

    #[test]
    fn test_return_phi_of_0_on_startup_when_no_heartbeats() {
        let fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        assert_eq!(fd.phi(current_time_millis()), 0.0);
        assert_eq!(fd.phi(current_time_millis()), 0.0);
    }

    #[test]
    fn test_return_phi_based_on_guess_when_only_one_heartbeat() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        // The tolerance applies to the distance from the expected value, so `abs` must wrap
        // the difference; otherwise any phi below the upper bound would pass.
        assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS) - 0.3).abs() < 0.2);
        assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 2) - 4.5).abs() < 0.3);
        assert!(fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 3) > 15.0);
    }

    #[test]
    fn test_return_phi_using_first_interval_after_second_heartbeat() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        assert!(fd.phi(100) > 0.0);
        fd.heartbeat(200);
        assert!(fd.phi(300) > 0.0);
    }

    #[test]
    fn test_is_available_after_a_series_of_successful_heartbeats() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        assert!(fd.last_heartbeat_millis.is_none());
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert_eq!(fd.last_heartbeat_millis, Some(1100));
        assert!(fd.is_available(1200));
    }

    #[test]
    fn test_is_not_available_if_heartbeat_are_missed() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 3.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert!(fd.is_available(1200));
        assert!(!fd.is_available(8200));
    }

    #[test]
    fn test_is_available_if_it_starts_heartbeat_again_after_being_marked_dead_due_to_detection_of_failure()
     {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 3000,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };

        let mut now = 0;
        // A long run of regular heartbeats, one per second.
        for _ in 0..1000 {
            fd.heartbeat(now);
            now += 1000;
        }
        // A five-minute silence must mark the peer as unavailable.
        now += 5 * 60 * 1000;
        assert!(!fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        // A seven-second silence exceeds the acceptable pause again.
        now += 7000;
        assert!(!fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
    }

    #[test]
    fn test_accept_some_configured_missing_heartbeats() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 3000,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        assert!(fd.is_available(7000));
        fd.heartbeat(8000);
        assert!(fd.is_available(9000));
    }

    #[test]
    fn test_fail_after_configured_acceptable_missing_heartbeats() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 3000,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        fd.heartbeat(4000);
        fd.heartbeat(5000);
        assert!(fd.is_available(5500));
        fd.heartbeat(6000);
        assert!(!fd.is_available(11000));
    }

    /// With a window of three samples, only the latest intervals influence phi.
    #[test]
    fn test_use_max_sample_size_heartbeats() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(3),
            last_heartbeat_millis: None,
        };
        // 100ms intervals.
        fd.heartbeat(0);
        fd.heartbeat(100);
        fd.heartbeat(200);
        fd.heartbeat(300);
        let phi1 = fd.phi(400);
        // 500ms intervals; these replace the 100ms samples in the window.
        fd.heartbeat(1000);
        fd.heartbeat(1500);
        fd.heartbeat(2000);
        fd.heartbeat(2500);
        let phi2 = fd.phi(3000);
        assert_eq!(phi1, phi2);
    }

    #[test]
    fn test_heartbeat_history_calculate_correct_mean_and_variance() {
        let mut history = HeartbeatHistory::new(20);
        for i in [100, 200, 125, 340, 130] {
            history.add(i);
        }
        assert!((history.mean() - 179.0).abs() < 0.00001);
        assert!((history.variance() - 7584.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_have_0_variance_for_one_sample() {
        let mut history = HeartbeatHistory::new(600);
        history.add(1000);
        assert!((history.variance() - 0.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_be_capped_by_the_specified_max_sample_size() {
        let mut history = HeartbeatHistory::new(3);
        history.add(100);
        history.add(110);
        history.add(90);
        assert!((history.mean() - 100.0).abs() < 0.00001);
        assert!((history.variance() - 66.6666667).abs() < 0.00001);
        history.add(140);
        assert!((history.mean() - 113.333333).abs() < 0.00001);
        assert!((history.variance() - 422.222222).abs() < 0.00001);
        history.add(80);
        assert!((history.mean() - 103.333333).abs() < 0.00001);
        assert!((history.variance() - 688.88888889).abs() < 0.00001);
    }
}