1mod aggr_over_time;
16mod changes;
17mod deriv;
18mod double_exponential_smoothing;
19mod extrapolate_rate;
20mod idelta;
21mod predict_linear;
22mod quantile;
23mod quantile_aggr;
24mod resets;
25mod round;
26#[cfg(test)]
27mod test_util;
28
29pub use aggr_over_time::{
30 AbsentOverTime, AvgOverTime, CountOverTime, LastOverTime, MaxOverTime, MinOverTime,
31 PresentOverTime, StddevOverTime, StdvarOverTime, SumOverTime,
32};
33pub use changes::Changes;
34use datafusion::arrow::array::{
35 ArrayRef, DictionaryArray, Float64Array, TimestampMillisecondArray,
36};
37use datafusion::error::DataFusionError;
38use datafusion::physical_plan::ColumnarValue;
39use datatypes::arrow::array::Array;
40use datatypes::arrow::datatypes::Int64Type;
41pub use deriv::Deriv;
42pub use double_exponential_smoothing::DoubleExponentialSmoothing;
43pub use extrapolate_rate::{Delta, Increase, Rate};
44pub use idelta::IDelta;
45pub use predict_linear::PredictLinear;
46pub use quantile::QuantileOverTime;
47pub use quantile_aggr::{QUANTILE_NAME, quantile_udaf};
48pub use resets::Resets;
49pub use round::Round;
50
51use crate::range_array::RangeArray;
52
53pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
57 match columnar_value {
58 ColumnarValue::Array(array) => Ok(array.clone()),
59 ColumnarValue::Scalar(scalar) => Ok(scalar.to_array_of_size(1)?),
60 }
61}
62
63pub(crate) fn extract_range_array(
65 columnar_value: &ColumnarValue,
66) -> Result<RangeArray, DataFusionError> {
67 let array = extract_array(columnar_value)?;
68 let dict = array
69 .as_any()
70 .downcast_ref::<DictionaryArray<Int64Type>>()
71 .ok_or_else(|| {
72 DataFusionError::Execution(format!(
73 "expected DictionaryArray<Int64>, found {}",
74 array.data_type()
75 ))
76 })?
77 .clone();
78 RangeArray::try_new(dict).map_err(DataFusionError::from)
79}
80
81pub(crate) fn compensated_sum_inc(inc: f64, sum: f64, mut compensation: f64) -> (f64, f64) {
88 let new_sum = sum + inc;
89 if sum.abs() >= inc.abs() {
90 compensation += (sum - new_sum) + inc;
91 } else {
92 compensation += (inc - new_sum) + sum;
93 }
94 (new_sum, compensation)
95}
96
97pub(crate) fn linear_regression(
101 times: &TimestampMillisecondArray,
102 values: &Float64Array,
103 intercept_time: i64,
104) -> (Option<f64>, Option<f64>) {
105 linear_regression_slice(times.values(), values, 0, values.len(), intercept_time)
106}
107
108pub(crate) fn linear_regression_slice(
109 times: &[i64],
110 values: &Float64Array,
111 offset: usize,
112 len: usize,
113 intercept_time: i64,
114) -> (Option<f64>, Option<f64>) {
115 linear_regression_slices(times, offset, values, offset, len, intercept_time)
116}
117
118pub(crate) fn linear_regression_slices(
119 times: &[i64],
120 time_offset: usize,
121 values: &Float64Array,
122 value_offset: usize,
123 len: usize,
124 intercept_time: i64,
125) -> (Option<f64>, Option<f64>) {
126 let raw_values = values.values();
127 let has_nulls = values.null_count() > 0;
128 let mut count: f64 = 0.0;
129 let mut sum_x: f64 = 0.0;
130 let mut sum_y: f64 = 0.0;
131 let mut sum_xy: f64 = 0.0;
132 let mut sum_x2: f64 = 0.0;
133 let mut comp_x: f64 = 0.0;
134 let mut comp_y: f64 = 0.0;
135 let mut comp_xy: f64 = 0.0;
136 let mut comp_x2: f64 = 0.0;
137
138 let mut const_y = true;
139 let mut init_y = None;
140
141 for i in 0..len {
142 let time_idx = time_offset + i;
143 let value_idx = value_offset + i;
144 if has_nulls && values.is_null(value_idx) {
145 continue;
146 }
147 let value = raw_values[value_idx];
148 let time = times[time_idx] as f64;
149 let initial = init_y.get_or_insert(value);
150 if const_y && count > 0.0 && value != *initial {
151 const_y = false;
152 }
153 count += 1.0;
154 let x = (time - intercept_time as f64) / 1e3f64;
155 (sum_x, comp_x) = compensated_sum_inc(x, sum_x, comp_x);
156 (sum_y, comp_y) = compensated_sum_inc(value, sum_y, comp_y);
157 (sum_xy, comp_xy) = compensated_sum_inc(x * value, sum_xy, comp_xy);
158 (sum_x2, comp_x2) = compensated_sum_inc(x * x, sum_x2, comp_x2);
159 }
160
161 if count < 2.0 {
162 return (None, None);
163 }
164
165 if const_y {
166 let init_y = init_y.unwrap();
167 if !init_y.is_finite() {
168 return (None, None);
169 }
170 return (Some(0.0), Some(init_y));
171 }
172
173 sum_x += comp_x;
174 sum_y += comp_y;
175 sum_xy += comp_xy;
176 sum_x2 += comp_x2;
177
178 let cov_xy = sum_xy - sum_x * sum_y / count;
179 let var_x = sum_x2 - sum_x * sum_x / count;
180
181 let slope = cov_xy / var_x;
182 let intercept = sum_y / count - slope * sum_x / count;
183
184 (Some(slope), Some(intercept))
185}
186
187#[cfg(test)]
188mod test {
189 use std::sync::Arc;
190
191 use datafusion::physical_plan::ColumnarValue;
192 use datatypes::arrow::array::Int64Array;
193 use datatypes::arrow::datatypes::Int64Type;
194
195 use super::*;
196 use crate::range_array::RangeArray;
197
198 #[test]
199 fn calculate_linear_regression_none() {
200 let ts_array = TimestampMillisecondArray::from_iter(
201 [
202 0i64, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000,
203 ]
204 .into_iter()
205 .map(Some),
206 );
207 let values_array = Float64Array::from_iter([
208 1.0 / 0.0,
209 1.0 / 0.0,
210 1.0 / 0.0,
211 1.0 / 0.0,
212 1.0 / 0.0,
213 1.0 / 0.0,
214 1.0 / 0.0,
215 1.0 / 0.0,
216 1.0 / 0.0,
217 1.0 / 0.0,
218 ]);
219 let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));
220 assert_eq!(slope, None);
221 assert_eq!(intercept, None);
222 }
223
224 #[test]
225 fn calculate_linear_regression_value_is_const() {
226 let ts_array = TimestampMillisecondArray::from_iter(
227 [
228 0i64, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000,
229 ]
230 .into_iter()
231 .map(Some),
232 );
233 let values_array =
234 Float64Array::from_iter([10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0, 10.0]);
235 let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));
236 assert_eq!(slope, Some(0.0));
237 assert_eq!(intercept, Some(10.0));
238 }
239
240 #[test]
241 fn calculate_linear_regression() {
242 let ts_array = TimestampMillisecondArray::from_iter(
243 [
244 0i64, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000,
245 ]
246 .into_iter()
247 .map(Some),
248 );
249 let values_array = Float64Array::from_iter([
250 0.0, 10.0, 20.0, 30.0, 40.0, 0.0, 10.0, 20.0, 30.0, 40.0, 50.0,
251 ]);
252 let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));
253 assert_eq!(slope, Some(10.606060606060607));
254 assert_eq!(intercept, Some(6.818181818181815));
255
256 let (slope, intercept) = linear_regression(&ts_array, &values_array, 3000);
257 assert_eq!(slope, Some(10.606060606060607));
258 assert_eq!(intercept, Some(38.63636363636364));
259 }
260
261 #[test]
262 fn calculate_linear_regression_value_have_none() {
263 let ts_array = TimestampMillisecondArray::from_iter(
264 [
265 0i64, 300, 600, 900, 1200, 1350, 1500, 1800, 2100, 2400, 2550, 2700, 3000,
266 ]
267 .into_iter()
268 .map(Some),
269 );
270 let values_array: Float64Array = [
271 Some(0.0),
272 Some(10.0),
273 Some(20.0),
274 Some(30.0),
275 Some(40.0),
276 None,
277 Some(0.0),
278 Some(10.0),
279 Some(20.0),
280 Some(30.0),
281 None,
282 Some(40.0),
283 Some(50.0),
284 ]
285 .into_iter()
286 .collect();
287 let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));
288 assert_eq!(slope, Some(10.606060606060607));
289 assert_eq!(intercept, Some(6.818181818181815));
290 }
291
292 #[test]
293 fn calculate_linear_regression_value_all_none() {
294 let ts_array = TimestampMillisecondArray::from_iter([0i64, 300, 600].into_iter().map(Some));
295 let values_array: Float64Array = [None, None, None].into_iter().collect();
296 let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));
297 assert_eq!(slope, None);
298 assert_eq!(intercept, None);
299 }
300
301 #[test]
303 fn test_kahan_sum() {
304 let inputs = vec![1.0, 10.0f64.powf(100.0), 1.0, -10.0f64.powf(100.0)];
305
306 let mut sum = 0.0;
307 let mut c = 0f64;
308
309 for v in inputs {
310 (sum, c) = compensated_sum_inc(v, sum, c);
311 }
312 assert_eq!(sum + c, 2.0)
313 }
314
315 #[test]
316 fn extract_range_array_rejects_external_dictionary_with_null_keys() {
317 let keys = Int64Array::from_iter([Some(0), None]);
318 let values = Arc::new(Float64Array::from_iter([1.0, 2.0]));
319 let dict = DictionaryArray::<Int64Type>::try_new(keys, values).unwrap();
320
321 let err = extract_range_array(&ColumnarValue::Array(Arc::new(dict))).unwrap_err();
322 assert!(err.to_string().contains("Empty range is not expected"));
323 }
324
325 #[test]
326 fn extract_range_array_accepts_internal_packed_ranges() {
327 let values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0]));
328 let range_array = RangeArray::from_ranges(values, [(0, 2), (1, 2)]).unwrap();
329
330 let extracted =
331 extract_range_array(&ColumnarValue::Array(Arc::new(range_array.into_dict()))).unwrap();
332
333 assert_eq!(extracted.get_offset_length(0), Some((0, 2)));
334 assert_eq!(extracted.get_offset_length(1), Some((1, 2)));
335 }
336}