common_function/scalars/anomaly/
zscore.rs1use std::any::Any;
23use std::fmt::Debug;
24use std::ops::Range;
25use std::sync::Arc;
26
27use arrow::array::{Array, ArrayRef, Float64Array};
28use arrow::datatypes::{DataType, Field, FieldRef};
29use datafusion_common::{DataFusionError, Result, ScalarValue};
30use datafusion_expr::type_coercion::aggregates::NUMERICS;
31use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl};
32use datafusion_functions_window_common::field::WindowUDFFieldArgs;
33use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
34
35use crate::scalars::anomaly::utils::{anomaly_ratio, cast_to_f64, collect_window_values};
36
/// Minimum number of values that `collect_window_values` must return for the
/// frame before a score is produced; frames with fewer values yield `NULL`.
const MIN_SAMPLES: usize = 2;
39
40#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41pub struct AnomalyScoreZscore {
42 signature: Signature,
43}
44
45impl AnomalyScoreZscore {
46 pub fn new() -> Self {
47 Self {
48 signature: Signature::uniform(1, NUMERICS.to_vec(), Volatility::Immutable),
49 }
50 }
51}
52
53impl WindowUDFImpl for AnomalyScoreZscore {
54 fn as_any(&self) -> &dyn Any {
55 self
56 }
57
58 fn name(&self) -> &str {
59 "anomaly_score_zscore"
60 }
61
62 fn signature(&self) -> &Signature {
63 &self.signature
64 }
65
66 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
67 Ok(Arc::new(Field::new(
68 field_args.name(),
69 DataType::Float64,
70 true, )))
72 }
73
74 fn partition_evaluator(
75 &self,
76 _partition_evaluator_args: PartitionEvaluatorArgs,
77 ) -> Result<Box<dyn PartitionEvaluator>> {
78 Ok(Box::new(AnomalyScoreZscoreEvaluator { current_row: 0 }))
79 }
80}
81
/// Evaluator state for [`AnomalyScoreZscore`], created by `partition_evaluator`.
#[derive(Debug)]
struct AnomalyScoreZscoreEvaluator {
    /// Index of the row that the next `evaluate` call scores; incremented once
    /// per `evaluate` call.
    current_row: usize,
}
87
88impl PartitionEvaluator for AnomalyScoreZscoreEvaluator {
89 fn uses_window_frame(&self) -> bool {
90 true
91 }
92
93 fn supports_bounded_execution(&self) -> bool {
94 false
95 }
96
97 fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
98 let values_f64 = cast_to_f64(&values[0])?;
99 let array = values_f64
100 .as_any()
101 .downcast_ref::<Float64Array>()
102 .ok_or_else(|| {
103 DataFusionError::Internal(format!(
104 "Expected Float64Array, got: {:?}",
105 values_f64.data_type()
106 ))
107 })?;
108
109 let current_idx = self.current_row;
111 self.current_row += 1;
112
113 if current_idx >= array.len()
114 || !array.is_valid(current_idx)
115 || !array.value(current_idx).is_finite()
116 {
117 return Ok(ScalarValue::Float64(None));
118 }
119 let current_value = array.value(current_idx);
120
121 let window_values = collect_window_values(array, range);
122 if window_values.len() < MIN_SAMPLES {
123 return Ok(ScalarValue::Float64(None));
124 }
125
126 let n = window_values.len() as f64;
127 let mean = window_values.iter().sum::<f64>() / n;
128
129 let variance = window_values
130 .iter()
131 .map(|x| (x - mean).powi(2))
132 .sum::<f64>()
133 / n;
134 let stddev = variance.sqrt();
135
136 let distance = (current_value - mean).abs();
137 let score = anomaly_ratio(distance, stddev);
138 Ok(ScalarValue::Float64(Some(score)))
139 }
140}
141
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{Float64Array, Int64Array};
    use datafusion_expr::WindowUDF;

    use super::*;

    /// Scores the last row of `range` over `values`, using `range` as the frame.
    fn eval_zscore(values: &[Option<f64>], range: Range<usize>) -> ScalarValue {
        let array = Arc::new(Float64Array::from(values.to_vec())) as ArrayRef;
        let current_row = range.end.saturating_sub(1);
        let mut evaluator = AnomalyScoreZscoreEvaluator { current_row };
        evaluator.evaluate(&[array], &range).unwrap()
    }

    /// Asserts that `result` is a non-null score within 1e-10 of `expected`.
    fn assert_score(result: ScalarValue, expected: f64) {
        match result {
            ScalarValue::Float64(Some(score)) => assert!(
                (score - expected).abs() < 1e-10,
                "expected {expected}, got {score}"
            ),
            other => panic!("expected Some({expected}), got {other:?}"),
        }
    }

    #[test]
    fn test_basic_outlier() {
        let values: Vec<Option<f64>> = vec![
            Some(1.0),
            Some(2.0),
            Some(1.5),
            Some(2.5),
            Some(1.0),
            Some(2.0),
            Some(1.5),
            Some(2.5),
            Some(1.0),
            Some(2.0),
            Some(100.0),
        ];
        let len = values.len();
        let result = eval_zscore(&values, 0..len);
        match result {
            ScalarValue::Float64(Some(score)) => assert!(score > 3.0, "score={score}"),
            other => panic!("expected Some(score), got {other:?}"),
        }
    }

    #[test]
    fn test_constant_sequence() {
        let values: Vec<Option<f64>> = vec![Some(5.0); 10];
        let result = eval_zscore(&values, 0..10);
        match result {
            ScalarValue::Float64(Some(score)) => assert_eq!(score, 0.0),
            other => panic!("expected Some(0.0), got {other:?}"),
        }
    }

    #[test]
    fn test_outlier_in_window_gives_finite_score() {
        // mean 1.8, population stddev 1.6, distance 3.2 => 2.0
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(1.0), Some(1.0), Some(1.0), Some(5.0)];
        assert_score(eval_zscore(&values, 0..5), 2.0);
    }

    #[test]
    fn test_all_null() {
        let values: Vec<Option<f64>> = vec![None, None, None, None];
        let result = eval_zscore(&values, 0..4);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_two_samples_returns_score() {
        // mean 1.5, population stddev 0.5, distance 0.5 => 1.0
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0)];
        assert_score(eval_zscore(&values, 0..2), 1.0);
    }

    #[test]
    fn test_insufficient_samples() {
        let values: Vec<Option<f64>> = vec![Some(1.0)];
        let result = eval_zscore(&values, 0..1);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_nan_inf_skipped() {
        // NaN and +inf are dropped from the frame, leaving [1, 2, 3]:
        // mean 2, population stddev sqrt(2/3), distance 1 => sqrt(1.5).
        let values: Vec<Option<f64>> = vec![
            Some(1.0),
            Some(f64::NAN),
            Some(f64::INFINITY),
            Some(2.0),
            Some(3.0),
        ];
        assert_score(eval_zscore(&values, 0..5), 1.5f64.sqrt());
    }

    #[test]
    fn test_current_row_null() {
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0), None];
        let result = eval_zscore(&values, 0..5);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_current_row_nan() {
        let values: Vec<Option<f64>> = vec![Some(1.0), Some(2.0), Some(3.0), Some(f64::NAN)];
        let result = eval_zscore(&values, 0..4);
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_current_row_past_end() {
        let array = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0])) as ArrayRef;
        let mut evaluator = AnomalyScoreZscoreEvaluator { current_row: 3 };
        let result = evaluator
            .evaluate(std::slice::from_ref(&array), &(0..3))
            .unwrap();
        assert_eq!(result, ScalarValue::Float64(None));
    }

    #[test]
    fn test_known_zscore() {
        // mean 2, population stddev 4, distance 8 => 2.0
        let values: Vec<Option<f64>> = vec![Some(0.0), Some(0.0), Some(0.0), Some(0.0), Some(10.0)];
        assert_score(eval_zscore(&values, 0..5), 2.0);
    }

    #[test]
    fn test_integer_input() {
        // Same data as `test_known_zscore`, but as Int64 (admitted by NUMERICS).
        let array = Arc::new(Int64Array::from(vec![0, 0, 0, 0, 10])) as ArrayRef;
        let mut evaluator = AnomalyScoreZscoreEvaluator { current_row: 4 };
        let result = evaluator
            .evaluate(std::slice::from_ref(&array), &(0..5))
            .unwrap();
        assert_score(result, 2.0);
    }

    #[test]
    fn test_frame_excluding_current_row() {
        // Frame [1, 2, 1, 2]: mean 1.5, stddev 0.5; the current value 100 is
        // 98.5 away => 197.
        let values: Vec<Option<f64>> =
            vec![Some(1.0), Some(2.0), Some(1.0), Some(2.0), Some(100.0)];
        let array = Arc::new(Float64Array::from(values)) as ArrayRef;
        let mut evaluator = AnomalyScoreZscoreEvaluator { current_row: 4 };
        let result = evaluator
            .evaluate(std::slice::from_ref(&array), &(0..4))
            .unwrap();
        assert_score(result, 197.0);
    }

    #[test]
    fn test_udwf_creation() {
        let udwf = WindowUDF::from(AnomalyScoreZscore::new());
        assert_eq!(udwf.name(), "anomaly_score_zscore");
    }

    /// Repeated `evaluate` calls advance `current_row` one row at a time.
    #[test]
    fn test_sequential_evaluate_calls() {
        let values: Vec<Option<f64>> = vec![Some(0.0), Some(0.0), Some(0.0), Some(0.0), Some(10.0)];
        let array = Arc::new(Float64Array::from(values)) as ArrayRef;
        let full_range = 0..5;
        let vals = std::slice::from_ref(&array);

        let mut evaluator = AnomalyScoreZscoreEvaluator { current_row: 0 };

        // Row 0: |0 - 2| / 4
        let r0 = evaluator.evaluate(vals, &full_range).unwrap();
        assert_eq!(r0, ScalarValue::Float64(Some(0.5)));

        // Row 1: |0 - 2| / 4
        let r1 = evaluator.evaluate(vals, &full_range).unwrap();
        assert_eq!(r1, ScalarValue::Float64(Some(0.5)));

        let _ = evaluator.evaluate(vals, &full_range).unwrap();
        let _ = evaluator.evaluate(vals, &full_range).unwrap();

        // Row 4: |10 - 2| / 4
        let r4 = evaluator.evaluate(vals, &full_range).unwrap();
        assert_eq!(r4, ScalarValue::Float64(Some(2.0)));
    }

    /// A frame centred on the current row uses only the rows inside the frame.
    #[test]
    fn test_centered_window_frame() {
        let values: Vec<Option<f64>> = vec![Some(0.0), Some(0.0), Some(10.0), Some(0.0), Some(0.0)];
        let array = Arc::new(Float64Array::from(values)) as ArrayRef;

        // Row 2 with frame 1..4 => [0, 10, 0]: mean 10/3, stddev sqrt(200/9),
        // distance 20/3 => sqrt(2).
        let mut evaluator = AnomalyScoreZscoreEvaluator { current_row: 2 };
        let result = evaluator
            .evaluate(std::slice::from_ref(&array), &(1..4))
            .unwrap();
        assert_score(result, std::f64::consts::SQRT_2);
    }
}