common_function/scalars/anomaly/
iqr.rs1use std::any::Any;
28use std::fmt::Debug;
29use std::ops::Range;
30use std::sync::Arc;
31
32use arrow::array::{Array, ArrayRef, Float64Array};
33use arrow::datatypes::{DataType, Field, FieldRef};
34use datafusion_common::{DataFusionError, Result, ScalarValue};
35use datafusion_expr::type_coercion::aggregates::NUMERICS;
36use datafusion_expr::{PartitionEvaluator, Signature, Volatility, WindowUDFImpl};
37use datafusion_functions_window_common::field::WindowUDFFieldArgs;
38use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
39
40use crate::scalars::anomaly::utils::{cast_to_f64, collect_window_values, percentile_sorted};
41
/// Smallest number of collected window values for which a score is produced;
/// `evaluate` returns NULL when the window yields fewer values than this.
const MIN_SAMPLES: usize = 3;
44
45#[derive(Debug, Clone, PartialEq, Eq, Hash)]
46pub struct AnomalyScoreIqr {
47 signature: Signature,
48}
49
50impl AnomalyScoreIqr {
51 pub fn new() -> Self {
52 Self {
53 signature: Signature::uniform(2, NUMERICS.to_vec(), Volatility::Immutable),
54 }
55 }
56}
57
58impl WindowUDFImpl for AnomalyScoreIqr {
59 fn as_any(&self) -> &dyn Any {
60 self
61 }
62
63 fn name(&self) -> &str {
64 "anomaly_score_iqr"
65 }
66
67 fn signature(&self) -> &Signature {
68 &self.signature
69 }
70
71 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
72 Ok(Arc::new(Field::new(
73 field_args.name(),
74 DataType::Float64,
75 true, )))
77 }
78
79 fn partition_evaluator(
80 &self,
81 _partition_evaluator_args: PartitionEvaluatorArgs,
82 ) -> Result<Box<dyn PartitionEvaluator>> {
83 Ok(Box::new(AnomalyScoreIqrEvaluator { current_row: 0 }))
84 }
85}
86
87#[derive(Debug)]
88struct AnomalyScoreIqrEvaluator {
89 current_row: usize,
91}
92
93impl PartitionEvaluator for AnomalyScoreIqrEvaluator {
94 fn uses_window_frame(&self) -> bool {
95 true
96 }
97
98 fn supports_bounded_execution(&self) -> bool {
99 false
100 }
101
102 fn evaluate(&mut self, values: &[ArrayRef], range: &Range<usize>) -> Result<ScalarValue> {
103 let values_f64 = cast_to_f64(&values[0])?;
105 let array = values_f64
106 .as_any()
107 .downcast_ref::<Float64Array>()
108 .ok_or_else(|| {
109 DataFusionError::Internal(format!(
110 "Expected Float64Array, got: {:?}",
111 values_f64.data_type()
112 ))
113 })?;
114
115 let k_f64 = cast_to_f64(&values[1])?;
117 let k_array = k_f64
118 .as_any()
119 .downcast_ref::<Float64Array>()
120 .ok_or_else(|| {
121 DataFusionError::Internal(format!(
122 "Expected Float64Array for k, got: {:?}",
123 k_f64.data_type()
124 ))
125 })?;
126
127 let current_idx = self.current_row;
129 self.current_row += 1;
130
131 if current_idx >= array.len()
133 || !array.is_valid(current_idx)
134 || !array.value(current_idx).is_finite()
135 {
136 return Ok(ScalarValue::Float64(None));
137 }
138 let current_value = array.value(current_idx);
139
140 if current_idx >= k_array.len()
142 || !k_array.is_valid(current_idx)
143 || !k_array.value(current_idx).is_finite()
144 {
145 return Ok(ScalarValue::Float64(None));
146 }
147 let k = k_array.value(current_idx);
148 if k < 0.0 {
149 return Ok(ScalarValue::Float64(None));
150 }
151
152 let mut window_values = collect_window_values(array, range);
153 if window_values.len() < MIN_SAMPLES {
154 return Ok(ScalarValue::Float64(None));
155 }
156
157 window_values.sort_unstable_by(|a, b| a.partial_cmp(b).unwrap());
159
160 let q1 = match percentile_sorted(&window_values, 0.25) {
161 Some(v) => v,
162 None => return Ok(ScalarValue::Float64(None)),
163 };
164 let q3 = match percentile_sorted(&window_values, 0.75) {
165 Some(v) => v,
166 None => return Ok(ScalarValue::Float64(None)),
167 };
168 let iqr = q3 - q1;
169
170 let lower_fence = q1 - k * iqr;
171 let upper_fence = q3 + k * iqr;
172
173 let score = if current_value < lower_fence {
174 (lower_fence - current_value) / iqr
175 } else if current_value > upper_fence {
176 (current_value - upper_fence) / iqr
177 } else {
178 0.0
179 };
180
181 Ok(ScalarValue::Float64(Some(score)))
182 }
183}
184
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::Float64Array;
    use datafusion_expr::WindowUDF;

    use super::*;

    /// Runs the evaluator once over `range`, scoring the last row of the range
    /// with the same multiplier `k` on every row.
    fn eval_iqr(values: &[Option<f64>], k: f64, range: Range<usize>) -> ScalarValue {
        let data: ArrayRef = Arc::new(Float64Array::from(values.to_vec()));
        let multipliers: ArrayRef = Arc::new(Float64Array::from(vec![Some(k); values.len()]));
        let mut evaluator = AnomalyScoreIqrEvaluator {
            current_row: range.end.saturating_sub(1),
        };
        evaluator.evaluate(&[data, multipliers], &range).unwrap()
    }

    /// Unwraps a non-null `Float64` result, panicking on anything else.
    fn expect_score(result: ScalarValue) -> f64 {
        match result {
            ScalarValue::Float64(Some(score)) => score,
            other => panic!("expected Some(score), got {other:?}"),
        }
    }

    /// A value far above the rest of the window gets a positive score.
    #[test]
    fn test_basic_outlier() {
        let values: Vec<Option<f64>> = (1..=10)
            .map(|x| Some(x as f64))
            .chain(std::iter::once(Some(100.0)))
            .collect();
        let score = expect_score(eval_iqr(&values, 1.5, 0..11));
        assert!(score > 0.0, "score={score}");
    }

    /// The last value of an evenly spaced run yields a non-null, non-negative score.
    #[test]
    fn test_value_within_fences() {
        let values: Vec<Option<f64>> = (1..=10).map(|x| Some(x as f64)).collect();
        let score = expect_score(eval_iqr(&values, 1.5, 0..10));
        assert!(score >= 0.0, "score should be non-negative, got {score}");
    }

    /// Identical values give a zero IQR and a score of exactly zero.
    #[test]
    fn test_constant_sequence() {
        let values = vec![Some(5.0_f64); 10];
        assert_eq!(eval_iqr(&values, 1.5, 0..10), ScalarValue::Float64(Some(0.0)));
    }

    /// With a zero IQR, a value outside the fences scores positive infinity.
    #[test]
    fn test_zero_iqr_outlier_is_infinite() {
        let values: Vec<Option<f64>> = [1.0, 1.0, 1.0, 1.0, 100.0].into_iter().map(Some).collect();
        match eval_iqr(&values, 1.5, 0..5) {
            ScalarValue::Float64(Some(score)) => {
                assert!(score.is_infinite() && score.is_sign_positive())
            }
            other => panic!("expected Some(+inf), got {other:?}"),
        }
    }

    /// A window containing only nulls produces NULL.
    #[test]
    fn test_all_null() {
        let values: Vec<Option<f64>> = vec![None; 5];
        assert_eq!(eval_iqr(&values, 1.5, 0..5), ScalarValue::Float64(None));
    }

    /// Fewer than `MIN_SAMPLES` values produce NULL.
    #[test]
    fn test_insufficient_samples() {
        let values: Vec<Option<f64>> = [1.0, 2.0].into_iter().map(Some).collect();
        assert_eq!(eval_iqr(&values, 1.5, 0..2), ScalarValue::Float64(None));
    }

    /// A negative multiplier is rejected with NULL.
    #[test]
    fn test_negative_k() {
        let values: Vec<Option<f64>> = [48.0, 49.0, 50.0, 51.0, 52.0]
            .into_iter()
            .map(Some)
            .collect();
        assert_eq!(eval_iqr(&values, -1.0, 0..5), ScalarValue::Float64(None));
    }

    /// A null current row produces NULL even when the window has enough samples.
    #[test]
    fn test_current_row_null() {
        let values = vec![Some(1.0), Some(2.0), Some(3.0), None];
        assert_eq!(eval_iqr(&values, 1.5, 0..4), ScalarValue::Float64(None));
    }

    /// A value far below the window scores positive; a normal value that
    /// follows it in the same window scores zero.
    #[test]
    fn test_lower_outlier() {
        let mut values: Vec<Option<f64>> = [48.0, 49.0, 50.0, 51.0, 52.0, -100.0]
            .into_iter()
            .map(Some)
            .collect();
        let score = expect_score(eval_iqr(&values, 1.5, 0..6));
        assert!(score > 0.0, "score={score}");

        values.push(Some(50.0));
        assert_eq!(eval_iqr(&values, 1.5, 0..7), ScalarValue::Float64(Some(0.0)));
    }

    /// The implementation can be wrapped in a `WindowUDF` and keeps its name.
    #[test]
    fn test_udwf_creation() {
        let udwf = WindowUDF::from(AnomalyScoreIqr::new());
        assert_eq!(udwf.name(), "anomaly_score_iqr");
    }
}