1use std::collections::VecDeque;
16use std::time::Duration;
17
18use common_meta::distributed_time_constants;
19use serde::{Deserialize, Serialize};
20
/// Assumed heartbeat interval, in milliseconds, used to seed the heartbeat history when the
/// very first heartbeat arrives and no real interval has been observed yet.
const FIRST_HEARTBEAT_ESTIMATE_MILLIS: i64 = 1000;
22
/// Phi accrual failure detector.
///
/// Instead of a binary alive/dead verdict it computes a suspicion level `phi` from the time
/// elapsed since the last heartbeat and the statistics (mean, standard deviation) of the
/// previously observed heartbeat intervals. The monitored peer is considered available while
/// `phi` stays below `threshold`.
#[cfg_attr(test, derive(Debug, Clone, PartialEq))]
pub(crate) struct PhiAccrualFailureDetector {
    /// Suspicion level at or above which the peer is reported as unavailable.
    threshold: f32,

    /// Lower bound, in milliseconds, applied to the standard deviation of the recorded
    /// intervals before `phi` is computed.
    min_std_deviation_millis: f32,

    /// Extra time, in milliseconds, added to the mean interval when computing `phi`, so that
    /// a pause of this length between heartbeats is tolerated.
    acceptable_heartbeat_pause_millis: u32,

    /// Sliding window of recently observed heartbeat intervals.
    heartbeat_history: HeartbeatHistory,
    /// Timestamp (milliseconds) of the most recent accepted heartbeat; `None` until the first
    /// heartbeat has been received.
    last_heartbeat_millis: Option<i64>,
}
64
/// User-facing configuration for [`PhiAccrualFailureDetector`].
///
/// Because of `#[serde(default)]`, fields missing from the deserialized input take their
/// values from the [`Default`] implementation.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct PhiAccrualFailureDetectorOptions {
    /// Suspicion level at or above which the peer is reported as unavailable.
    pub threshold: f32,
    /// Lower bound for the standard deviation used in the phi computation.
    /// (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub min_std_deviation: Duration,
    /// Pause between heartbeats that is tolerated on top of the mean interval.
    /// (De)serialized through `humantime_serde`.
    #[serde(with = "humantime_serde")]
    pub acceptable_heartbeat_pause: Duration,
}
74
75impl Default for PhiAccrualFailureDetectorOptions {
76 fn default() -> Self {
77 Self {
80 threshold: 8_f32,
81 min_std_deviation: Duration::from_millis(100),
82 acceptable_heartbeat_pause: Duration::from_secs(
83 distributed_time_constants::DATANODE_LEASE_SECS,
84 ),
85 }
86 }
87}
88
89impl Default for PhiAccrualFailureDetector {
90 fn default() -> Self {
91 Self::from_options(Default::default())
92 }
93}
94
95impl PhiAccrualFailureDetector {
96 pub(crate) fn from_options(options: PhiAccrualFailureDetectorOptions) -> Self {
97 Self {
98 threshold: options.threshold,
99 min_std_deviation_millis: options.min_std_deviation.as_millis() as f32,
100 acceptable_heartbeat_pause_millis: options.acceptable_heartbeat_pause.as_millis()
101 as u32,
102 heartbeat_history: HeartbeatHistory::new(1000),
103 last_heartbeat_millis: None,
104 }
105 }
106
107 pub(crate) fn heartbeat(&mut self, ts_millis: i64) {
108 if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
109 if ts_millis < last_heartbeat_millis {
110 return;
111 }
112
113 if self.is_available(ts_millis) {
114 let interval = ts_millis - last_heartbeat_millis;
115 self.heartbeat_history.add(interval)
116 }
117 } else {
118 let std_deviation = FIRST_HEARTBEAT_ESTIMATE_MILLIS / 4;
122 self.heartbeat_history
123 .add((FIRST_HEARTBEAT_ESTIMATE_MILLIS - std_deviation) as _);
124 self.heartbeat_history
125 .add((FIRST_HEARTBEAT_ESTIMATE_MILLIS + std_deviation) as _);
126 }
127 let _ = self.last_heartbeat_millis.insert(ts_millis);
128 }
129
130 pub(crate) fn is_available(&self, ts_millis: i64) -> bool {
131 self.phi(ts_millis) < self.threshold as _
132 }
133
134 pub(crate) fn phi(&self, ts_millis: i64) -> f64 {
138 if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
139 let time_diff = ts_millis - last_heartbeat_millis;
140 let mean = self.heartbeat_history.mean();
141 let std_deviation = self
142 .heartbeat_history
143 .std_deviation()
144 .max(self.min_std_deviation_millis as _);
145
146 phi(
147 time_diff,
148 mean + self.acceptable_heartbeat_pause_millis as f64,
149 std_deviation,
150 )
151 } else {
152 0.0
154 }
155 }
156
157 #[cfg(test)]
158 pub(crate) fn threshold(&self) -> f32 {
159 self.threshold
160 }
161
162 #[cfg(test)]
163 pub(crate) fn acceptable_heartbeat_pause_millis(&self) -> u32 {
164 self.acceptable_heartbeat_pause_millis
165 }
166}
167
/// Computes the suspicion level `phi = -log10(p)`, where `p` approximates the probability
/// that a heartbeat arrives later than `time_diff`, assuming intervals are normally
/// distributed with the given `mean` and `std_deviation`.
///
/// The normal distribution's tail is approximated with a logistic function. The two branches
/// below are algebraically equal; presumably they are kept separate so that each side of the
/// mean is evaluated in its numerically better-behaved form.
///
/// # Panics
///
/// Panics if `std_deviation` is `0.0`.
fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
    assert_ne!(std_deviation, 0.0);

    let elapsed = time_diff as f64;
    let z = (elapsed - mean) / std_deviation;
    let exp_term = (-z * (1.5976 + 0.070566 * z * z)).exp();

    let tail_probability = if elapsed > mean {
        exp_term / (1.0 + exp_term)
    } else {
        1.0 - 1.0 / (1.0 + exp_term)
    };
    -tail_probability.log10()
}
190
/// Bounded sliding window over heartbeat intervals (milliseconds).
///
/// Running sums are maintained incrementally so that the mean, variance and standard
/// deviation of the retained intervals are available without iterating over the window.
#[derive(Debug, Clone, PartialEq)]
struct HeartbeatHistory {
    /// Maximum number of intervals retained; always at least 1.
    max_sample_size: u32,

    /// Retained intervals, oldest first.
    intervals: VecDeque<i64>,
    /// Sum of all retained intervals.
    interval_sum: i64,
    /// Sum of the squares of all retained intervals. Kept as `i128` because the square of an
    /// `i64` interval above roughly 3.04e9 does not fit in an `i64`.
    squared_interval_sum: i128,
}

impl HeartbeatHistory {
    /// Creates an empty history retaining at most `max_sample_size` intervals.
    ///
    /// A `max_sample_size` of 0 is treated as 1: a window must be able to hold at least one
    /// sample, otherwise the first `add` would have nothing to evict.
    fn new(max_sample_size: u32) -> Self {
        let max_sample_size = max_sample_size.max(1);
        Self {
            max_sample_size,
            intervals: VecDeque::with_capacity(max_sample_size as usize),
            interval_sum: 0,
            squared_interval_sum: 0,
        }
    }

    /// Mean of the retained intervals. Returns NaN while the history is empty.
    fn mean(&self) -> f64 {
        self.interval_sum as f64 / self.intervals.len() as f64
    }

    /// Population variance of the retained intervals, computed as `E[x^2] - E[x]^2`.
    ///
    /// Returns NaN while the history is empty. Floating-point cancellation can push the raw
    /// result slightly below zero; such results are clamped to `0.0` so that the standard
    /// deviation never becomes NaN for a non-empty history.
    fn variance(&self) -> f64 {
        let mean = self.mean();
        let variance = self.squared_interval_sum as f64 / self.intervals.len() as f64 - mean * mean;
        // `variance < 0.0` is false for NaN, so the empty-history NaN is passed through.
        if variance < 0.0 {
            0.0
        } else {
            variance
        }
    }

    /// Population standard deviation of the retained intervals.
    fn std_deviation(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Appends `interval`, evicting the oldest interval first if the window is full.
    fn add(&mut self, interval: i64) {
        if self.intervals.len() >= self.max_sample_size as usize {
            self.drop_oldest();
        }
        self.intervals.push_back(interval);
        self.interval_sum += interval;
        self.squared_interval_sum += i128::from(interval) * i128::from(interval);
    }

    /// Removes the oldest interval, if any, and subtracts it from the running sums.
    fn drop_oldest(&mut self) {
        if let Some(oldest) = self.intervals.pop_front() {
            self.interval_sum -= oldest;
            self.squared_interval_sum -= i128::from(oldest) * i128::from(oldest);
        }
    }
}
247
/// Unit tests for the phi accrual failure detector and its heartbeat history.
#[cfg(test)]
mod tests {
    use common_time::util::current_time_millis;

    use super::*;

    #[test]
    fn test_is_available() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // Before any heartbeat phi is 0, so the peer counts as available.
        assert!(fd.is_available(ts_millis));

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        // Right at the heartbeat.
        assert!(fd.is_available(ts_millis));
        // Well within the acceptable pause.
        assert!(fd.is_available(ts_millis + acceptable_heartbeat_pause_millis / 2));
        // Far beyond the acceptable pause.
        assert!(!fd.is_available(ts_millis + acceptable_heartbeat_pause_millis * 2));
    }

    #[test]
    fn test_last_heartbeat() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // No heartbeat has been recorded yet.
        assert!(fd.last_heartbeat_millis.is_none());

        fd.heartbeat(ts_millis);
        assert_eq!(fd.last_heartbeat_millis, Some(ts_millis));
    }

    #[test]
    fn test_phi() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // Phi is 0 before the first heartbeat.
        assert_eq!(fd.phi(ts_millis), 0.0);

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        // Right at the heartbeat.
        assert_eq!(fd.phi(ts_millis), 0.0);
        // Well within the acceptable pause.
        let now = ts_millis + acceptable_heartbeat_pause_millis / 2;
        assert!(fd.phi(now) < fd.threshold as _);
        // Far beyond the acceptable pause.
        let now = ts_millis + acceptable_heartbeat_pause_millis * 2;
        assert!(fd.phi(now) >= fd.threshold as _);
    }

    #[test]
    fn test_use_good_enough_cumulative_distribution_function() {
        // Inverts `phi = -log10(1 - cdf)` so results can be compared with normal CDF values.
        fn cdf(phi: f64) -> f64 {
            1.0 - 10.0_f64.powf(-phi)
        }

        assert!((cdf(phi(0, 0.0, 10.0)) - 0.5).abs() < 0.001);
        assert!((cdf(phi(6, 0.0, 10.0)) - 0.7257).abs() < 0.001);
        assert!((cdf(phi(15, 0.0, 10.0)) - 0.9332).abs() < 0.001);
        assert!((cdf(phi(20, 0.0, 10.0)) - 0.97725).abs() < 0.001);
        assert!((cdf(phi(25, 0.0, 10.0)) - 0.99379).abs() < 0.001);
        assert!((cdf(phi(35, 0.0, 10.0)) - 0.99977).abs() < 0.001);
        assert!((cdf(phi(40, 0.0, 10.0)) - 0.99997).abs() < 0.0001);

        // Phi must grow strictly with the elapsed time.
        for w in (0..40).collect::<Vec<i64>>().windows(2) {
            assert!(phi(w[0], 0.0, 10.0) < phi(w[1], 0.0, 10.0));
        }

        assert!((cdf(phi(22, 20.0, 3.0)) - 0.7475).abs() < 0.001);
    }

    #[test]
    fn test_handle_outliers_without_losing_precision_or_hitting_exceptions() {
        assert!((phi(10, 0.0, 1.0) - 38.0).abs() < 1.0);
        assert_eq!(phi(-25, 0.0, 1.0), 0.0);
    }

    #[test]
    fn test_return_realistic_phi_values() {
        let test = [
            (0, 0.0),
            (500, 0.1),
            (1000, 0.3),
            (1200, 1.6),
            (1400, 4.7),
            (1600, 10.8),
            (1700, 15.3),
        ];
        for (time_diff, expected_phi) in test {
            assert!((phi(time_diff, 1000.0, 100.0) - expected_phi).abs() < 0.1);
        }

        // A larger standard deviation yields a lower phi for the same delay.
        assert!(phi(1100, 1000.0, 500.0) < phi(1100, 1000.0, 100.0));
    }

    #[test]
    fn test_return_phi_of_0_on_startup_when_no_heartbeats() {
        let fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        assert_eq!(fd.phi(current_time_millis()), 0.0);
        assert_eq!(fd.phi(current_time_millis()), 0.0);
    }

    #[test]
    fn test_return_phi_based_on_guess_when_only_one_heartbeat() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        // The tolerance applies to the distance from the expected value, in both directions.
        assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS) - 0.3).abs() < 0.2);
        assert!((fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 2) - 4.5).abs() < 0.3);
        assert!(fd.phi(FIRST_HEARTBEAT_ESTIMATE_MILLIS * 3) > 15.0);
    }

    #[test]
    fn test_return_phi_using_first_interval_after_second_heartbeat() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        assert!(fd.phi(100) > 0.0);
        fd.heartbeat(200);
        assert!(fd.phi(300) > 0.0);
    }

    #[test]
    fn test_is_available_after_a_series_of_successful_heartbeats() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        assert!(fd.last_heartbeat_millis.is_none());
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert_eq!(fd.last_heartbeat_millis, Some(1100));
        assert!(fd.is_available(1200));
    }

    #[test]
    fn test_is_not_available_if_heartbeat_are_missed() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 3.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert!(fd.is_available(1200));
        assert!(!fd.is_available(8200));
    }

    #[test]
    fn test_is_available_if_it_starts_heartbeat_again_after_being_marked_dead_due_to_detection_of_failure()
     {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 3000,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };

        // 1000 regular heartbeats, one per second.
        let mut now = 0;
        for _ in 0..1000 {
            fd.heartbeat(now);
            now += 1000;
        }
        // A five-minute silence: the peer must be reported as unavailable.
        now += 5 * 60 * 1000;
        assert!(!fd.is_available(now));
        // Heartbeats resume: the peer becomes available again.
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        // A seven-second silence exceeds the acceptable pause.
        now += 7000;
        assert!(!fd.is_available(now));
        // Heartbeats resume once more.
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
    }

    #[test]
    fn test_accept_some_configured_missing_heartbeats() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 3000,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        assert!(fd.is_available(7000));
        fd.heartbeat(8000);
        assert!(fd.is_available(9000));
    }

    #[test]
    fn test_fail_after_configured_acceptable_missing_heartbeats() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 3000,
            heartbeat_history: HeartbeatHistory::new(1000),
            last_heartbeat_millis: None,
        };
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        fd.heartbeat(4000);
        fd.heartbeat(5000);
        assert!(fd.is_available(5500));
        fd.heartbeat(6000);
        assert!(!fd.is_available(11000));
    }

    #[test]
    fn test_use_max_sample_size_heartbeats() {
        let mut fd = PhiAccrualFailureDetector {
            threshold: 8.0,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis: 0,
            heartbeat_history: HeartbeatHistory::new(3),
            last_heartbeat_millis: None,
        };
        // Heartbeats 100 ms apart.
        fd.heartbeat(0);
        fd.heartbeat(100);
        fd.heartbeat(200);
        fd.heartbeat(300);
        let phi1 = fd.phi(400);
        // Heartbeats 500 ms apart; with a window of 3 the earlier intervals are evicted.
        fd.heartbeat(1000);
        fd.heartbeat(1500);
        fd.heartbeat(2000);
        fd.heartbeat(2500);
        let phi2 = fd.phi(3000);
        assert_eq!(phi1, phi2);
    }

    #[test]
    fn test_heartbeat_history_calculate_correct_mean_and_variance() {
        let mut history = HeartbeatHistory::new(20);
        for i in [100, 200, 125, 340, 130] {
            history.add(i);
        }
        assert!((history.mean() - 179.0).abs() < 0.00001);
        assert!((history.variance() - 7584.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_have_0_variance_for_one_sample() {
        let mut history = HeartbeatHistory::new(600);
        history.add(1000);
        assert!((history.variance() - 0.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_be_capped_by_the_specified_max_sample_size() {
        let mut history = HeartbeatHistory::new(3);
        history.add(100);
        history.add(110);
        history.add(90);
        assert!((history.mean() - 100.0).abs() < 0.00001);
        assert!((history.variance() - 66.6666667).abs() < 0.00001);
        history.add(140);
        assert!((history.mean() - 113.333333).abs() < 0.00001);
        assert!((history.variance() - 422.222222).abs() < 0.00001);
        history.add(80);
        assert!((history.mean() - 103.333333).abs() < 0.00001);
        assert!((history.variance() - 688.88888889).abs() < 0.00001);
    }
}