common_function/scalars/anomaly/
mad.rs1use std::any::Any;
24use std::fmt::Debug;
25use std::ops::Range;
26use std::sync::Arc;
27
28use arrow::array::{Array, ArrayRef, Float64Array};
29use arrow::datatypes::{DataType, Field, FieldRef};
30use datafusion_common::{DataFusionError, Result, ScalarValue};
31use datafusion_expr::type_coercion::aggregates::NUMERICS;
32use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl};
33use datafusion_functions_window_common::field::WindowUDFFieldArgs;
34use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
35
36use crate::scalars::anomaly::utils::{
37 anomaly_ratio, cast_to_f64, collect_window_values, median_f64,
38};
39
/// Smallest number of usable frame values (as returned by
/// `collect_window_values`) needed before a score is emitted; smaller frames
/// produce a NULL score.
const MIN_SAMPLES: usize = 3;

/// Scale factor applied to the raw MAD before it is used as the spread.
/// 1.4826 ≈ 1 / Φ⁻¹(3/4), the usual constant that makes the MAD a consistent
/// estimator of the standard deviation for normally distributed data.
const MAD_CONSISTENCY_CONSTANT: f64 = 1.4826;
46#[derive(Debug, Clone, PartialEq, Eq, Hash)]
47pub struct AnomalyScoreMad {
48 signature: Signature,
49}
50
51impl AnomalyScoreMad {
52 pub fn new() -> Self {
53 Self {
54 signature: Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable),
55 }
56 }
57}
58
59impl WindowUDFImpl for AnomalyScoreMad {
60 fn as_any(&self) -> &dyn Any {
61 self
62 }
63
64 fn name(&self) -> &str {
65 "anomaly_score_mad"
66 }
67
68 fn signature(&self) -> &Signature {
69 &self.signature
70 }
71
72 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
73 Ok(Arc::new(Field::new(
74 field_args.name(),
75 DataType::Float64,
76 true, )))
78 }
79
80 fn partition_evaluator(
81 &self,
82 _partition_evaluator_args: PartitionEvaluatorArgs,
83 ) -> Result<Box<dyn PartitionEvaluator>> {
84 Ok(Box::new(AnomalyScoreMadEvaluator { current_row: 0 }))
85 }
86}
87
88#[derive(Debug)]
89struct AnomalyScoreMadEvaluator {
90 current_row: usize,
94}
95
96impl PartitionEvaluator for AnomalyScoreMadEvaluator {
97 fn uses_window_frame(&self) -> bool {
98 true
99 }
100
101 fn supports_bounded_execution(&self) -> bool {
102 false
103 }
104
105 fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
106 let values_f64 = cast_to_f64(&values[0])?;
107 let array = values_f64
108 .as_any()
109 .downcast_ref::<Float64Array>()
110 .ok_or_else(|| {
111 DataFusionError::Internal(format!(
112 "Expected Float64Array, got: {:?}",
113 values_f64.data_type()
114 ))
115 })?;
116
117 let current_idx = self.current_row;
120 self.current_row += 1;
121
122 if current_idx >= array.len()
123 || !array.is_valid(current_idx)
124 || !array.value(current_idx).is_finite()
125 {
126 return Ok(ScalarValue::Float64(None));
127 }
128 let current_value = array.value(current_idx);
129
130 let mut window_values = collect_window_values(array, range);
131 if window_values.len() < MIN_SAMPLES {
132 return Ok(ScalarValue::Float64(None));
133 }
134
135 let median = match median_f64(&mut window_values) {
137 Some(m) => m,
138 None => return Ok(ScalarValue::Float64(None)),
139 };
140
141 let mut abs_deviations: Vec<f64> =
143 window_values.iter().map(|x| (x - median).abs()).collect();
144 let mad = match median_f64(&mut abs_deviations) {
145 Some(m) => m,
146 None => return Ok(ScalarValue::Float64(None)),
147 };
148
149 let distance = (current_value - median).abs();
150 let scale = mad * MAD_CONSISTENCY_CONSTANT;
151 let score = anomaly_ratio(distance, scale);
152 Ok(ScalarValue::Float64(Some(score)))
153 }
154}
155
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Float64Array;
    use datafusion_expr::WindowUDF;

    use super::*;

    /// Evaluate a single row: the evaluator is positioned at the last row of
    /// `range`, i.e. the current row is the final row of its frame.
    fn eval_mad(values: &[Option<f64>], range: Range<usize>) -> ScalarValue {
        let array = Arc::new(Float64Array::from(values.to_vec())) as ArrayRef;
        let current_row = range.end.saturating_sub(1);
        let mut evaluator = AnomalyScoreMadEvaluator { current_row };
        evaluator.evaluate(&[array], &range).unwrap()
    }

    /// Drive ONE evaluator through every row in order, with a growing frame
    /// `0..=row`. Unlike `eval_mad`, this exercises the evaluator's internal
    /// row counter.
    fn eval_sequential(values: &[Option<f64>]) -> Vec<ScalarValue> {
        let array = Arc::new(Float64Array::from(values.to_vec())) as ArrayRef;
        let mut evaluator = AnomalyScoreMadEvaluator { current_row: 0 };
        (0..values.len())
            .map(|row| {
                evaluator
                    .evaluate(std::slice::from_ref(&array), &(0..row + 1))
                    .unwrap()
            })
            .collect()
    }

    #[test]
    fn test_basic_outlier() {
        let values: Vec<Option<f64>> = vec![
            Some(1.0),
            Some(2.0),
            Some(1.5),
            Some(2.5),
            Some(1.0),
            Some(100.0),
        ];
        let result = eval_mad(&values, 0..6);
        match result {
            ScalarValue::Float64(Some(score)) => assert!(score > 3.0, "score={score}"),
            other => panic!("expected Some(score), got {other:?}"),
        }
    }

    #[test]
    fn test_constant_sequence() {
        let values: Vec<Option<f64>> = vec![Some(5.0); 10];
        let result = eval_mad(&values, 0..10);
        match result {
            ScalarValue::Float64(Some(score)) => assert_eq!(score, 0.0),
            other => panic!("expected Some(0.0), got {other:?}"),
        }
    }

    #[test]
    fn test_mad_zero_non_median() {
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0), Some(5.0)];
        let result = eval_mad(&values, 0..5);
        match result {
            ScalarValue::Float64(Some(score)) => {
                assert!(score.is_infinite() && score.is_sign_positive())
            }
            other => panic!("expected Some(+inf), got {other:?}"),
        }
    }

    #[test]
    fn test_all_null() {
        let values: Vec<Option<f64>> = vec![None, None, None, None];
        let result = eval_mad(&values, 0..4);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_insufficient_samples() {
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0)];
        let result = eval_mad(&values, 0..2);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_nan_inf_skipped() {
        let values: Vec<Option<f64>> = vec![
            Some(1.0),
            Some(f64::NAN),
            Some(f64::INFINITY),
            Some(2.0),
            Some(3.0),
        ];
        let result = eval_mad(&values, 0..5);
        match result {
            ScalarValue::Float64(Some(_)) => {}
            other => panic!("expected Some(score), got {other:?}"),
        }
    }

    #[test]
    fn test_current_row_null() {
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), None];
        let result = eval_mad(&values, 0..5);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_sequential_rows_advance_counter() {
        let values: Vec<Option<f64>> =
            vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), Some(100.0)];
        let scores = eval_sequential(&values);
        assert_eq!(scores.len(), 5);

        // Rows 0 and 1 see fewer than MIN_SAMPLES values in their frame.
        assert_eq!(scores[0], ScalarValue::Float64(None));
        assert_eq!(scores[1], ScalarValue::Float64(None));
        assert!(matches!(scores[2], ScalarValue::Float64(Some(_))));
        assert!(matches!(scores[3], ScalarValue::Float64(Some(_))));

        // The last call must score the value 100.0, not some earlier row.
        match &scores[4] {
            ScalarValue::Float64(Some(score)) => assert!(*score > 3.0, "score={score}"),
            other => panic!("expected Some(score), got {other:?}"),
        }
    }

    #[test]
    fn test_null_row_still_advances_counter() {
        // Row 1 is NULL and takes an early return; the counter must still move
        // on so that row 4 is scored using its own value (50.0).
        let values: Vec<Option<f64>> = vec![Some(1.0), None, Some(2.0), Some(3.0), Some(50.0)];
        let scores = eval_sequential(&values);

        assert_eq!(scores[1], ScalarValue::Float64(None));
        match &scores[4] {
            ScalarValue::Float64(Some(score)) => assert!(*score > 3.0, "score={score}"),
            other => panic!("expected Some(score), got {other:?}"),
        }
    }

    #[test]
    fn test_udwf_creation() {
        let udwf = WindowUDF::from(AnomalyScoreMad::new());
        assert_eq!(udwf.name(), "anomaly_score_mad");
    }
}