1use std::collections::VecDeque;
16use std::time::Duration;
17
18use serde::{Deserialize, Serialize};
19
/// Interval estimate, in milliseconds, used to bootstrap the heartbeat history when the
/// very first heartbeat arrives: two synthetic intervals of `estimate - estimate / 4` and
/// `estimate + estimate / 4` are recorded in place of real measurements.
const FIRST_HEARTBEAT_ESTIMATE_MILLIS: i64 = 1000;
21
/// Failure detector that turns the time elapsed since the last heartbeat into a suspicion
/// level (`phi`), based on the recorded history of intervals between heartbeats.
///
/// `Debug`, `Clone` and `PartialEq` are only derived in test builds.
#[cfg_attr(test, derive(Debug, Clone, PartialEq))]
pub(crate) struct PhiAccrualFailureDetector {
    /// `is_available` reports `true` only while `phi` stays strictly below this value.
    threshold: f32,

    /// Lower bound, in milliseconds, applied to the standard deviation of the recorded
    /// intervals when computing `phi`.
    min_std_deviation_millis: f32,

    /// Milliseconds added to the mean heartbeat interval when computing `phi`.
    acceptable_heartbeat_pause_millis: u32,

    /// Recorded intervals between heartbeats.
    heartbeat_history: HeartbeatHistory,
    /// Timestamp, in milliseconds, of the latest accepted heartbeat; `None` until the first
    /// heartbeat has been received.
    last_heartbeat_millis: Option<i64>,
}
63
/// Configuration from which a `PhiAccrualFailureDetector` is built.
///
/// Because of `#[serde(default)]`, fields missing from the serialized form take the values
/// of the `Default` implementation.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct PhiAccrualFailureDetectorOptions {
    /// Phi value at or above which the detector no longer reports availability.
    pub threshold: f32,
    /// Lower bound for the standard deviation of heartbeat intervals used when computing
    /// phi. (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub min_std_deviation: Duration,
    /// Pause that is added to the mean heartbeat interval when computing phi.
    /// (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub acceptable_heartbeat_pause: Duration,
}
73
74impl Default for PhiAccrualFailureDetectorOptions {
75 fn default() -> Self {
76 Self {
79 threshold: 8_f32,
80 min_std_deviation: Duration::from_millis(100),
81 acceptable_heartbeat_pause: Duration::from_secs(10),
82 }
83 }
84}
85
86impl Default for PhiAccrualFailureDetector {
87 fn default() -> Self {
88 Self::from_options(Default::default())
89 }
90}
91
92impl PhiAccrualFailureDetector {
93 pub(crate) fn from_options(options: PhiAccrualFailureDetectorOptions) -> Self {
94 Self {
95 threshold: options.threshold,
96 min_std_deviation_millis: options.min_std_deviation.as_millis() as f32,
97 acceptable_heartbeat_pause_millis: options.acceptable_heartbeat_pause.as_millis()
98 as u32,
99 heartbeat_history: HeartbeatHistory::new(1000),
100 last_heartbeat_millis: None,
101 }
102 }
103
104 pub(crate) fn heartbeat(&mut self, ts_millis: i64) {
105 if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
106 if ts_millis < last_heartbeat_millis {
107 return;
108 }
109
110 if self.is_available(ts_millis) {
111 let interval = ts_millis - last_heartbeat_millis;
112 self.heartbeat_history.add(interval)
113 }
114 } else {
115 let std_deviation = FIRST_HEARTBEAT_ESTIMATE_MILLIS / 4;
119 self.heartbeat_history
120 .add((FIRST_HEARTBEAT_ESTIMATE_MILLIS - std_deviation) as _);
121 self.heartbeat_history
122 .add((FIRST_HEARTBEAT_ESTIMATE_MILLIS + std_deviation) as _);
123 }
124 let _ = self.last_heartbeat_millis.insert(ts_millis);
125 }
126
127 pub(crate) fn is_available(&self, ts_millis: i64) -> bool {
128 self.phi(ts_millis) < self.threshold as _
129 }
130
131 pub(crate) fn phi(&self, ts_millis: i64) -> f64 {
135 if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
136 let time_diff = ts_millis - last_heartbeat_millis;
137 let mean = self.heartbeat_history.mean();
138 let std_deviation = self
139 .heartbeat_history
140 .std_deviation()
141 .max(self.min_std_deviation_millis as _);
142
143 phi(
144 time_diff,
145 mean + self.acceptable_heartbeat_pause_millis as f64,
146 std_deviation,
147 )
148 } else {
149 0.0
151 }
152 }
153
154 #[cfg(test)]
155 pub(crate) fn threshold(&self) -> f32 {
156 self.threshold
157 }
158
159 #[cfg(test)]
160 pub(crate) fn acceptable_heartbeat_pause_millis(&self) -> u32 {
161 self.acceptable_heartbeat_pause_millis
162 }
163}
164
/// Computes the phi value for an elapsed time of `time_diff` milliseconds, given the `mean`
/// and `std_deviation` of the heartbeat intervals.
///
/// The elapsed time is standardised, passed through a logistic-shaped approximation of a
/// cumulative distribution function, and the result is the negated base-10 logarithm of the
/// remaining tail probability. Two algebraically equivalent forms of the tail are used,
/// depending on which side of the mean the elapsed time falls.
///
/// # Panics
///
/// Panics if `std_deviation` is `0.0`.
fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
    assert_ne!(std_deviation, 0.0);

    let elapsed = time_diff as f64;
    let standardised = (elapsed - mean) / std_deviation;
    let exponent = -standardised * (1.5976 + 0.070566 * standardised * standardised);
    let e = exponent.exp();

    let tail = if elapsed > mean {
        e / (1.0 + e)
    } else {
        1.0 - 1.0 / (1.0 + e)
    };
    -tail.log10()
}
187
/// Bounded window over the most recent heartbeat intervals.
///
/// The sum of the retained intervals and the sum of their squares are updated on every
/// insertion and eviction, so mean and variance are computed without iterating the window.
#[derive(Debug, Clone, PartialEq)]
struct HeartbeatHistory {
    /// Maximum number of intervals retained; always at least 1.
    max_sample_size: u32,

    /// Retained intervals, oldest first.
    intervals: VecDeque<i64>,
    /// Sum of all retained intervals.
    interval_sum: i64,
    /// Sum of the squares of all retained intervals.
    squared_interval_sum: i64,
}

impl HeartbeatHistory {
    /// Creates an empty history that retains at most `max_sample_size` intervals.
    ///
    /// A `max_sample_size` of 0 is treated as 1, so that `add` always has room for the
    /// newest interval instead of trying to evict from an empty window.
    fn new(max_sample_size: u32) -> Self {
        let max_sample_size = max_sample_size.max(1);
        Self {
            max_sample_size,
            intervals: VecDeque::with_capacity(max_sample_size as usize),
            interval_sum: 0,
            squared_interval_sum: 0,
        }
    }

    /// Mean of the retained intervals, or `0.0` when the history is empty.
    fn mean(&self) -> f64 {
        if self.intervals.is_empty() {
            // Avoid the 0 / 0 division, which would yield NaN.
            return 0.0;
        }
        self.interval_sum as f64 / self.intervals.len() as f64
    }

    /// Population variance of the retained intervals (`E[x^2] - E[x]^2`), or `0.0` when the
    /// history is empty. The result is never negative.
    fn variance(&self) -> f64 {
        if self.intervals.is_empty() {
            return 0.0;
        }
        let mean = self.mean();
        let variance =
            self.squared_interval_sum as f64 / self.intervals.len() as f64 - mean * mean;
        // Floating-point rounding can push a (near-)zero variance slightly below zero, which
        // would turn the standard deviation into NaN.
        variance.max(0.0)
    }

    /// Standard deviation of the retained intervals (square root of [`Self::variance`]).
    fn std_deviation(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Appends `interval`, evicting the oldest interval first when the window is full.
    fn add(&mut self, interval: i64) {
        if self.intervals.len() >= self.max_sample_size as usize {
            self.drop_oldest();
        }
        self.intervals.push_back(interval);
        self.interval_sum += interval;
        self.squared_interval_sum += interval * interval;
    }

    /// Removes the oldest interval, if any, and subtracts it from the running sums.
    fn drop_oldest(&mut self) {
        if let Some(oldest) = self.intervals.pop_front() {
            self.interval_sum -= oldest;
            self.squared_interval_sum -= oldest * oldest;
        }
    }
}
244
#[cfg(test)]
mod tests {
    use common_time::util::current_time_millis;

    use super::*;

    /// Builds a detector that has not seen any heartbeat yet, with a minimum standard
    /// deviation of 100 ms and a history retaining at most `max_sample_size` intervals.
    fn new_detector(
        threshold: f32,
        acceptable_heartbeat_pause_millis: u32,
        max_sample_size: u32,
    ) -> PhiAccrualFailureDetector {
        PhiAccrualFailureDetector {
            threshold,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis,
            heartbeat_history: HeartbeatHistory::new(max_sample_size),
            last_heartbeat_millis: None,
        }
    }

    #[test]
    fn test_is_available() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // Without any heartbeat, phi is 0 and the detector reports availability.
        assert!(fd.is_available(ts_millis));

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        // Immediately after the heartbeat.
        assert!(fd.is_available(ts_millis));
        // Half of the acceptable pause has elapsed.
        assert!(fd.is_available(ts_millis + acceptable_heartbeat_pause_millis / 2));
        // Twice the acceptable pause has elapsed.
        assert!(!fd.is_available(ts_millis + acceptable_heartbeat_pause_millis * 2));
    }

    #[test]
    fn test_last_heartbeat() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // No heartbeat has been recorded yet.
        assert!(fd.last_heartbeat_millis.is_none());

        fd.heartbeat(ts_millis);
        assert_eq!(fd.last_heartbeat_millis, Some(ts_millis));
    }

    #[test]
    fn test_phi() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // Without any heartbeat, phi is exactly 0.
        assert_eq!(fd.phi(ts_millis), 0.0);

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        // Immediately after the heartbeat.
        assert_eq!(fd.phi(ts_millis), 0.0);
        // Half of the acceptable pause has elapsed.
        let now = ts_millis + acceptable_heartbeat_pause_millis / 2;
        assert!(fd.phi(now) < fd.threshold as _);
        // Twice the acceptable pause has elapsed.
        let now = ts_millis + acceptable_heartbeat_pause_millis * 2;
        assert!(fd.phi(now) >= fd.threshold as _);
    }

    #[test]
    fn test_use_good_enough_cumulative_distribution_function() {
        // Inverts `phi = -log10(1 - cdf)`.
        fn cdf(phi: f64) -> f64 {
            1.0 - 10.0_f64.powf(-phi)
        }

        assert!((cdf(phi(0, 0.0, 10.0)) - 0.5).abs() < 0.001);
        assert!((cdf(phi(6, 0.0, 10.0)) - 0.7257).abs() < 0.001);
        assert!((cdf(phi(15, 0.0, 10.0)) - 0.9332).abs() < 0.001);
        assert!((cdf(phi(20, 0.0, 10.0)) - 0.97725).abs() < 0.001);
        assert!((cdf(phi(25, 0.0, 10.0)) - 0.99379).abs() < 0.001);
        assert!((cdf(phi(35, 0.0, 10.0)) - 0.99977).abs() < 0.001);
        assert!((cdf(phi(40, 0.0, 10.0)) - 0.99997).abs() < 0.0001);

        // phi must grow strictly with the elapsed time.
        for w in (0..40).collect::<Vec<i64>>().windows(2) {
            assert!(phi(w[0], 0.0, 10.0) < phi(w[1], 0.0, 10.0));
        }

        assert!((cdf(phi(22, 20.0, 3.0)) - 0.7475).abs() < 0.001);
    }

    #[test]
    fn test_handle_outliers_without_losing_precision_or_hitting_exceptions() {
        assert!((phi(10, 0.0, 1.0) - 38.0).abs() < 1.0);
        assert_eq!(phi(-25, 0.0, 1.0), 0.0);
    }

    #[test]
    fn test_return_realistic_phi_values() {
        let test = [
            (0, 0.0),
            (500, 0.1),
            (1000, 0.3),
            (1200, 1.6),
            (1400, 4.7),
            (1600, 10.8),
            (1700, 15.3),
        ];
        for (time_diff, expected_phi) in test {
            assert!((phi(time_diff, 1000.0, 100.0) - expected_phi).abs() < 0.1);
        }

        // A larger standard deviation yields a lower phi for the same elapsed time.
        assert!(phi(1100, 1000.0, 500.0) < phi(1100, 1000.0, 100.0));
    }

    #[test]
    fn test_return_phi_of_0_on_startup_when_no_heartbeats() {
        let fd = new_detector(8.0, 0, 1000);
        assert_eq!(fd.phi(current_time_millis()), 0.0);
        assert_eq!(fd.phi(current_time_millis()), 0.0);
    }

    #[test]
    fn test_return_phi_based_on_guess_when_only_one_heartbeat() {
        let mut fd = new_detector(8.0, 0, 1000);
        fd.heartbeat(0);
        // The tolerance must apply to the difference from the expected value; taking
        // `abs()` of phi itself would only check an upper bound.
        assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS) - 0.3).abs() < 0.2);
        assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 2) - 4.5).abs() < 0.3);
        assert!(fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 3) > 15.0);
    }

    #[test]
    fn test_return_phi_using_first_interval_after_second_heartbeat() {
        let mut fd = new_detector(8.0, 0, 1000);
        fd.heartbeat(0);
        assert!(fd.phi(100) > 0.0);
        fd.heartbeat(200);
        assert!(fd.phi(300) > 0.0);
    }

    #[test]
    fn test_is_available_after_a_series_of_successful_heartbeats() {
        let mut fd = new_detector(8.0, 0, 1000);
        assert!(fd.last_heartbeat_millis.is_none());
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert!(fd.last_heartbeat_millis.is_some());
        assert!(fd.is_available(1200));
    }

    #[test]
    fn test_is_not_available_if_heartbeat_are_missed() {
        let mut fd = new_detector(3.0, 0, 1000);
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert!(fd.is_available(1200));
        assert!(!fd.is_available(8200));
    }

    #[test]
    fn test_is_available_if_it_starts_heartbeat_again_after_being_marked_dead_due_to_detection_of_failure()
    {
        let mut fd = new_detector(8.0, 3000, 1000);

        // 1000 heartbeats, one every second.
        let mut now = 0;
        for _ in 0..1000 {
            fd.heartbeat(now);
            now += 1000;
        }

        // A five-minute silence.
        now += 5 * 60 * 1000;
        assert!(!fd.is_available(now));

        // Heartbeats resume.
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);

        // A seven-second silence.
        now += 7000;
        assert!(!fd.is_available(now));

        // Heartbeats resume once more.
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
    }

    #[test]
    fn test_accept_some_configured_missing_heartbeats() {
        let mut fd = new_detector(8.0, 3000, 1000);
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        assert!(fd.is_available(7000));
        fd.heartbeat(8000);
        assert!(fd.is_available(9000));
    }

    #[test]
    fn test_fail_after_configured_acceptable_missing_heartbeats() {
        let mut fd = new_detector(8.0, 3000, 1000);
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        fd.heartbeat(4000);
        fd.heartbeat(5000);
        assert!(fd.is_available(5500));
        fd.heartbeat(6000);
        assert!(!fd.is_available(11000));
    }

    #[test]
    fn test_use_max_sample_size_heartbeats() {
        let mut fd = new_detector(8.0, 0, 3);
        // Heartbeats every 100 ms.
        fd.heartbeat(0);
        fd.heartbeat(100);
        fd.heartbeat(200);
        fd.heartbeat(300);
        let phi1 = fd.phi(400);
        // Heartbeats every 500 ms.
        fd.heartbeat(1000);
        fd.heartbeat(1500);
        fd.heartbeat(2000);
        fd.heartbeat(2500);
        let phi2 = fd.phi(3000);
        // With a window of three intervals, both readings must be identical.
        assert_eq!(phi1, phi2);
    }

    #[test]
    fn test_heartbeat_history_calculate_correct_mean_and_variance() {
        let mut history = HeartbeatHistory::new(20);
        for i in [100, 200, 125, 340, 130] {
            history.add(i);
        }
        assert!((history.mean() - 179.0).abs() < 0.00001);
        assert!((history.variance() - 7584.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_have_0_variance_for_one_sample() {
        let mut history = HeartbeatHistory::new(600);
        history.add(1000);
        assert!((history.variance() - 0.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_be_capped_by_the_specified_max_sample_size() {
        let mut history = HeartbeatHistory::new(3);
        history.add(100);
        history.add(110);
        history.add(90);
        assert!((history.mean() - 100.0).abs() < 0.00001);
        assert!((history.variance() - 66.6666667).abs() < 0.00001);
        // Each further sample evicts the oldest one.
        history.add(140);
        assert!((history.mean() - 113.333333).abs() < 0.00001);
        assert!((history.variance() - 422.222222).abs() < 0.00001);
        history.add(80);
        assert!((history.mean() - 103.333333).abs() < 0.00001);
        assert!((history.variance() - 688.88888889).abs() < 0.00001);
    }
}