1use std::sync::Arc;
16
17use common_macro::range_fn;
18use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
19use datafusion::common::DataFusionError;
20use datafusion::logical_expr::{ScalarUDF, Volatility};
21use datafusion::physical_plan::ColumnarValue;
22use datatypes::arrow::array::Array;
23use datatypes::arrow::compute;
24use datatypes::arrow::datatypes::DataType;
25
26use crate::functions::{compensated_sum_inc, extract_array};
27use crate::range_array::RangeArray;
28
29#[range_fn(
31 name = AvgOverTime,
32 ret = Float64Array,
33 display_name = prom_avg_over_time
34)]
35pub fn avg_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
36 compute::sum(values).map(|result| result / values.len() as f64)
37}
38
39#[range_fn(
41 name = MinOverTime,
42 ret = Float64Array,
43 display_name = prom_min_over_time
44)]
45pub fn min_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
46 compute::min(values)
47}
48
49#[range_fn(
51 name = MaxOverTime,
52 ret = Float64Array,
53 display_name = prom_max_over_time
54)]
55pub fn max_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
56 compute::max(values)
57}
58
59#[range_fn(
61 name = SumOverTime,
62 ret = Float64Array,
63 display_name = prom_sum_over_time
64)]
65pub fn sum_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
66 compute::sum(values)
67}
68
69#[range_fn(
71 name = CountOverTime,
72 ret = Float64Array,
73 display_name = prom_count_over_time
74)]
75pub fn count_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
76 if values.is_empty() {
77 None
78 } else {
79 Some(values.len() as f64)
80 }
81}
82
83#[range_fn(
85 name = LastOverTime,
86 ret = Float64Array,
87 display_name = prom_last_over_time
88)]
89pub fn last_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
90 values.values().last().copied()
91}
92
93#[range_fn(
97 name = AbsentOverTime,
98 ret = Float64Array,
99 display_name = prom_absent_over_time
100)]
101pub fn absent_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
102 if values.is_empty() { Some(1.0) } else { None }
103}
104
105#[range_fn(
107 name = PresentOverTime,
108 ret = Float64Array,
109 display_name = prom_present_over_time
110)]
111pub fn present_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
112 if values.is_empty() { None } else { Some(1.0) }
113}
114
115#[range_fn(
119 name = StdvarOverTime,
120 ret = Float64Array,
121 display_name = prom_stdvar_over_time
122)]
123pub fn stdvar_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
124 if values.is_empty() {
125 None
126 } else {
127 let mut count = 0;
128 let mut mean: f64 = 0.0;
129 let mut result: f64 = 0.0;
130 for value in values {
131 let value = value.unwrap();
132 let new_count = count + 1;
133 let delta1 = value - mean;
134 let new_mean = delta1 / new_count as f64 + mean;
135 let delta2 = value - new_mean;
136 let new_result = result + delta1 * delta2;
137
138 count += 1;
139 mean = new_mean;
140 result = new_result;
141 }
142 Some(result / count as f64)
143 }
144}
145
146#[range_fn(
149 name = StddevOverTime,
150 ret = Float64Array,
151 display_name = prom_stddev_over_time
152)]
153pub fn stddev_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
154 if values.is_empty() {
155 None
156 } else {
157 let mut count = 0.0;
158 let mut mean = 0.0;
159 let mut comp_mean = 0.0;
160 let mut deviations_sum_sq = 0.0;
161 let mut comp_deviations_sum_sq = 0.0;
162 for v in values {
163 count += 1.0;
164 let current_value = v.unwrap();
165 let delta = current_value - (mean + comp_mean);
166 let (new_mean, new_comp_mean) = compensated_sum_inc(delta / count, mean, comp_mean);
167 mean = new_mean;
168 comp_mean = new_comp_mean;
169 let (new_deviations_sum_sq, new_comp_deviations_sum_sq) = compensated_sum_inc(
170 delta * (current_value - (mean + comp_mean)),
171 deviations_sum_sq,
172 comp_deviations_sum_sq,
173 );
174 deviations_sum_sq = new_deviations_sum_sq;
175 comp_deviations_sum_sq = new_comp_deviations_sum_sq;
176 }
177 Some(((deviations_sum_sq + comp_deviations_sum_sq) / count).sqrt())
178 }
179}
180
#[cfg(test)]
mod test {
    use super::*;
    use crate::functions::test_util::simple_range_udf_runner;

    /// Shared fixture: eleven samples plus ten `(offset, length)` windows covering
    /// multi-element, single-element and empty ranges.
    fn build_test_range_arrays() -> (RangeArray, RangeArray) {
        // Timestamps in milliseconds.
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [
                1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000, 200000, 500000,
            ]
            .into_iter()
            .map(Some),
        ));
        let samples = Arc::new(Float64Array::from_iter([
            12.345678, 87.654321, 31.415927, 27.182818, 70.710678, 41.421356, 57.735027, 69.314718,
            98.019802, 1.98019802, 61.803399,
        ]));
        let windows = [
            (0, 2),
            (0, 5),
            (1, 1), // single element
            (2, 0), // empty
            (2, 0), // empty
            (3, 3),
            (4, 3),
            (5, 3),
            (8, 1), // single element
            (9, 0), // empty
        ];

        (
            RangeArray::from_ranges(timestamps, windows).unwrap(),
            RangeArray::from_ranges(samples, windows).unwrap(),
        )
    }

    /// Secondary fixture for the variance/deviation tests: the first window holds three
    /// identical samples, the second holds `[0.0, 8.0, 8.0, 2.0, 3.0]`.
    fn build_repeated_value_range_arrays() -> (RangeArray, RangeArray) {
        // Timestamps in milliseconds.
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000]
                .into_iter()
                .map(Some),
        ));
        let samples = Arc::new(Float64Array::from_iter([
            1.5990505637277868,
            1.5990505637277868,
            1.5990505637277868,
            0.0,
            8.0,
            8.0,
            2.0,
            3.0,
        ]));
        let windows = [(0, 3), (3, 5)];

        (
            RangeArray::from_ranges(timestamps, windows).unwrap(),
            RangeArray::from_ranges(samples, windows).unwrap(),
        )
    }

    #[test]
    fn calculate_avg_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        let expected = vec![
            Some(49.9999995),
            Some(45.8618844),
            Some(87.654321),
            None,
            None,
            Some(46.438284),
            Some(56.62235366666667),
            Some(56.15703366666667),
            Some(98.019802),
            None,
        ];
        simple_range_udf_runner(
            AvgOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            expected,
        );
    }

    #[test]
    fn calculate_min_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        let expected = vec![
            Some(12.345678),
            Some(12.345678),
            Some(87.654321),
            None,
            None,
            Some(27.182818),
            Some(41.421356),
            Some(41.421356),
            Some(98.019802),
            None,
        ];
        simple_range_udf_runner(
            MinOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            expected,
        );
    }

    #[test]
    fn calculate_max_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        let expected = vec![
            Some(87.654321),
            Some(87.654321),
            Some(87.654321),
            None,
            None,
            Some(70.710678),
            Some(70.710678),
            Some(69.314718),
            Some(98.019802),
            None,
        ];
        simple_range_udf_runner(
            MaxOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            expected,
        );
    }

    #[test]
    fn calculate_sum_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        let expected = vec![
            Some(99.999999),
            Some(229.309422),
            Some(87.654321),
            None,
            None,
            Some(139.314852),
            Some(169.867061),
            Some(168.471101),
            Some(98.019802),
            None,
        ];
        simple_range_udf_runner(
            SumOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            expected,
        );
    }

    #[test]
    fn calculate_count_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        let expected = vec![
            Some(2.0),
            Some(5.0),
            Some(1.0),
            None,
            None,
            Some(3.0),
            Some(3.0),
            Some(3.0),
            Some(1.0),
            None,
        ];
        simple_range_udf_runner(
            CountOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            expected,
        );
    }

    #[test]
    fn calculate_last_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        let expected = vec![
            Some(87.654321),
            Some(70.710678),
            Some(87.654321),
            None,
            None,
            Some(41.421356),
            Some(57.735027),
            Some(69.314718),
            Some(98.019802),
            None,
        ];
        simple_range_udf_runner(
            LastOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            expected,
        );
    }

    #[test]
    fn calculate_absent_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        let expected = vec![
            None,
            None,
            None,
            Some(1.0),
            Some(1.0),
            None,
            None,
            None,
            None,
            Some(1.0),
        ];
        simple_range_udf_runner(
            AbsentOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            expected,
        );
    }

    #[test]
    fn calculate_present_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        let expected = vec![
            Some(1.0),
            Some(1.0),
            Some(1.0),
            None,
            None,
            Some(1.0),
            Some(1.0),
            Some(1.0),
            Some(1.0),
            None,
        ];
        simple_range_udf_runner(
            PresentOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            expected,
        );
    }

    #[test]
    fn calculate_stdvar_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        let expected = vec![
            Some(1417.8479276253622),
            Some(808.999919713209),
            Some(0.0),
            None,
            None,
            Some(328.3638826418587),
            Some(143.5964181766362),
            Some(130.91830542386285),
            Some(0.0),
            None,
        ];
        simple_range_udf_runner(
            StdvarOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            expected,
        );

        let (timestamps, samples) = build_repeated_value_range_arrays();
        let expected = vec![Some(0.0), Some(10.559999999999999)];
        simple_range_udf_runner(
            StdvarOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            expected,
        );
    }

    #[test]
    fn calculate_std_dev_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        let expected = vec![
            Some(37.6543215),
            Some(28.442923895289123),
            Some(0.0),
            None,
            None,
            Some(18.12081352042062),
            Some(11.983172291869804),
            Some(11.441953741554055),
            Some(0.0),
            None,
        ];
        simple_range_udf_runner(
            StddevOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            expected,
        );

        let (timestamps, samples) = build_repeated_value_range_arrays();
        let expected = vec![Some(0.0), Some(3.249615361854384)];
        simple_range_udf_runner(
            StddevOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            expected,
        );
    }
}