1use std::sync::Arc;
16
17use common_macro::range_fn;
18use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
19use datafusion::common::DataFusionError;
20use datafusion::logical_expr::{ScalarUDF, Volatility};
21use datafusion::physical_plan::ColumnarValue;
22use datatypes::arrow::array::Array;
23use datatypes::arrow::compute;
24use datatypes::arrow::datatypes::DataType;
25
26use crate::functions::{compensated_sum_inc, extract_array};
27use crate::range_array::RangeArray;
28
29#[range_fn(
31 name = AvgOverTime,
32 ret = Float64Array,
33 display_name = prom_avg_over_time
34)]
35pub fn avg_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
36 compute::sum(values).map(|result| result / values.len() as f64)
37}
38
/// Minimum of the samples in one range.
///
/// The timestamp column is accepted for signature uniformity but not used.
/// The result is whatever `compute::min` reports for `values`; the tests in
/// this file expect `None` for an empty range.
// NOTE(review): NaN ordering and null handling are decided by the Arrow
// kernel, not here — confirm they match the intended PromQL semantics.
#[range_fn(
    name = MinOverTime,
    ret = Float64Array,
    display_name = prom_min_over_time
)]
pub fn min_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
    compute::min(values)
}
48
/// Maximum of the samples in one range.
///
/// The timestamp column is accepted for signature uniformity but not used.
/// The result is whatever `compute::max` reports for `values`; the tests in
/// this file expect `None` for an empty range.
// NOTE(review): NaN ordering and null handling are decided by the Arrow
// kernel, not here — confirm they match the intended PromQL semantics.
#[range_fn(
    name = MaxOverTime,
    ret = Float64Array,
    display_name = prom_max_over_time
)]
pub fn max_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
    compute::max(values)
}
58
/// Sum of the samples in one range.
///
/// The timestamp column is accepted for signature uniformity but not used.
/// The result is whatever `compute::sum` reports for `values`; the tests in
/// this file expect `None` for an empty range.
#[range_fn(
    name = SumOverTime,
    ret = Float64Array,
    display_name = prom_sum_over_time
)]
pub fn sum_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
    compute::sum(values)
}
68
69#[range_fn(
71 name = CountOverTime,
72 ret = Float64Array,
73 display_name = prom_count_over_time
74)]
75pub fn count_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
76 if values.is_empty() {
77 None
78 } else {
79 Some(values.len() as f64)
80 }
81}
82
83#[range_fn(
85 name = LastOverTime,
86 ret = Float64Array,
87 display_name = prom_last_over_time
88)]
89pub fn last_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
90 values.values().last().copied()
91}
92
93#[range_fn(
97 name = AbsentOverTime,
98 ret = Float64Array,
99 display_name = prom_absent_over_time
100)]
101pub fn absent_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
102 if values.is_empty() {
103 Some(1.0)
104 } else {
105 None
106 }
107}
108
109#[range_fn(
111 name = PresentOverTime,
112 ret = Float64Array,
113 display_name = prom_present_over_time
114)]
115pub fn present_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
116 if values.is_empty() {
117 None
118 } else {
119 Some(1.0)
120 }
121}
122
123#[range_fn(
127 name = StdvarOverTime,
128 ret = Float64Array,
129 display_name = prom_stdvar_over_time
130)]
131pub fn stdvar_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
132 if values.is_empty() {
133 None
134 } else {
135 let mut count = 0;
136 let mut mean: f64 = 0.0;
137 let mut result: f64 = 0.0;
138 for value in values {
139 let value = value.unwrap();
140 let new_count = count + 1;
141 let delta1 = value - mean;
142 let new_mean = delta1 / new_count as f64 + mean;
143 let delta2 = value - new_mean;
144 let new_result = result + delta1 * delta2;
145
146 count += 1;
147 mean = new_mean;
148 result = new_result;
149 }
150 Some(result / count as f64)
151 }
152}
153
154#[range_fn(
157 name = StddevOverTime,
158 ret = Float64Array,
159 display_name = prom_stddev_over_time
160)]
161pub fn stddev_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
162 if values.is_empty() {
163 None
164 } else {
165 let mut count = 0.0;
166 let mut mean = 0.0;
167 let mut comp_mean = 0.0;
168 let mut deviations_sum_sq = 0.0;
169 let mut comp_deviations_sum_sq = 0.0;
170 for v in values {
171 count += 1.0;
172 let current_value = v.unwrap();
173 let delta = current_value - (mean + comp_mean);
174 let (new_mean, new_comp_mean) = compensated_sum_inc(delta / count, mean, comp_mean);
175 mean = new_mean;
176 comp_mean = new_comp_mean;
177 let (new_deviations_sum_sq, new_comp_deviations_sum_sq) = compensated_sum_inc(
178 delta * (current_value - (mean + comp_mean)),
179 deviations_sum_sq,
180 comp_deviations_sum_sq,
181 );
182 deviations_sum_sq = new_deviations_sum_sq;
183 comp_deviations_sum_sq = new_comp_deviations_sum_sq;
184 }
185 Some(((deviations_sum_sq + comp_deviations_sum_sq) / count).sqrt())
186 }
187}
188
// Unit tests for the `*_over_time` range functions. Each test feeds a pair of
// range arrays to `simple_range_udf_runner` together with the expected
// per-range outputs.
#[cfg(test)]
mod test {
    use super::*;
    use crate::functions::test_util::simple_range_udf_runner;

    // Builds the shared fixture: a timestamp range array and a value range
    // array cut from 11-element columns with the same ten ranges.
    fn build_test_range_arrays() -> (RangeArray, RangeArray) {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [
                1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000, 200000, 500000,
            ]
            .into_iter()
            .map(Some),
        ));
        // Judging by the expected outputs below, each pair reads as
        // (start offset, length); a length of 0 produces an empty range.
        let ranges = [
            (0, 2),
            (0, 5),
            (1, 1),
            (2, 0),
            (2, 0),
            (3, 3),
            (4, 3),
            (5, 3),
            (8, 1),
            (9, 0),
        ];

        let values_array = Arc::new(Float64Array::from_iter([
            12.345678, 87.654321, 31.415927, 27.182818, 70.710678, 41.421356, 57.735027, 69.314718,
            98.019802, 1.98019802, 61.803399,
        ]));

        let ts_range_array = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range_array = RangeArray::from_ranges(values_array, ranges).unwrap();

        (ts_range_array, value_range_array)
    }

    // Expected vectors in the tests below line up one-to-one with the ten
    // ranges of the shared fixture; `None` entries correspond to the
    // zero-length ranges unless a test states otherwise.

    #[test]
    fn calculate_avg_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            AvgOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(49.9999995),
                Some(45.8618844),
                Some(87.654321),
                None,
                None,
                Some(46.438284),
                Some(56.62235366666667),
                Some(56.15703366666667),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_min_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            MinOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(12.345678),
                Some(12.345678),
                Some(87.654321),
                None,
                None,
                Some(27.182818),
                Some(41.421356),
                Some(41.421356),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_max_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            MaxOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(87.654321),
                Some(87.654321),
                Some(87.654321),
                None,
                None,
                Some(70.710678),
                Some(70.710678),
                Some(69.314718),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_sum_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            SumOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(99.999999),
                Some(229.309422),
                Some(87.654321),
                None,
                None,
                Some(139.314852),
                Some(169.867061),
                Some(168.471101),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_count_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            CountOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(2.0),
                Some(5.0),
                Some(1.0),
                None,
                None,
                Some(3.0),
                Some(3.0),
                Some(3.0),
                Some(1.0),
                None,
            ],
        );
    }

    #[test]
    fn calculate_last_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            LastOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(87.654321),
                Some(70.710678),
                Some(87.654321),
                None,
                None,
                Some(41.421356),
                Some(57.735027),
                Some(69.314718),
                Some(98.019802),
                None,
            ],
        );
    }

    // Inverse pattern: only the zero-length ranges produce a value.
    #[test]
    fn calculate_absent_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            AbsentOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                None,
                None,
                None,
                Some(1.0),
                Some(1.0),
                None,
                None,
                None,
                None,
                Some(1.0),
            ],
        );
    }

    #[test]
    fn calculate_present_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            PresentOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(1.0),
                Some(1.0),
                Some(1.0),
                None,
                None,
                Some(1.0),
                Some(1.0),
                Some(1.0),
                Some(1.0),
                None,
            ],
        );
    }

    #[test]
    fn calculate_stdvar_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            StdvarOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(1417.8479276253622),
                Some(808.999919713209),
                Some(0.0),
                None,
                None,
                Some(328.3638826418587),
                Some(143.5964181766362),
                Some(130.91830542386285),
                Some(0.0),
                None,
            ],
        );

        // Second fixture: the first range holds three identical samples, so
        // its expected variance is exactly 0.0.
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000]
                .into_iter()
                .map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.5990505637277868,
            1.5990505637277868,
            1.5990505637277868,
            0.0,
            8.0,
            8.0,
            2.0,
            3.0,
        ]));
        let ranges = [(0, 3), (3, 5)];
        simple_range_udf_runner(
            StdvarOverTime::scalar_udf(),
            RangeArray::from_ranges(ts_array, ranges).unwrap(),
            RangeArray::from_ranges(values_array, ranges).unwrap(),
            vec![],
            vec![Some(0.0), Some(10.559999999999999)],
        );
    }

    #[test]
    fn calculate_std_dev_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            StddevOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(37.6543215),
                Some(28.442923895289123),
                Some(0.0),
                None,
                None,
                Some(18.12081352042062),
                Some(11.983172291869804),
                Some(11.441953741554055),
                Some(0.0),
                None,
            ],
        );

        // Second fixture: the first range holds three identical samples, so
        // its expected standard deviation is exactly 0.0.
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000]
                .into_iter()
                .map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.5990505637277868,
            1.5990505637277868,
            1.5990505637277868,
            0.0,
            8.0,
            8.0,
            2.0,
            3.0,
        ]));
        let ranges = [(0, 3), (3, 5)];
        simple_range_udf_runner(
            StddevOverTime::scalar_udf(),
            RangeArray::from_ranges(ts_array, ranges).unwrap(),
            RangeArray::from_ranges(values_array, ranges).unwrap(),
            vec![],
            vec![Some(0.0), Some(3.249615361854384)],
        );
    }
}