common_function/scalars/anomaly/
mad.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! `anomaly_score_mad` window function — MAD-based anomaly scoring.
16//!
17//! Algorithm: `score = |x - median(window)| / (MAD * 1.4826)`
18//! where `MAD = median(|xi - median(window)|)`
19//!
20//! When MAD = 0 (majority-constant window), returns 0.0 if value equals
21//! median, or +inf otherwise.
22
23use std::any::Any;
24use std::fmt::Debug;
25use std::ops::Range;
26use std::sync::Arc;
27
28use arrow::array::{Array, ArrayRef, Float64Array};
29use arrow::datatypes::{DataType, Field, FieldRef};
30use datafusion_common::{DataFusionError, Result, ScalarValue};
31use datafusion_expr::type_coercion::aggregates::NUMERICS;
32use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl};
33use datafusion_functions_window_common::field::WindowUDFFieldArgs;
34use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
35
36use crate::scalars::anomaly::utils::{
37    anomaly_ratio, cast_to_f64, collect_window_values, median_f64,
38};
39
/// Consistency constant that rescales the MAD for normally distributed data:
/// `1 / Φ⁻¹(3/4) ≈ 1.4826`.
const MAD_CONSISTENCY_CONSTANT: f64 = 1.4826;

/// Fewest usable samples a window must contain before a score is produced.
/// `evaluate()` returns NULL for smaller windows, since one or two samples are
/// too few for the MAD to be a meaningful estimate of spread.
const MIN_SAMPLES: usize = 3;
45
46#[derive(Debug, Clone, PartialEq, Eq, Hash)]
47pub struct AnomalyScoreMad {
48    signature: Signature,
49}
50
51impl AnomalyScoreMad {
52    pub fn new() -> Self {
53        Self {
54            signature: Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable),
55        }
56    }
57}
58
59impl WindowUDFImpl for AnomalyScoreMad {
60    fn as_any(&self) -> &dyn Any {
61        self
62    }
63
64    fn name(&self) -> &str {
65        "anomaly_score_mad"
66    }
67
68    fn signature(&self) -> &Signature {
69        &self.signature
70    }
71
72    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
73        Ok(Arc::new(Field::new(
74            field_args.name(),
75            DataType::Float64,
76            true, // nullable
77        )))
78    }
79
80    fn partition_evaluator(
81        &self,
82        _partition_evaluator_args: PartitionEvaluatorArgs,
83    ) -> Result<Box<dyn PartitionEvaluator>> {
84        Ok(Box::new(AnomalyScoreMadEvaluator { current_row: 0 }))
85    }
86}
87
88#[derive(Debug)]
89struct AnomalyScoreMadEvaluator {
90    /// Tracks the current row index within the partition.
91    /// DataFusion calls `evaluate()` sequentially for row 0, 1, 2, …
92    /// and creates a fresh evaluator per partition.
93    current_row: usize,
94}
95
96impl PartitionEvaluator for AnomalyScoreMadEvaluator {
97    fn uses_window_frame(&self) -> bool {
98        true
99    }
100
101    fn supports_bounded_execution(&self) -> bool {
102        false
103    }
104
105    fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
106        let values_f64 = cast_to_f64(&values[0])?;
107        let array = values_f64
108            .as_any()
109            .downcast_ref::<Float64Array>()
110            .ok_or_else(|| {
111                DataFusionError::Internal(format!(
112                    "Expected Float64Array, got: {:?}",
113                    values_f64.data_type()
114                ))
115            })?;
116
117        // Use the tracked current row index — correct for any window frame
118        // (trailing, leading, centered, unbounded).
119        let current_idx = self.current_row;
120        self.current_row += 1;
121
122        if current_idx >= array.len()
123            || !array.is_valid(current_idx)
124            || !array.value(current_idx).is_finite()
125        {
126            return Ok(ScalarValue::Float64(None));
127        }
128        let current_value = array.value(current_idx);
129
130        let mut window_values = collect_window_values(array, range);
131        if window_values.len() < MIN_SAMPLES {
132            return Ok(ScalarValue::Float64(None));
133        }
134
135        // Compute median of window
136        let median = match median_f64(&mut window_values) {
137            Some(m) => m,
138            None => return Ok(ScalarValue::Float64(None)),
139        };
140
141        // Compute MAD = median(|xi - median|)
142        let mut abs_deviations: Vec<f64> =
143            window_values.iter().map(|x| (x - median).abs()).collect();
144        let mad = match median_f64(&mut abs_deviations) {
145            Some(m) => m,
146            None => return Ok(ScalarValue::Float64(None)),
147        };
148
149        let distance = (current_value - median).abs();
150        let scale = mad * MAD_CONSISTENCY_CONSTANT;
151        let score = anomaly_ratio(distance, scale);
152        Ok(ScalarValue::Float64(Some(score)))
153    }
154}
155
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Float64Array;
    use datafusion_expr::WindowUDF;

    use super::*;

    /// Scores the last row of `range` against the frame `range`, using a
    /// fresh evaluator whose row counter is pre-set to that row.
    fn eval_mad(values: &[Option<f64>], range: Range<usize>) -> ScalarValue {
        let array = Arc::new(Float64Array::from(values.to_vec())) as ArrayRef;
        // current_row is the last index in the range — the row being scored
        let current_row = range.end.saturating_sub(1);
        let mut evaluator = AnomalyScoreMadEvaluator { current_row };
        evaluator.evaluate(&[array], &range).unwrap()
    }

    #[test]
    fn test_basic_outlier() {
        // Normal data with one outlier (100.0)
        let values: Vec<Option<f64>> = vec![
            Some(1.0),
            Some(2.0),
            Some(1.5),
            Some(2.5),
            Some(1.0),
            Some(100.0),
        ];
        let result = eval_mad(&values, 0..6);
        match result {
            ScalarValue::Float64(Some(score)) => assert!(score > 3.0, "score={score}"),
            other => panic!("expected Some(score), got {other:?}"),
        }
    }

    #[test]
    fn test_constant_sequence() {
        let values: Vec<Option<f64>> = vec![Some(5.0); 10];
        // All values are the same -> zero spread and zero distance => 0.0
        let result = eval_mad(&values, 0..10);
        match result {
            ScalarValue::Float64(Some(score)) => assert_eq!(score, 0.0),
            other => panic!("expected Some(0.0), got {other:?}"),
        }
    }

    #[test]
    fn test_mad_zero_non_median() {
        // More than 50% identical and current differs from median -> non-zero / 0 => +inf
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0), Some(5.0)];
        let result = eval_mad(&values, 0..5);
        match result {
            ScalarValue::Float64(Some(score)) => {
                assert!(score.is_infinite() && score.is_sign_positive())
            }
            other => panic!("expected Some(+inf), got {other:?}"),
        }
    }

    #[test]
    fn test_all_null() {
        let values: Vec<Option<f64>> = vec![None, None, None, None];
        let result = eval_mad(&values, 0..4);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_insufficient_samples() {
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0)];
        let result = eval_mad(&values, 0..2);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_nan_inf_skipped() {
        let values: Vec<Option<f64>> = vec![
            Some(1.0),
            Some(f64::NAN),
            Some(f64::INFINITY),
            Some(2.0),
            Some(3.0),
        ];
        let result = eval_mad(&values, 0..5);
        // Should compute using [1.0, 2.0, 3.0] only:
        // median = 2, MAD = 1, distance of 3.0 = 1 => score = 1 / 1.4826.
        // Pinning the value catches NaN/inf leaking into the window, which a
        // bare `Some(_)` check would not.
        match result {
            ScalarValue::Float64(Some(score)) => {
                let expected = 1.0 / MAD_CONSISTENCY_CONSTANT;
                assert!((score - expected).abs() < 1e-12, "score={score}");
            }
            other => panic!("expected Some(score), got {other:?}"),
        }
    }

    #[test]
    fn test_current_row_null() {
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), None];
        let result = eval_mad(&values, 0..5);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_sequential_rows_follow_call_order() {
        // One evaluator, one whole-partition frame, rows scored in call order.
        // The outlier is the FIRST row, so an implementation that scored
        // `range.end - 1` instead of the tracked row would fail here.
        let values: Vec<Option<f64>> =
            vec![Some(100.0), Some(1.0), Some(2.0), Some(1.5), Some(2.5)];
        let columns = [Arc::new(Float64Array::from(values)) as ArrayRef];
        let frame = 0..5;
        let mut evaluator = AnomalyScoreMadEvaluator { current_row: 0 };

        let scores: Vec<f64> = (0..5)
            .map(|row| match evaluator.evaluate(&columns, &frame).unwrap() {
                ScalarValue::Float64(Some(score)) => score,
                other => panic!("row {row}: expected Some(score), got {other:?}"),
            })
            .collect();

        // Window median is 2.0: row 0 (100.0) is the outlier, row 2 (2.0)
        // sits exactly on the median, and the rest are ordinary.
        assert!(scores[0] > 3.0, "scores={scores:?}");
        assert_eq!(scores[2], 0.0, "scores={scores:?}");
        assert!(scores[1..].iter().all(|s| *s < 3.0), "scores={scores:?}");

        // A call past the last row of the partition yields NULL.
        let past_end = evaluator.evaluate(&columns, &frame).unwrap();
        assert_eq!(past_end, ScalarValue::Float64(None));
    }

    #[test]
    fn test_udwf_creation() {
        let udwf = WindowUDF::from(AnomalyScoreMad::new());
        assert_eq!(udwf.name(), "anomaly_score_mad");
    }
}