common_function/scalars/anomaly/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Shared statistical utilities for anomaly detection window functions.
16
17use std::ops::Range;
18
19use arrow::array::{Array, ArrayRef, Float64Array};
20use arrow::compute;
21use arrow::datatypes::DataType;
22use datafusion_common::DataFusionError;
23
24/// Cast an ArrayRef to Float64. Returns the array as-is if already Float64.
25pub fn cast_to_f64(array: &ArrayRef) -> datafusion_common::Result<ArrayRef> {
26    if array.data_type() == &DataType::Float64 {
27        return Ok(array.clone());
28    }
29    compute::cast(array, &DataType::Float64)
30        .map_err(|e| DataFusionError::Internal(format!("Failed to cast to Float64: {e}")))
31}
32
33/// Collect valid f64 values from a Float64Array within the given range,
34/// skipping NULL, NaN, and ±Inf values.
35pub fn collect_window_values(array: &Float64Array, range: &Range<usize>) -> Vec<f64> {
36    let mut values = Vec::with_capacity(range.len());
37    for i in range.clone() {
38        if array.is_valid(i) {
39            let v = array.value(i);
40            if v.is_finite() {
41                values.push(v);
42            }
43        }
44    }
45    values
46}
47
/// Compute the median of a mutable slice using an O(n) selection algorithm.
///
/// Elements are ordered with [`f64::total_cmp`]. The input slice is partially
/// reordered in place as a side effect of the selection.
///
/// For an odd number of elements the middle element is returned. For an even
/// number, the result is the arithmetic mean of the two middle elements; the
/// mean is computed so that two large finite values do not overflow to
/// infinity.
///
/// Returns `None` if the slice is empty.
pub fn median_f64(values: &mut [f64]) -> Option<f64> {
    let len = values.len();
    if len == 0 {
        return None;
    }
    let mid = len / 2;
    // After selection, `below` holds every element ordered before index `mid`.
    let (below, pivot, _) = values.select_nth_unstable_by(mid, f64::total_cmp);
    let upper_mid = *pivot;
    if len % 2 == 1 {
        return Some(upper_mid);
    }

    // Even length: the lower middle element is the largest one left of `mid`.
    let lower_mid = below
        .iter()
        .copied()
        .max_by(f64::total_cmp)
        .expect("even non-zero length implies mid >= 1, so the left part is non-empty");

    // `lower_mid + upper_mid` can overflow to ±inf even when both operands are
    // finite (e.g. two values near `f64::MAX`). Fall back to averaging the
    // halves in that case; non-finite inputs produce the same result either way.
    let sum = lower_mid + upper_mid;
    let mean = if sum.is_finite() {
        sum / 2.0
    } else {
        lower_mid / 2.0 + upper_mid / 2.0
    };
    Some(mean)
}
68
/// Compute a percentile on a sorted slice using linear interpolation.
///
/// `sorted` must be in ascending order; this is not checked. `p` is the
/// requested fraction in `[0.0, 1.0]`, where `0.0` selects the first element
/// and `1.0` the last. Values of `p` outside that interval are clamped to it
/// instead of indexing past the end of the slice. A NaN `p` resolves to
/// index 0 and therefore yields the first element.
///
/// Returns `None` if the slice is empty.
pub fn percentile_sorted(sorted: &[f64], p: f64) -> Option<f64> {
    // `checked_sub` doubles as the emptiness check.
    let last = sorted.len().checked_sub(1)?;
    if last == 0 {
        return Some(sorted[0]);
    }

    // Clamping keeps `pos` within `[0, last]`, so both indices below are in
    // bounds. NaN passes through `clamp` unchanged and casts to 0.
    let pos = p.clamp(0.0, 1.0) * last as f64;
    let lower = pos.floor() as usize;
    let upper = pos.ceil() as usize;
    if lower == upper {
        // `pos` landed exactly on an element; no interpolation needed.
        return Some(sorted[lower]);
    }

    let frac = pos - lower as f64;
    Some(sorted[lower] * (1.0 - frac) + sorted[upper] * frac)
}
91
/// Divide `distance` by `scale`, with well-defined results when the scale is zero.
///
/// For a non-zero `scale` this is the plain quotient `distance / scale`.
///
/// When `scale == 0.0` the division is skipped:
/// - a `distance` equal to `0.0` (value sits on the center) scores `0.0`,
/// - any other `distance` (value is off-center while the spread is zero)
///   scores `+inf`.
#[inline]
pub fn anomaly_ratio(distance: f64, scale: f64) -> f64 {
    if scale != 0.0 {
        return distance / scale;
    }
    // Zero spread: only an exact match with the center is considered normal.
    if distance == 0.0 {
        0.0
    } else {
        f64::INFINITY
    }
}
105
#[cfg(test)]
mod tests {
    use super::*;

    // Odd-length input: the median is the single middle element.
    #[test]
    fn test_median_odd() {
        let mut samples = [5.0, 1.0, 3.0, 2.0, 4.0];
        let got = median_f64(&mut samples);
        assert_eq!(got, Some(3.0));
    }

    // Even-length input: the median is the mean of the two middle elements.
    #[test]
    fn test_median_even() {
        let mut samples = [4.0, 1.0, 3.0, 2.0];
        let got = median_f64(&mut samples);
        assert_eq!(got, Some(2.5));
    }

    // A lone element is its own median.
    #[test]
    fn test_median_single() {
        let mut samples = [42.0];
        let got = median_f64(&mut samples);
        assert_eq!(got, Some(42.0));
    }

    // No elements means no median.
    #[test]
    fn test_median_empty() {
        let mut samples: [f64; 0] = [];
        assert_eq!(median_f64(&mut samples), None);
    }

    // First and third quartile of 1..=10 with linear interpolation.
    #[test]
    fn test_percentile_sorted_quartiles() {
        let sorted: Vec<f64> = (1..=10).map(f64::from).collect();
        for (p, expected) in [(0.25, 3.25), (0.75, 7.75)] {
            let got = percentile_sorted(&sorted, p).unwrap();
            assert!((got - expected).abs() < 1e-10);
        }
    }

    // An empty slice has no percentile.
    #[test]
    fn test_percentile_sorted_empty() {
        assert_eq!(percentile_sorted(&[], 0.5), None);
    }

    // NULL, NaN and both infinities must be dropped; finite values are kept in order.
    #[test]
    fn test_collect_window_values_filters_invalid() {
        let input = vec![
            Some(1.0),
            None,
            Some(f64::NAN),
            Some(f64::INFINITY),
            Some(f64::NEG_INFINITY),
            Some(2.0),
            Some(3.0),
        ];
        let array = Float64Array::from(input);
        let window = 0..7;
        let kept = collect_window_values(&array, &window);
        assert_eq!(kept, vec![1.0, 2.0, 3.0]);
    }

    // Zero scale: on-center scores 0, off-center scores infinity.
    #[test]
    fn test_anomaly_ratio_zero_scale() {
        let on_center = anomaly_ratio(0.0, 0.0);
        let off_center = anomaly_ratio(1.0, 0.0);
        assert_eq!(on_center, 0.0);
        assert!(off_center.is_infinite());
    }
}