common_function/scalars/anomaly/
iqr.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! `anomaly_score_iqr` window function — IQR-based anomaly scoring.
16//!
17//! Algorithm:
18//! - Compute Q1 (25th percentile) and Q3 (75th percentile)
19//! - IQR = Q3 - Q1
20//! - Lower fence = Q1 - k * IQR, Upper fence = Q3 + k * IQR
21//! - If value is outside fences, score = |distance to nearest fence| / IQR
22//! - Otherwise, score = 0.0
23//!
24//! When IQR = 0 (constant quartiles), returns 0.0 if value is on the fence,
25//! or +inf if value is outside.
26
27use std::any::Any;
28use std::fmt::Debug;
29use std::ops::Range;
30use std::sync::Arc;
31
32use arrow::array::{Array, ArrayRef, Float64Array};
33use arrow::datatypes::{DataType, Field, FieldRef};
34use datafusion_common::{DataFusionError, Result, ScalarValue};
35use datafusion_expr::type_coercion::aggregates::NUMERICS;
36use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl};
37use datafusion_functions_window_common::field::WindowUDFFieldArgs;
38use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
39
40use crate::scalars::anomaly::utils::{cast_to_f64, collect_window_values, percentile_sorted};
41
/// Minimum number of window values required before a score is computed.
///
/// `evaluate` returns NULL when `collect_window_values` yields fewer than this many
/// values for the current frame.
// NOTE(review): the stated rationale is that linear-interpolated Q1 != Q3 becomes possible
// at n >= 3 — confirm against the interpolation scheme used by `percentile_sorted`.
const MIN_SAMPLES: usize = 3;
44
/// Window UDF implementation backing the `anomaly_score_iqr` function.
///
/// The struct only stores the function signature; the per-partition row state is kept
/// by the evaluator returned from `partition_evaluator`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AnomalyScoreIqr {
    /// Argument signature handed out by `WindowUDFImpl::signature`.
    signature: Signature,
}
49
50impl AnomalyScoreIqr {
51    pub fn new() -> Self {
52        Self {
53            signature: Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable),
54        }
55    }
56}
57
58impl WindowUDFImpl for AnomalyScoreIqr {
59    fn as_any(&self) -> &dyn Any {
60        self
61    }
62
63    fn name(&self) -> &str {
64        "anomaly_score_iqr"
65    }
66
67    fn signature(&self) -> &Signature {
68        &self.signature
69    }
70
71    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
72        Ok(Arc::new(Field::new(
73            field_args.name(),
74            DataType::Float64,
75            true, // nullable
76        )))
77    }
78
79    fn partition_evaluator(
80        &self,
81        _partition_evaluator_args: PartitionEvaluatorArgs,
82    ) -> Result<Box<dyn PartitionEvaluator>> {
83        Ok(Box::new(AnomalyScoreIqrEvaluator { current_row: 0 }))
84    }
85}
86
/// Per-partition evaluator for `anomaly_score_iqr`.
///
/// It is stateful: each `evaluate` call scores the row at `current_row` and then
/// advances the counter.
#[derive(Debug)]
struct AnomalyScoreIqrEvaluator {
    /// Tracks the current row index within the partition.
    /// Set to 0 by `partition_evaluator` and incremented by `evaluate`.
    current_row: usize,
}
92
93impl PartitionEvaluator for AnomalyScoreIqrEvaluator {
94    fn uses_window_frame(&self) -> bool {
95        true
96    }
97
98    fn supports_bounded_execution(&self) -> bool {
99        false
100    }
101
102    fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
103        // TODO(dennis): memoize the converted arrays to avoid re-casting inside each row evaluation
104        let values_f64 = cast_to_f64(&values[0])?;
105        let array = values_f64
106            .as_any()
107            .downcast_ref::<Float64Array>()
108            .ok_or_else(|| {
109                DataFusionError::Internal(format!(
110                    "Expected Float64Array, got: {:?}",
111                    values_f64.data_type()
112                ))
113            })?;
114
115        // Extract k from the second argument (constant across the window)
116        let k_f64 = cast_to_f64(&values[1])?;
117        let k_array = k_f64
118            .as_any()
119            .downcast_ref::<Float64Array>()
120            .ok_or_else(|| {
121                DataFusionError::Internal(format!(
122                    "Expected Float64Array for k, got: {:?}",
123                    k_f64.data_type()
124                ))
125            })?;
126
127        // Use the tracked current row index — correct for any window frame.
128        let current_idx = self.current_row;
129        self.current_row += 1;
130
131        // Check bounds and validity of the current row.
132        if current_idx >= array.len()
133            || !array.is_valid(current_idx)
134            || !array.value(current_idx).is_finite()
135        {
136            return Ok(ScalarValue::Float64(None));
137        }
138        let current_value = array.value(current_idx);
139
140        // Get k value (use current row's k)
141        if current_idx >= k_array.len()
142            || !k_array.is_valid(current_idx)
143            || !k_array.value(current_idx).is_finite()
144        {
145            return Ok(ScalarValue::Float64(None));
146        }
147        let k = k_array.value(current_idx);
148        if k < 0.0 {
149            return Ok(ScalarValue::Float64(None));
150        }
151
152        let mut window_values = collect_window_values(array, range);
153        if window_values.len() < MIN_SAMPLES {
154            return Ok(ScalarValue::Float64(None));
155        }
156
157        // Sort for percentile computation
158        window_values.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap());
159
160        let q1 = match percentile_sorted(&window_values, 0.25) {
161            Some(v) => v,
162            None => return Ok(ScalarValue::Float64(None)),
163        };
164        let q3 = match percentile_sorted(&window_values, 0.75) {
165            Some(v) => v,
166            None => return Ok(ScalarValue::Float64(None)),
167        };
168        let iqr = q3 - q1;
169
170        let lower_fence = q1 - k * iqr;
171        let upper_fence = q3 + k * iqr;
172
173        let score = if current_value < lower_fence {
174            (lower_fence - current_value) / iqr
175        } else if current_value > upper_fence {
176            (current_value - upper_fence) / iqr
177        } else {
178            0.0
179        };
180
181        Ok(ScalarValue::Float64(Some(score)))
182    }
183}
184
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Float64Array;
    use datafusion_expr::WindowUDF;

    use super::*;

    /// Evaluates the last row of `range` over `values` with a constant `k`.
    ///
    /// The evaluator's row counter is pre-set to `range.end - 1`, so a single `evaluate`
    /// call scores that row.
    fn eval_iqr(values: &[Option<f64>], k: f64, range: Range<usize>) -> ScalarValue {
        let array = Arc::new(Float64Array::from(values.to_vec())) as ArrayRef;
        let k_array = Arc::new(Float64Array::from(vec![Some(k); values.len()])) as ArrayRef;
        let current_row = range.end.saturating_sub(1);
        let mut evaluator = AnomalyScoreIqrEvaluator { current_row };
        evaluator.evaluate(&[array, k_array], &range).unwrap()
    }

    #[test]
    fn test_basic_outlier() {
        // Normal range [1..10] with outlier at 100
        let mut values: Vec<Option<f64>> = (1..=10).map(|x| Some(x as f64)).collect();
        values.push(Some(100.0));
        let result = eval_iqr(&values, 1.5, 0..11);
        match result {
            ScalarValue::Float64(Some(score)) => assert!(score > 0.0, "score={score}"),
            other => panic!("expected Some(score), got {other:?}"),
        }
    }

    #[test]
    fn test_value_within_fences() {
        let values: Vec<Option<f64>> = (1..=10).map(|x| Some(x as f64)).collect();
        // Last value is 10, which is within fences for k=1.5, so the score must be
        // exactly 0.0 — a merely non-negative score would also accept a false outlier.
        let result = eval_iqr(&values, 1.5, 0..10);
        assert_eq!(result, ScalarValue::Float64(Some(0.0)));
    }

    #[test]
    fn test_constant_sequence() {
        let values: Vec<Option<f64>> = vec![Some(5.0); 10];
        // IQR=0 and current is on the fence -> 0.0
        let result = eval_iqr(&values, 1.5, 0..10);
        assert_eq!(result, ScalarValue::Float64(Some(0.0)));
    }

    #[test]
    fn test_zero_iqr_outlier_is_infinite() {
        // Q1=Q3=1.0 -> IQR=0.0, current value is outside fence.
        let values: Vec<Option<f64>> =
            vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0), Some(100.0)];
        let result = eval_iqr(&values, 1.5, 0..5);
        match result {
            ScalarValue::Float64(Some(score)) => {
                assert!(score.is_infinite() && score.is_sign_positive())
            }
            other => panic!("expected Some(+inf), got {other:?}"),
        }
    }

    #[test]
    fn test_all_null() {
        let values: Vec<Option<f64>> = vec![None; 5];
        let result = eval_iqr(&values, 1.5, 0..5);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_insufficient_samples() {
        // IQR requires min_samples=3; 2 points should return NULL
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0)];
        let result = eval_iqr(&values, 1.5, 0..2);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_negative_k() {
        let values: Vec<Option<f64>> =
            vec![Some(48.0), Some(49.0), Some(50.0), Some(51.0), Some(52.0)];
        let result = eval_iqr(&values, -1.0, 0..5);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_current_row_null() {
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0), Some(3.0), None];
        let result = eval_iqr(&values, 1.5, 0..4);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_lower_outlier() {
        // Data mostly around 50, one low value at the end
        let values: Vec<Option<f64>> = vec![
            Some(48.0),
            Some(49.0),
            Some(50.0),
            Some(51.0),
            Some(52.0),
            Some(-100.0),
        ];
        let result = eval_iqr(&values, 1.5, 0..6);
        match result {
            ScalarValue::Float64(Some(score)) => assert!(score > 0.0, "score={score}"),
            other => panic!("expected Some(score), got {other:?}"),
        }

        // Test that a value in the middle gets 0
        let values: Vec<Option<f64>> = vec![
            Some(48.0),
            Some(49.0),
            Some(50.0),
            Some(51.0),
            Some(52.0),
            Some(-100.0),
            Some(50.0),
        ];
        let result = eval_iqr(&values, 1.5, 0..7);
        assert_eq!(result, ScalarValue::Float64(Some(0.0)));
    }

    #[test]
    fn test_udwf_creation() {
        let udwf = WindowUDF::from(AnomalyScoreIqr::new());
        assert_eq!(udwf.name(), "anomaly_score_iqr");
    }
}