1use std::collections::VecDeque;
16use std::time::Duration;
17
18use common_meta::distributed_time_constants;
19use serde::{Deserialize, Serialize};
20
/// Phi accrual failure detector.
///
/// Rather than a binary alive/dead verdict, the detector produces a suspicion level `phi` from
/// the time elapsed since the last heartbeat, relative to the statistics (mean and standard
/// deviation) of previously observed heartbeat intervals. A node is reported available while
/// `phi` stays below `threshold`.
#[cfg_attr(test, derive(Debug, Clone, PartialEq))]
pub(crate) struct PhiAccrualFailureDetector {
    /// Suspicion threshold: `is_available` returns `true` only while `phi` is strictly below
    /// this value.
    threshold: f32,

    /// Lower bound (in milliseconds) applied to the standard deviation of the heartbeat
    /// history before it is used to compute `phi`.
    min_std_deviation_millis: f32,

    /// Milliseconds added to the mean heartbeat interval when computing `phi`, shifting the
    /// expected arrival time of the next heartbeat later by this amount.
    acceptable_heartbeat_pause_millis: u32,

    /// Expected heartbeat interval (in milliseconds) used to seed the history when the first
    /// heartbeat arrives, before any real interval has been observed.
    first_heartbeat_estimate_millis: u32,

    /// Sliding window of observed inter-heartbeat intervals (milliseconds).
    heartbeat_history: HeartbeatHistory,
    /// Timestamp (milliseconds) of the most recently accepted heartbeat; `None` until the
    /// first heartbeat is received.
    last_heartbeat_millis: Option<i64>,
}
66
/// User-facing configuration for `PhiAccrualFailureDetector`.
///
/// Because of `#[serde(default)]`, any field missing from the serialized form is filled in from
/// `PhiAccrualFailureDetectorOptions::default()`. Duration fields are (de)serialized through
/// `humantime_serde`, and are converted to whole milliseconds when the detector is built.
#[derive(Clone, Copy, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct PhiAccrualFailureDetectorOptions {
    /// Suspicion level at or above which a node is considered unavailable.
    pub threshold: f32,
    /// Minimum standard deviation of heartbeat intervals used when computing phi.
    #[serde(with = "humantime_serde")]
    pub min_std_deviation: Duration,
    /// Extra delay added to the mean heartbeat interval before suspicion is computed.
    #[serde(with = "humantime_serde")]
    pub acceptable_heartbeat_pause: Duration,
    /// Guess of the heartbeat interval used to bootstrap statistics on the first heartbeat.
    #[serde(with = "humantime_serde")]
    pub first_heartbeat_estimate: Duration,
}
78
79impl Default for PhiAccrualFailureDetectorOptions {
80 fn default() -> Self {
81 Self {
84 threshold: 8_f32,
85 min_std_deviation: Duration::from_millis(100),
86 acceptable_heartbeat_pause: Duration::from_secs(
87 distributed_time_constants::DATANODE_LEASE_SECS,
88 ),
89 first_heartbeat_estimate: Duration::from_millis(1000),
90 }
91 }
92}
93
94impl Default for PhiAccrualFailureDetector {
95 fn default() -> Self {
96 Self::from_options(Default::default())
97 }
98}
99
100impl PhiAccrualFailureDetector {
101 pub(crate) fn from_options(options: PhiAccrualFailureDetectorOptions) -> Self {
102 Self {
103 threshold: options.threshold,
104 min_std_deviation_millis: options.min_std_deviation.as_millis() as f32,
105 acceptable_heartbeat_pause_millis: options.acceptable_heartbeat_pause.as_millis()
106 as u32,
107 first_heartbeat_estimate_millis: options.first_heartbeat_estimate.as_millis() as u32,
108 heartbeat_history: HeartbeatHistory::new(1000),
109 last_heartbeat_millis: None,
110 }
111 }
112
113 pub(crate) fn heartbeat(&mut self, ts_millis: i64) {
114 if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
115 if ts_millis < last_heartbeat_millis {
116 return;
117 }
118
119 if self.is_available(ts_millis) {
120 let interval = ts_millis - last_heartbeat_millis;
121 self.heartbeat_history.add(interval)
122 }
123 } else {
124 let std_deviation = self.first_heartbeat_estimate_millis / 4;
128 self.heartbeat_history
129 .add((self.first_heartbeat_estimate_millis - std_deviation) as _);
130 self.heartbeat_history
131 .add((self.first_heartbeat_estimate_millis + std_deviation) as _);
132 }
133 let _ = self.last_heartbeat_millis.insert(ts_millis);
134 }
135
136 pub(crate) fn is_available(&self, ts_millis: i64) -> bool {
137 self.phi(ts_millis) < self.threshold as _
138 }
139
140 pub(crate) fn phi(&self, ts_millis: i64) -> f64 {
144 if let Some(last_heartbeat_millis) = self.last_heartbeat_millis {
145 let time_diff = ts_millis - last_heartbeat_millis;
146 let mean = self.heartbeat_history.mean();
147 let std_deviation = self
148 .heartbeat_history
149 .std_deviation()
150 .max(self.min_std_deviation_millis as _);
151
152 phi(
153 time_diff,
154 mean + self.acceptable_heartbeat_pause_millis as f64,
155 std_deviation,
156 )
157 } else {
158 0.0
160 }
161 }
162
163 #[cfg(test)]
164 pub(crate) fn threshold(&self) -> f32 {
165 self.threshold
166 }
167
168 #[cfg(test)]
169 pub(crate) fn acceptable_heartbeat_pause_millis(&self) -> u32 {
170 self.acceptable_heartbeat_pause_millis
171 }
172}
173
/// Suspicion level `-log10(P(X > time_diff))` for `X ~ N(mean, std_deviation)`.
///
/// The normal CDF is replaced by the logistic approximation
/// `1 / (1 + exp(-y * (1.5976 + 0.070566 * y^2)))`, where `y = (time_diff - mean) / std_deviation`.
/// The tail probability is computed from whichever side of the mean `time_diff` falls on,
/// which keeps precision for large deviations.
///
/// # Panics
///
/// Panics if `std_deviation` is `0.0`.
fn phi(time_diff: i64, mean: f64, std_deviation: f64) -> f64 {
    assert_ne!(std_deviation, 0.0);

    let elapsed = time_diff as f64;
    let z = (elapsed - mean) / std_deviation;
    let exp_term = (-z * (1.5976 + 0.070566 * z * z)).exp();

    // Probability that the next heartbeat arrives later than `elapsed`.
    let tail_probability = if elapsed > mean {
        exp_term / (1.0 + exp_term)
    } else {
        1.0 - 1.0 / (1.0 + exp_term)
    };
    -tail_probability.log10()
}
196
/// Sliding window of the most recent heartbeat intervals (in milliseconds).
///
/// Running sums are maintained on every insertion and eviction, so `mean`, `variance` and
/// `std_deviation` are O(1).
#[derive(Debug, Clone, PartialEq)]
struct HeartbeatHistory {
    /// Maximum number of intervals kept; the oldest interval is evicted first. A value of 0
    /// is treated as 1 so that insertion never has to evict from an empty window.
    max_sample_size: u32,

    /// Intervals currently in the window, oldest at the front.
    intervals: VecDeque<i64>,
    /// Sum of all intervals currently in the window.
    interval_sum: i64,
    /// Sum of the squares of all intervals currently in the window.
    ///
    /// Kept as `i128` because squaring an `i64` interval larger than about 3.04e9 ms
    /// overflows `i64`.
    squared_interval_sum: i128,
}

impl HeartbeatHistory {
    /// Creates an empty history retaining at most `max_sample_size` intervals.
    fn new(max_sample_size: u32) -> Self {
        Self {
            max_sample_size,
            intervals: VecDeque::with_capacity(max_sample_size as usize),
            interval_sum: 0,
            squared_interval_sum: 0,
        }
    }

    /// Arithmetic mean of the intervals in the window (NaN when the window is empty).
    fn mean(&self) -> f64 {
        self.interval_sum as f64 / self.intervals.len() as f64
    }

    /// Population variance, computed as `E[x^2] - E[x]^2`.
    ///
    /// Floating-point cancellation can push the result slightly below zero, which would make
    /// `std_deviation` NaN. Negative results are therefore clamped to `0.0`. NaN (empty
    /// window) is passed through unchanged.
    fn variance(&self) -> f64 {
        let mean = self.mean();
        let variance = self.squared_interval_sum as f64 / self.intervals.len() as f64 - mean * mean;
        if variance < 0.0 {
            0.0
        } else {
            variance
        }
    }

    /// Population standard deviation of the intervals in the window.
    fn std_deviation(&self) -> f64 {
        self.variance().sqrt()
    }

    /// Appends `interval`, evicting the oldest entries if the window is full.
    fn add(&mut self, interval: i64) {
        // Treat a configured size of 0 as 1; otherwise eviction would be attempted on an
        // empty deque and panic.
        let capacity = self.max_sample_size.max(1) as usize;
        while self.intervals.len() >= capacity {
            self.drop_oldest();
        }
        self.intervals.push_back(interval);
        self.interval_sum += interval;
        self.squared_interval_sum += Self::square(interval);
    }

    /// Removes the oldest interval and subtracts it from the running sums.
    fn drop_oldest(&mut self) {
        let oldest = self
            .intervals
            .pop_front()
            .expect("intervals must not be empty here");
        self.interval_sum -= oldest;
        self.squared_interval_sum -= Self::square(oldest);
    }

    /// `value * value` computed without overflow.
    fn square(value: i64) -> i128 {
        i128::from(value) * i128::from(value)
    }
}
253
#[cfg(test)]
mod tests {
    use common_time::util::current_time_millis;

    use super::*;

    /// Builds a detector with `min_std_deviation_millis = 100` and
    /// `first_heartbeat_estimate_millis = 1000`, the values shared by every hand-built
    /// detector in these tests.
    fn new_detector(
        threshold: f32,
        acceptable_heartbeat_pause_millis: u32,
        max_sample_size: u32,
    ) -> PhiAccrualFailureDetector {
        PhiAccrualFailureDetector {
            threshold,
            min_std_deviation_millis: 100.0,
            acceptable_heartbeat_pause_millis,
            first_heartbeat_estimate_millis: 1000,
            heartbeat_history: HeartbeatHistory::new(max_sample_size),
            last_heartbeat_millis: None,
        }
    }

    #[test]
    fn test_is_available() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        // No heartbeat yet: phi is 0, so the node is reported available.
        assert!(fd.is_available(ts_millis));

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        assert!(fd.is_available(ts_millis));
        assert!(fd.is_available(ts_millis + acceptable_heartbeat_pause_millis / 2));
        assert!(!fd.is_available(ts_millis + acceptable_heartbeat_pause_millis * 2));
    }

    #[test]
    fn test_last_heartbeat() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        assert!(fd.last_heartbeat_millis.is_none());

        fd.heartbeat(ts_millis);
        assert_eq!(fd.last_heartbeat_millis, Some(ts_millis));
    }

    #[test]
    fn test_phi() {
        let ts_millis = current_time_millis();

        let mut fd = PhiAccrualFailureDetector::default();

        assert_eq!(fd.phi(ts_millis), 0.0);

        fd.heartbeat(ts_millis);

        let acceptable_heartbeat_pause_millis = fd.acceptable_heartbeat_pause_millis as i64;
        assert_eq!(fd.phi(ts_millis), 0.0);
        let now = ts_millis + acceptable_heartbeat_pause_millis / 2;
        assert!(fd.phi(now) < f64::from(fd.threshold));
        let now = ts_millis + acceptable_heartbeat_pause_millis * 2;
        assert!(fd.phi(now) >= f64::from(fd.threshold));
    }

    #[test]
    fn test_use_good_enough_cumulative_distribution_function() {
        fn cdf(phi: f64) -> f64 {
            1.0 - 10.0_f64.powf(-phi)
        }

        assert!((cdf(phi(0, 0.0, 10.0)) - 0.5).abs() < 0.001);
        assert!((cdf(phi(6, 0.0, 10.0)) - 0.7257).abs() < 0.001);
        assert!((cdf(phi(15, 0.0, 10.0)) - 0.9332).abs() < 0.001);
        assert!((cdf(phi(20, 0.0, 10.0)) - 0.97725).abs() < 0.001);
        assert!((cdf(phi(25, 0.0, 10.0)) - 0.99379).abs() < 0.001);
        assert!((cdf(phi(35, 0.0, 10.0)) - 0.99977).abs() < 0.001);
        assert!((cdf(phi(40, 0.0, 10.0)) - 0.99997).abs() < 0.0001);

        // phi must be strictly increasing in the elapsed time.
        for t in 0..39_i64 {
            assert!(phi(t, 0.0, 10.0) < phi(t + 1, 0.0, 10.0));
        }

        assert!((cdf(phi(22, 20.0, 3.0)) - 0.7475).abs() < 0.001);
    }

    #[test]
    fn test_handle_outliers_without_losing_precision_or_hitting_exceptions() {
        assert!((phi(10, 0.0, 1.0) - 38.0).abs() < 1.0);
        assert_eq!(phi(-25, 0.0, 1.0), 0.0);
    }

    #[test]
    fn test_return_realistic_phi_values() {
        let cases = [
            (0, 0.0),
            (500, 0.1),
            (1000, 0.3),
            (1200, 1.6),
            (1400, 4.7),
            (1600, 10.8),
            (1700, 15.3),
        ];
        for (time_diff, expected_phi) in cases {
            assert!((phi(time_diff, 1000.0, 100.0) - expected_phi).abs() < 0.1);
        }

        // A larger standard deviation makes the same delay less suspicious.
        assert!(phi(1100, 1000.0, 500.0) < phi(1100, 1000.0, 100.0));
    }

    #[test]
    fn test_return_phi_of_0_on_startup_when_no_heartbeats() {
        let fd = new_detector(8.0, 0, 1000);
        assert_eq!(fd.phi(current_time_millis()), 0.0);
        assert_eq!(fd.phi(current_time_millis()), 0.0);
    }

    #[test]
    fn test_return_phi_based_on_guess_when_only_one_heartbeat() {
        let mut fd = new_detector(8.0, 0, 1000);
        fd.heartbeat(0);
        // The seeded history is [750, 1250]: mean 1000, std deviation 250.
        assert!((fd.phi(1000) - 0.3).abs() < 0.2);
        assert!((fd.phi(2000) - 4.5).abs() < 0.3);
        assert!(fd.phi(3000) > 15.0);
    }

    #[test]
    fn test_return_phi_using_first_interval_after_second_heartbeat() {
        let mut fd = new_detector(8.0, 0, 1000);
        fd.heartbeat(0);
        assert!(fd.phi(100) > 0.0);
        fd.heartbeat(200);
        assert!(fd.phi(300) > 0.0);
    }

    #[test]
    fn test_is_available_after_a_series_of_successful_heartbeats() {
        let mut fd = new_detector(8.0, 0, 1000);
        assert!(fd.last_heartbeat_millis.is_none());
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert_eq!(fd.last_heartbeat_millis, Some(1100));
        assert!(fd.is_available(1200));
    }

    #[test]
    fn test_is_not_available_if_heartbeat_are_missed() {
        let mut fd = new_detector(3.0, 0, 1000);
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(1100);
        assert!(fd.is_available(1200));
        assert!(!fd.is_available(8200));
    }

    #[test]
    fn test_is_available_if_it_starts_heartbeat_again_after_being_marked_dead_due_to_detection_of_failure(
    ) {
        let mut fd = new_detector(8.0, 3000, 1000);

        // 1000 regular 1s intervals, then a 5 minute pause, then a shorter pause that should
        // again trigger unavailability.
        let mut now = 0;
        for _ in 0..1000 {
            fd.heartbeat(now);
            now += 1000;
        }
        now += 5 * 60 * 1000;
        assert!(!fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 7000;
        assert!(!fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
        now += 100;
        fd.heartbeat(now);
        now += 900;
        assert!(fd.is_available(now));
    }

    #[test]
    fn test_accept_some_configured_missing_heartbeats() {
        let mut fd = new_detector(8.0, 3000, 1000);
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        assert!(fd.is_available(7000));
        fd.heartbeat(8000);
        assert!(fd.is_available(9000));
    }

    #[test]
    fn test_fail_after_configured_acceptable_missing_heartbeats() {
        let mut fd = new_detector(8.0, 3000, 1000);
        fd.heartbeat(0);
        fd.heartbeat(1000);
        fd.heartbeat(2000);
        fd.heartbeat(3000);
        fd.heartbeat(4000);
        fd.heartbeat(5000);
        assert!(fd.is_available(5500));
        fd.heartbeat(6000);
        assert!(!fd.is_available(11000));
    }

    #[test]
    fn test_use_max_sample_size_heartbeats() {
        let mut fd = new_detector(8.0, 0, 3);
        // Intervals of 100 ms.
        fd.heartbeat(0);
        fd.heartbeat(100);
        fd.heartbeat(200);
        fd.heartbeat(300);
        let phi1 = fd.phi(400);
        // Intervals of 500 ms, which should push out the older ones.
        fd.heartbeat(1000);
        fd.heartbeat(1500);
        fd.heartbeat(2000);
        fd.heartbeat(2500);
        let phi2 = fd.phi(3000);
        assert_eq!(phi1, phi2);
    }

    #[test]
    fn test_heartbeat_history_calculate_correct_mean_and_variance() {
        let mut history = HeartbeatHistory::new(20);
        for i in [100, 200, 125, 340, 130] {
            history.add(i);
        }
        assert!((history.mean() - 179.0).abs() < 0.00001);
        assert!((history.variance() - 7584.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_have_0_variance_for_one_sample() {
        let mut history = HeartbeatHistory::new(600);
        history.add(1000);
        assert!((history.variance() - 0.0).abs() < 0.00001);
    }

    #[test]
    fn test_heartbeat_history_be_capped_by_the_specified_max_sample_size() {
        let mut history = HeartbeatHistory::new(3);
        history.add(100);
        history.add(110);
        history.add(90);
        assert!((history.mean() - 100.0).abs() < 0.00001);
        assert!((history.variance() - 66.6666667).abs() < 0.00001);
        history.add(140);
        assert!((history.mean() - 113.333333).abs() < 0.00001);
        assert!((history.variance() - 422.222222).abs() < 0.00001);
        history.add(80);
        assert!((history.mean() - 103.333333).abs() < 0.00001);
        assert!((history.variance() - 688.88888889).abs() < 0.00001);
    }
}