common_function/scalars/anomaly/
zscore.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! `anomaly_score_zscore` window function — Z-Score-based anomaly scoring.
16//!
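//! Algorithm: `score = |x - mean(window)| / stddev(window)`, where `stddev` is the
//! population standard deviation (divisor `n`) of the samples in the window frame.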
18//!
19//! When stddev = 0 (constant window), returns 0.0 if value equals mean,
20//! or +inf otherwise.
21
22use std::any::Any;
23use std::fmt::Debug;
24use std::ops::Range;
25use std::sync::Arc;
26
27use arrow::array::{Array, ArrayRef, Float64Array};
28use arrow::datatypes::{DataType, Field, FieldRef};
29use datafusion_common::{DataFusionError, Result, ScalarValue};
30use datafusion_expr::type_coercion::aggregates::NUMERICS;
31use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl};
32use datafusion_functions_window_common::field::WindowUDFFieldArgs;
33use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
34
35use crate::scalars::anomaly::utils::{anomaly_ratio, cast_to_f64, collect_window_values};
36
/// Minimum number of window samples needed to produce a score; with fewer than two
/// samples the evaluator returns NULL instead of a z-score.
38const MIN_SAMPLES: usize = 2;
39
40#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct AnomalyScoreZscore {
42    signature: Signature,
43}
44
45impl AnomalyScoreZscore {
46    pub fn new() -> Self {
47        Self {
48            signature: Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable),
49        }
50    }
51}
52
53impl WindowUDFImpl for AnomalyScoreZscore {
54    fn as_any(&self) -> &dyn Any {
55        self
56    }
57
58    fn name(&self) -> &str {
59        "anomaly_score_zscore"
60    }
61
62    fn signature(&self) -> &Signature {
63        &self.signature
64    }
65
66    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
67        Ok(Arc::new(Field::new(
68            field_args.name(),
69            DataType::Float64,
70            true, // nullable
71        )))
72    }
73
74    fn partition_evaluator(
75        &self,
76        _partition_evaluator_args: PartitionEvaluatorArgs,
77    ) -> Result<Box<dyn PartitionEvaluator>> {
78        Ok(Box::new(AnomalyScoreZscoreEvaluator { current_row: 0 }))
79    }
80}
81
82#[derive(Debug)]
83struct AnomalyScoreZscoreEvaluator {
84    /// Tracks the current row index within the partition.
85    current_row: usize,
86}
87
88impl PartitionEvaluator for AnomalyScoreZscoreEvaluator {
89    fn uses_window_frame(&self) -> bool {
90        true
91    }
92
93    fn supports_bounded_execution(&self) -> bool {
94        false
95    }
96
97    fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
98        let values_f64 = cast_to_f64(&values[0])?;
99        let array = values_f64
100            .as_any()
101            .downcast_ref::<Float64Array>()
102            .ok_or_else(|| {
103                DataFusionError::Internal(format!(
104                    "Expected Float64Array, got: {:?}",
105                    values_f64.data_type()
106                ))
107            })?;
108
109        // Use the tracked current row index — correct for any window frame.
110        let current_idx = self.current_row;
111        self.current_row += 1;
112
113        if current_idx >= array.len()
114            || !array.is_valid(current_idx)
115            || !array.value(current_idx).is_finite()
116        {
117            return Ok(ScalarValue::Float64(None));
118        }
119        let current_value = array.value(current_idx);
120
121        let window_values = collect_window_values(array, range);
122        if window_values.len() < MIN_SAMPLES {
123            return Ok(ScalarValue::Float64(None));
124        }
125
126        let n = window_values.len() as f64;
127        let mean = window_values.iter().sum::<f64>() / n;
128
129        let variance = window_values
130            .iter()
131            .map(|x| (x - mean).powi(2))
132            .sum::<f64>()
133            / n;
134        let stddev = variance.sqrt();
135
136        let distance = (current_value - mean).abs();
137        let score = anomaly_ratio(distance, stddev);
138        Ok(ScalarValue::Float64(Some(score)))
139    }
140}
141
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Float64Array;
    use datafusion_expr::WindowUDF;

    use super::*;

    /// Scores row `current_row` of `values` against the window frame `range`.
    fn eval_zscore_at(
        values: &[Option<f64>],
        current_row: usize,
        range: Range<usize>,
    ) -> ScalarValue {
        let array = Arc::new(Float64Array::from(values.to_vec())) as ArrayRef;
        let mut evaluator = AnomalyScoreZscoreEvaluator { current_row };
        evaluator.evaluate(&[array], &range).unwrap()
    }

    /// Scores the last row of `range` (a trailing window frame).
    fn eval_zscore(values: &[Option<f64>], range: Range<usize>) -> ScalarValue {
        let current_row = range.end.saturating_sub(1);
        eval_zscore_at(values, current_row, range)
    }

    /// Unwraps a non-NULL score, panicking with the actual value otherwise.
    fn expect_score(result: ScalarValue) -> f64 {
        match result {
            ScalarValue::Float64(Some(score)) => score,
            other => panic!("expected Some(score), got {other:?}"),
        }
    }

    #[test]
    fn test_basic_outlier() {
        // Use enough normal points so the outlier doesn't dominate stddev
        let values: Vec<Option<f64>> = vec![
            Some(1.0),
            Some(2.0),
            Some(1.5),
            Some(2.5),
            Some(1.0),
            Some(2.0),
            Some(1.5),
            Some(2.5),
            Some(1.0),
            Some(2.0),
            Some(100.0),
        ];
        let len = values.len();
        let score = expect_score(eval_zscore(&values, 0..len));
        assert!(score > 3.0, "score={score}");
    }

    #[test]
    fn test_constant_sequence() {
        let values: Vec<Option<f64>> = vec![Some(5.0); 10];
        let score = expect_score(eval_zscore(&values, 0..10));
        assert_eq!(score, 0.0);
    }

    #[test]
    fn test_outlier_in_window_gives_finite_score() {
        // 5.0 is included in the window, so stddev is non-zero → finite positive score
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0), Some(5.0)];
        let score = expect_score(eval_zscore(&values, 0..5));
        assert!(score.is_finite(), "score={score}");
        assert!(score > 0.0, "score={score}");
    }

    #[test]
    fn test_all_null() {
        let values: Vec<Option<f64>> = vec![None, None, None, None];
        let result = eval_zscore(&values, 0..4);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_two_samples_returns_score() {
        // zscore min_samples=2, so two points should produce a score
        // [1.0, 2.0]: mean=1.5, stddev=0.5, zscore(2.0) = |2.0-1.5|/0.5 = 1.0
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0)];
        let score = expect_score(eval_zscore(&values, 0..2));
        assert!((score - 1.0).abs() < 1e-10, "score={score}");
    }

    #[test]
    fn test_insufficient_samples() {
        // Single point is insufficient even for zscore
        let values: Vec<Option<f64>> = vec![Some(1.0)];
        let result = eval_zscore(&values, 0..1);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_nan_inf_skipped() {
        let values: Vec<Option<f64>> = vec![
            Some(1.0),
            Some(f64::NAN),
            Some(f64::INFINITY),
            Some(2.0),
            Some(3.0),
        ];
        // If NaN/inf were not skipped, the mean (and therefore the score) would be
        // NaN, so `Some(_)` alone proves nothing. With them skipped the window is
        // {1, 2, 3}: mean = 2, variance = 2/3, zscore(3) = 1 / sqrt(2/3) = sqrt(1.5).
        let score = expect_score(eval_zscore(&values, 0..5));
        assert!(score.is_finite(), "score={score}");
        assert!((score - 1.5_f64.sqrt()).abs() < 1e-10, "score={score}");
    }

    #[test]
    fn test_current_row_null() {
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), None];
        let result = eval_zscore(&values, 0..5);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    /// A NaN or infinite current value cannot be scored and yields NULL.
    #[test]
    fn test_current_row_non_finite() {
        for bad in [f64::NAN, f64::INFINITY, f64::NEG_INFINITY] {
            let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0), Some(3.0), Some(bad)];
            let result = eval_zscore(&values, 0..4);
            assert_eq!(result, ScalarValue::Float64(None), "current value {bad}");
        }
    }

    /// A row counter that has run past the end of the input yields NULL, not a panic.
    #[test]
    fn test_current_row_out_of_bounds() {
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0), Some(3.0)];
        let result = eval_zscore_at(&values, 3, 0..3);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_known_zscore() {
        // For data [0, 0, 0, 0, 10]:
        // mean = 2.0, variance = (4+4+4+4+64)/5 = 16, stddev = 4.0
        // zscore of 10 = |10-2|/4 = 2.0
        let values: Vec<Option<f64>> = vec![Some(0.0), Some(0.0), Some(0.0), Some(0.0), Some(10.0)];
        let score = expect_score(eval_zscore(&values, 0..5));
        assert!((score - 2.0).abs() < 1e-10, "score={score}");
    }

    #[test]
    fn test_udwf_creation() {
        let udwf = WindowUDF::from(AnomalyScoreZscore::new());
        assert_eq!(udwf.name(), "anomaly_score_zscore");
    }

    /// Verify the current_row counter increments correctly across sequential
    /// evaluate() calls, and that each call scores the right row.
    #[test]
    fn test_sequential_evaluate_calls() {
        // Data: [0, 0, 0, 0, 10]
        // Full window for all rows (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
        // mean=2.0, stddev=4.0 for all rows
        let values: Vec<Option<f64>> = vec![Some(0.0), Some(0.0), Some(0.0), Some(0.0), Some(10.0)];
        let array = Arc::new(Float64Array::from(values)) as ArrayRef;
        let full_range = 0..5;
        let vals = std::slice::from_ref(&array);

        let mut evaluator = AnomalyScoreZscoreEvaluator { current_row: 0 };

        // Rows 0..=3: value=0.0, zscore = |0-2|/4 = 0.5
        for row in 0..4 {
            let score = evaluator.evaluate(vals, &full_range).unwrap();
            assert_eq!(score, ScalarValue::Float64(Some(0.5)), "row {row}");
        }

        // Row 4: value=10.0, zscore = |10-2|/4 = 2.0
        let r4 = evaluator.evaluate(vals, &full_range).unwrap();
        assert_eq!(r4, ScalarValue::Float64(Some(2.0)));
    }

    /// Verify correct behavior with a centered window frame where the current
    /// row is NOT at range.end - 1. This is the regression test for the P1
    /// review finding about range.end - 1 assumption.
    #[test]
    fn test_centered_window_frame() {
        // Data: [0, 0, 10, 0, 0]
        // Centered window of size 3: ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
        let values: Vec<Option<f64>> = vec![Some(0.0), Some(0.0), Some(10.0), Some(0.0), Some(0.0)];

        // Score row 2 (value=10.0) with window [1..4) = {0.0, 10.0, 0.0}
        // mean = 10/3 ≈ 3.333, variance = ((10/3)^2 + (20/3)^2 + (10/3)^2)/3
        // = (100/9 + 400/9 + 100/9) / 3 = 600/27 ≈ 22.222
        // stddev ≈ 4.714, zscore = |10 - 3.333| / 4.714 ≈ 1.414 (= sqrt(2))
        let score = expect_score(eval_zscore_at(&values, 2, 1..4));
        assert!(
            (score - std::f64::consts::SQRT_2).abs() < 1e-10,
            "expected ~1.414, got {score}"
        );
    }
}