1mod aggr_over_time;
16mod changes;
17mod deriv;
18mod extrapolate_rate;
19mod holt_winters;
20mod idelta;
21mod predict_linear;
22mod quantile;
23mod quantile_aggr;
24mod resets;
25mod round;
26#[cfg(test)]
27mod test_util;
28
29pub use aggr_over_time::{
30 AbsentOverTime, AvgOverTime, CountOverTime, LastOverTime, MaxOverTime, MinOverTime,
31 PresentOverTime, StddevOverTime, StdvarOverTime, SumOverTime,
32};
33pub use changes::Changes;
34use datafusion::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
35use datafusion::error::DataFusionError;
36use datafusion::physical_plan::ColumnarValue;
37pub use deriv::Deriv;
38pub use extrapolate_rate::{Delta, Increase, Rate};
39pub use holt_winters::HoltWinters;
40pub use idelta::IDelta;
41pub use predict_linear::PredictLinear;
42pub use quantile::QuantileOverTime;
43pub use quantile_aggr::{quantile_udaf, QUANTILE_NAME};
44pub use resets::Resets;
45pub use round::Round;
46
47pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
51 match columnar_value {
52 ColumnarValue::Array(array) => Ok(array.clone()),
53 ColumnarValue::Scalar(scalar) => Ok(scalar.to_array_of_size(1)?),
54 }
55}
56
/// Adds `inc` to a running compensated (Kahan–Babuška / Neumaier) sum.
///
/// `sum` is the running total and `compensation` accumulates the low-order
/// bits lost by each floating-point addition. The function returns the pair
/// `(new_sum, new_compensation)`. Callers obtain the final, more accurate
/// result as `sum + compensation`.
pub(crate) fn compensated_sum_inc(inc: f64, sum: f64, compensation: f64) -> (f64, f64) {
    let total = sum + inc;
    // Recover what rounding discarded. Subtract the result from the
    // larger-magnitude operand first so that the recovery is exact.
    let lost = if sum.abs() >= inc.abs() {
        (sum - total) + inc
    } else {
        (inc - total) + sum
    };
    (total, compensation + lost)
}
72
73pub(crate) fn linear_regression(
77 times: &TimestampMillisecondArray,
78 values: &Float64Array,
79 intercept_time: i64,
80) -> (Option<f64>, Option<f64>) {
81 let mut count: f64 = 0.0;
82 let mut sum_x: f64 = 0.0;
83 let mut sum_y: f64 = 0.0;
84 let mut sum_xy: f64 = 0.0;
85 let mut sum_x2: f64 = 0.0;
86 let mut comp_x: f64 = 0.0;
87 let mut comp_y: f64 = 0.0;
88 let mut comp_xy: f64 = 0.0;
89 let mut comp_x2: f64 = 0.0;
90
91 let mut const_y = true;
92 let init_y: f64 = values.value(0);
93
94 for (i, value) in values.iter().enumerate() {
95 let time = times.value(i) as f64;
96 if value.is_none() {
97 continue;
98 }
99 let value = value.unwrap();
100 if const_y && i > 0 && value != init_y {
101 const_y = false;
102 }
103 count += 1.0;
104 let x = (time - intercept_time as f64) / 1e3f64;
105 (sum_x, comp_x) = compensated_sum_inc(x, sum_x, comp_x);
106 (sum_y, comp_y) = compensated_sum_inc(value, sum_y, comp_y);
107 (sum_xy, comp_xy) = compensated_sum_inc(x * value, sum_xy, comp_xy);
108 (sum_x2, comp_x2) = compensated_sum_inc(x * x, sum_x2, comp_x2);
109 }
110
111 if count < 2.0 {
112 return (None, None);
113 }
114
115 if const_y {
116 if !init_y.is_finite() {
117 return (None, None);
118 }
119 return (Some(0.0), Some(init_y));
120 }
121
122 sum_x += comp_x;
123 sum_y += comp_y;
124 sum_xy += comp_xy;
125 sum_x2 += comp_x2;
126
127 let cov_xy = sum_xy - sum_x * sum_y / count;
128 let var_x = sum_x2 - sum_x * sum_x / count;
129
130 let slope = cov_xy / var_x;
131 let intercept = sum_y / count - slope * sum_x / count;
132
133 (Some(slope), Some(intercept))
134}
135
#[cfg(test)]
mod test {
    use super::*;

    /// Builds a millisecond timestamp array in which every slot is non-null.
    fn millis(ts: &[i64]) -> TimestampMillisecondArray {
        TimestampMillisecondArray::from_iter(ts.iter().copied().map(Some))
    }

    /// Eleven timestamps spaced 300 ms apart, starting at 0.
    fn evenly_spaced() -> TimestampMillisecondArray {
        millis(&[0, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000])
    }

    #[test]
    fn calculate_linear_regression_none() {
        let times = evenly_spaced();
        // A constant but infinite series has no usable fit.
        let values = Float64Array::from_iter([f64::INFINITY; 10]);
        let (slope, intercept) = linear_regression(&times, &values, times.value(0));
        assert_eq!(slope, None);
        assert_eq!(intercept, None);
    }

    #[test]
    fn calculate_linear_regression_value_is_const() {
        let times = evenly_spaced();
        let values = Float64Array::from_iter([10.0; 10]);
        let (slope, intercept) = linear_regression(&times, &values, times.value(0));
        assert_eq!(slope, Some(0.0));
        assert_eq!(intercept, Some(10.0));
    }

    #[test]
    fn calculate_linear_regression() {
        let times = evenly_spaced();
        let values = Float64Array::from_iter([
            0.0, 10.0, 20.0, 30.0, 40.0, 0.0, 10.0, 20.0, 30.0, 40.0, 50.0,
        ]);
        // Moving the intercept time changes only the intercept, not the slope.
        let check = |intercept_time: i64, expected_intercept: f64| {
            let (slope, intercept) = linear_regression(&times, &values, intercept_time);
            assert_eq!(slope, Some(10.606060606060607));
            assert_eq!(intercept, Some(expected_intercept));
        };
        check(times.value(0), 6.818181818181815);
        check(3000, 38.63636363636364);
    }

    #[test]
    fn calculate_linear_regression_value_have_none() {
        // Same samples as `calculate_linear_regression`, plus two null values
        // at extra timestamps; nulls must not affect the fit.
        let times = millis(&[
            0, 300, 600, 900, 1200, 1350, 1500, 1800, 2100, 2400, 2550, 2700, 3000,
        ]);
        let values: Float64Array = [
            Some(0.0),
            Some(10.0),
            Some(20.0),
            Some(30.0),
            Some(40.0),
            None,
            Some(0.0),
            Some(10.0),
            Some(20.0),
            Some(30.0),
            None,
            Some(40.0),
            Some(50.0),
        ]
        .into_iter()
        .collect();
        let (slope, intercept) = linear_regression(&times, &values, times.value(0));
        assert_eq!(slope, Some(10.606060606060607));
        assert_eq!(intercept, Some(6.818181818181815));
    }

    #[test]
    fn calculate_linear_regression_value_all_none() {
        let times = millis(&[0, 300, 600]);
        let values: Float64Array = [None, None, None].into_iter().collect();
        let (slope, intercept) = linear_regression(&times, &values, times.value(0));
        assert_eq!(slope, None);
        assert_eq!(intercept, None);
    }

    #[test]
    fn test_kahan_sum() {
        let big = 10.0f64.powf(100.0);
        // A naive left-to-right sum loses both 1.0 terms next to `big`.
        let (sum, c) = [1.0, big, 1.0, -big]
            .iter()
            .fold((0.0, 0.0), |(sum, c), &v| compensated_sum_inc(v, sum, c));
        assert_eq!(sum + c, 2.0);
    }
}