common_function/scalars/anomaly/
utils.rs1use std::ops::Range;
18
19use arrow::array::{Array, ArrayRef, Float64Array};
20use arrow::compute;
21use arrow::datatypes::DataType;
22use datafusion_common::DataFusionError;
23
24pub fn cast_to_f64(array: &ArrayRef) -> datafusion_common::Result<ArrayRef> {
26 if array.data_type() == &DataType::Float64 {
27 return Ok(array.clone());
28 }
29 compute::cast(array, &DataType::Float64)
30 .map_err(|e| DataFusionError::Internal(format!("Failed to cast to Float64: {e}")))
31}
32
33pub fn collect_window_values(array: &Float64Array, range: &Range<usize>) -> Vec<f64> {
36 let mut values = Vec::with_capacity(range.len());
37 for i in range.clone() {
38 if array.is_valid(i) {
39 let v = array.value(i);
40 if v.is_finite() {
41 values.push(v);
42 }
43 }
44 }
45 values
46}
47
/// Computes the median of `values`, reordering the slice in place.
///
/// Uses a selection (`select_nth_unstable_by` with `f64::total_cmp`) instead of a full sort, so
/// the slice is only partially ordered afterwards. For an even number of elements the result is
/// the mean of the two middle values.
///
/// Returns `None` when `values` is empty.
pub fn median_f64(values: &mut [f64]) -> Option<f64> {
    let len = values.len();
    if len == 0 {
        return None;
    }

    let mid = len / 2;
    let (lower, median, _) = values.select_nth_unstable_by(mid, f64::total_cmp);
    let upper_mid = *median;
    if len % 2 == 1 {
        return Some(upper_mid);
    }

    // Even length: the other middle element is the largest value left of the pivot.
    let lower_mid = lower
        .iter()
        .copied()
        .max_by(f64::total_cmp)
        .expect("even, non-empty input leaves at least one element below the midpoint");

    let sum = lower_mid + upper_mid;
    if sum.is_finite() {
        Some(sum / 2.0)
    } else {
        // The sum of two large finite values can overflow to infinity; halving each operand
        // first keeps the result finite. Non-finite inputs produce the same result either way.
        Some(lower_mid / 2.0 + upper_mid / 2.0)
    }
}
68
/// Returns the `p`-th percentile of an ascending-sorted slice using linear interpolation
/// between the two nearest ranks.
///
/// `p` is a fraction: `0.0` selects the first element and `1.0` the last. Values outside
/// `[0.0, 1.0]` are clamped to that interval, so an out-of-range `p` can never index past the
/// end of the slice. A NaN `p` resolves to the first element.
///
/// The slice is not checked for ordering; the caller must pass it already sorted.
///
/// Returns `None` when `sorted` is empty.
pub fn percentile_sorted(sorted: &[f64], p: f64) -> Option<f64> {
    let len = sorted.len();
    if len == 0 {
        return None;
    }
    if len == 1 {
        return Some(sorted[0]);
    }

    // Clamping keeps the fractional rank within `0..=len - 1`; without it any `p > 1.0`
    // would produce an out-of-bounds index.
    let idx = p.clamp(0.0, 1.0) * (len - 1) as f64;
    let lower = idx.floor() as usize;
    let upper = idx.ceil() as usize;
    if lower == upper {
        // The rank falls exactly on an element, so no interpolation is needed.
        Some(sorted[lower])
    } else {
        let frac = idx - lower as f64;
        Some(sorted[lower] * (1.0 - frac) + sorted[upper] * frac)
    }
}
91
/// Divides `distance` by `scale`, with a defined result when `scale` is zero.
///
/// With a zero `scale`, the result is `0.0` if `distance` is also zero and positive infinity
/// otherwise. For any other `scale` the plain quotient is returned.
#[inline]
pub fn anomaly_ratio(distance: f64, scale: f64) -> f64 {
    if scale != 0.0 {
        return distance / scale;
    }
    // Zero scale: avoid 0/0 by reporting "no deviation" for a zero distance.
    if distance == 0.0 {
        0.0
    } else {
        f64::INFINITY
    }
}
105
#[cfg(test)]
mod tests {
    use super::*;

    /// Absolute tolerance used when comparing interpolated percentiles.
    const TOLERANCE: f64 = 1e-10;

    // --- median_f64 -------------------------------------------------------

    #[test]
    fn test_median_odd() {
        let mut samples = vec![5.0, 1.0, 3.0, 2.0, 4.0];
        let got = median_f64(&mut samples);
        assert_eq!(got, Some(3.0));
    }

    #[test]
    fn test_median_even() {
        // Middle pair is (2.0, 3.0), so the median is their mean.
        let mut samples = vec![4.0, 1.0, 3.0, 2.0];
        let got = median_f64(&mut samples);
        assert_eq!(got, Some(2.5));
    }

    #[test]
    fn test_median_single() {
        let mut samples = vec![42.0];
        let got = median_f64(&mut samples);
        assert_eq!(got, Some(42.0));
    }

    #[test]
    fn test_median_empty() {
        let mut samples: Vec<f64> = Vec::new();
        let got = median_f64(&mut samples);
        assert_eq!(got, None);
    }

    // --- percentile_sorted ------------------------------------------------

    #[test]
    fn test_percentile_sorted_quartiles() {
        let ascending = vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0];
        let lower_quartile = percentile_sorted(&ascending, 0.25).unwrap();
        let upper_quartile = percentile_sorted(&ascending, 0.75).unwrap();
        assert!((lower_quartile - 3.25).abs() < TOLERANCE);
        assert!((upper_quartile - 7.75).abs() < TOLERANCE);
    }

    #[test]
    fn test_percentile_sorted_empty() {
        let got = percentile_sorted(&[], 0.5);
        assert_eq!(got, None);
    }

    // --- collect_window_values --------------------------------------------

    #[test]
    fn test_collect_window_values_filters_invalid() {
        // Nulls, NaN and both infinities must be dropped; finite values keep their order.
        let input = Float64Array::from(vec![
            Some(1.0),
            None,
            Some(f64::NAN),
            Some(f64::INFINITY),
            Some(f64::NEG_INFINITY),
            Some(2.0),
            Some(3.0),
        ]);
        let window = 0..7;
        let kept = collect_window_values(&input, &window);
        assert_eq!(kept, vec![1.0, 2.0, 3.0]);
    }

    // --- anomaly_ratio ----------------------------------------------------

    #[test]
    fn test_anomaly_ratio_zero_scale() {
        let both_zero = anomaly_ratio(0.0, 0.0);
        assert_eq!(both_zero, 0.0);

        let nonzero_distance = anomaly_ratio(1.0, 0.0);
        assert!(nonzero_distance.is_infinite());
    }
}