promql/functions/
aggr_over_time.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_macro::range_fn;
18use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
19use datafusion::common::DataFusionError;
20use datafusion::logical_expr::{ScalarUDF, Volatility};
21use datafusion::physical_plan::ColumnarValue;
22use datatypes::arrow::array::Array;
23use datatypes::arrow::compute;
24use datatypes::arrow::datatypes::DataType;
25
26use crate::functions::{compensated_sum_inc, extract_array};
27use crate::range_array::RangeArray;
28
29/// The average value of all points in the specified interval.
30#[range_fn(
31    name = AvgOverTime,
32    ret = Float64Array,
33    display_name = prom_avg_over_time
34)]
35pub fn avg_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
36    compute::sum(values).map(|result| result / values.len() as f64)
37}
38
39/// The minimum value of all points in the specified interval.
40#[range_fn(
41    name = MinOverTime,
42    ret = Float64Array,
43    display_name = prom_min_over_time
44)]
45pub fn min_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
46    compute::min(values)
47}
48
49/// The maximum value of all points in the specified interval.
50#[range_fn(
51    name = MaxOverTime,
52    ret = Float64Array,
53    display_name = prom_max_over_time
54)]
55pub fn max_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
56    compute::max(values)
57}
58
59/// The sum of all values in the specified interval.
60#[range_fn(
61    name = SumOverTime,
62    ret = Float64Array,
63    display_name = prom_sum_over_time
64)]
65pub fn sum_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
66    compute::sum(values)
67}
68
69/// The count of all values in the specified interval.
70#[range_fn(
71    name = CountOverTime,
72    ret = Float64Array,
73    display_name = prom_count_over_time
74)]
75pub fn count_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
76    if values.is_empty() {
77        None
78    } else {
79        Some(values.len() as f64)
80    }
81}
82
83/// The most recent point value in specified interval.
84#[range_fn(
85    name = LastOverTime,
86    ret = Float64Array,
87    display_name = prom_last_over_time
88)]
89pub fn last_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
90    values.values().last().copied()
91}
92
93/// absent_over_time returns an empty vector if the range vector passed to it has any
94/// elements (floats or native histograms) and a 1-element vector with the value 1 if
95/// the range vector passed to it has no elements.
96#[range_fn(
97    name = AbsentOverTime,
98    ret = Float64Array,
99    display_name = prom_absent_over_time
100)]
101pub fn absent_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
102    if values.is_empty() { Some(1.0) } else { None }
103}
104
105/// the value 1 for any series in the specified interval.
106#[range_fn(
107    name = PresentOverTime,
108    ret = Float64Array,
109    display_name = prom_present_over_time
110)]
111pub fn present_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
112    if values.is_empty() { None } else { Some(1.0) }
113}
114
115/// the population standard variance of the values in the specified interval.
116/// DataFusion's implementation:
117/// <https://github.com/apache/arrow-datafusion/blob/292eb954fc0bad3a1febc597233ba26cb60bda3e/datafusion/physical-expr/src/aggregate/variance.rs#L224-#L241>
118#[range_fn(
119    name = StdvarOverTime,
120    ret = Float64Array,
121    display_name = prom_stdvar_over_time
122)]
123pub fn stdvar_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
124    if values.is_empty() {
125        None
126    } else {
127        let mut count = 0;
128        let mut mean: f64 = 0.0;
129        let mut result: f64 = 0.0;
130        for value in values {
131            let value = value.unwrap();
132            let new_count = count + 1;
133            let delta1 = value - mean;
134            let new_mean = delta1 / new_count as f64 + mean;
135            let delta2 = value - new_mean;
136            let new_result = result + delta1 * delta2;
137
138            count += 1;
139            mean = new_mean;
140            result = new_result;
141        }
142        Some(result / count as f64)
143    }
144}
145
146/// the population standard deviation of the values in the specified interval.
147/// Prometheus's implementation: <https://github.com/prometheus/prometheus/blob/f55ab2217984770aa1eecd0f2d5f54580029b1c0/promql/functions.go#L556-L569>
148#[range_fn(
149    name = StddevOverTime,
150    ret = Float64Array,
151    display_name = prom_stddev_over_time
152)]
153pub fn stddev_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
154    if values.is_empty() {
155        None
156    } else {
157        let mut count = 0.0;
158        let mut mean = 0.0;
159        let mut comp_mean = 0.0;
160        let mut deviations_sum_sq = 0.0;
161        let mut comp_deviations_sum_sq = 0.0;
162        for v in values {
163            count += 1.0;
164            let current_value = v.unwrap();
165            let delta = current_value - (mean + comp_mean);
166            let (new_mean, new_comp_mean) = compensated_sum_inc(delta / count, mean, comp_mean);
167            mean = new_mean;
168            comp_mean = new_comp_mean;
169            let (new_deviations_sum_sq, new_comp_deviations_sum_sq) = compensated_sum_inc(
170                delta * (current_value - (mean + comp_mean)),
171                deviations_sum_sq,
172                comp_deviations_sum_sq,
173            );
174            deviations_sum_sq = new_deviations_sum_sq;
175            comp_deviations_sum_sq = new_comp_deviations_sum_sq;
176        }
177        Some(((deviations_sum_sq + comp_deviations_sum_sq) / count).sqrt())
178    }
179}
180
#[cfg(test)]
mod test {
    use super::*;
    use crate::functions::test_util::simple_range_udf_runner;

    /// Shared fixture: 11 samples sliced by the same `(offset, length)` ranges
    /// for both the timestamp and the value range arrays. Includes
    /// single-element and empty ranges.
    fn build_test_range_arrays() -> (RangeArray, RangeArray) {
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [
                1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000, 200000, 500000,
            ]
            .into_iter()
            .map(Some),
        ));
        let ranges = [
            (0, 2),
            (0, 5),
            (1, 1), // single element
            (2, 0), // empty
            (2, 0), // empty
            (3, 3),
            (4, 3),
            (5, 3),
            (8, 1), // single element
            (9, 0), // empty
        ];
        let samples = Arc::new(Float64Array::from_iter([
            12.345678, 87.654321, 31.415927, 27.182818, 70.710678, 41.421356, 57.735027, 69.314718,
            98.019802, 1.98019802, 61.803399,
        ]));

        (
            RangeArray::from_ranges(timestamps, ranges).unwrap(),
            RangeArray::from_ranges(samples, ranges).unwrap(),
        )
    }

    /// Secondary fixture used by the variance/deviation tests: a range of three
    /// identical values (zero spread) followed by a range of five varied values.
    fn build_extra_range_arrays() -> (RangeArray, RangeArray) {
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000]
                .into_iter()
                .map(Some),
        ));
        let samples = Arc::new(Float64Array::from_iter([
            1.5990505637277868,
            1.5990505637277868,
            1.5990505637277868,
            0.0,
            8.0,
            8.0,
            2.0,
            3.0,
        ]));
        let ranges = [(0, 3), (3, 5)];

        (
            RangeArray::from_ranges(timestamps, ranges).unwrap(),
            RangeArray::from_ranges(samples, ranges).unwrap(),
        )
    }

    #[test]
    fn calculate_avg_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        simple_range_udf_runner(
            AvgOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            vec![
                Some(49.9999995),
                Some(45.8618844),
                Some(87.654321),
                None,
                None,
                Some(46.438284),
                Some(56.62235366666667),
                Some(56.15703366666667),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_min_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        simple_range_udf_runner(
            MinOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            vec![
                Some(12.345678),
                Some(12.345678),
                Some(87.654321),
                None,
                None,
                Some(27.182818),
                Some(41.421356),
                Some(41.421356),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_max_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        simple_range_udf_runner(
            MaxOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            vec![
                Some(87.654321),
                Some(87.654321),
                Some(87.654321),
                None,
                None,
                Some(70.710678),
                Some(70.710678),
                Some(69.314718),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_sum_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        simple_range_udf_runner(
            SumOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            vec![
                Some(99.999999),
                Some(229.309422),
                Some(87.654321),
                None,
                None,
                Some(139.314852),
                Some(169.867061),
                Some(168.471101),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_count_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        simple_range_udf_runner(
            CountOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            vec![
                Some(2.0),
                Some(5.0),
                Some(1.0),
                None,
                None,
                Some(3.0),
                Some(3.0),
                Some(3.0),
                Some(1.0),
                None,
            ],
        );
    }

    #[test]
    fn calculate_last_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        simple_range_udf_runner(
            LastOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            vec![
                Some(87.654321),
                Some(70.710678),
                Some(87.654321),
                None,
                None,
                Some(41.421356),
                Some(57.735027),
                Some(69.314718),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_absent_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        simple_range_udf_runner(
            AbsentOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            vec![
                None,
                None,
                None,
                Some(1.0),
                Some(1.0),
                None,
                None,
                None,
                None,
                Some(1.0),
            ],
        );
    }

    #[test]
    fn calculate_present_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        simple_range_udf_runner(
            PresentOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            vec![
                Some(1.0),
                Some(1.0),
                Some(1.0),
                None,
                None,
                Some(1.0),
                Some(1.0),
                Some(1.0),
                Some(1.0),
                None,
            ],
        );
    }

    #[test]
    fn calculate_stdvar_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        simple_range_udf_runner(
            StdvarOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            vec![
                Some(1417.8479276253622),
                Some(808.999919713209),
                Some(0.0),
                None,
                None,
                Some(328.3638826418587),
                Some(143.5964181766362),
                Some(130.91830542386285),
                Some(0.0),
                None,
            ],
        );

        // Identical values must give exactly zero variance.
        let (timestamps, samples) = build_extra_range_arrays();
        simple_range_udf_runner(
            StdvarOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            vec![Some(0.0), Some(10.559999999999999)],
        );
    }

    #[test]
    fn calculate_std_dev_over_time() {
        let (timestamps, samples) = build_test_range_arrays();
        simple_range_udf_runner(
            StddevOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            vec![
                Some(37.6543215),
                Some(28.442923895289123),
                Some(0.0),
                None,
                None,
                Some(18.12081352042062),
                Some(11.983172291869804),
                Some(11.441953741554055),
                Some(0.0),
                None,
            ],
        );

        // Identical values must give exactly zero deviation.
        let (timestamps, samples) = build_extra_range_arrays();
        simple_range_udf_runner(
            StddevOverTime::scalar_udf(),
            timestamps,
            samples,
            vec![],
            vec![Some(0.0), Some(3.249615361854384)],
        );
    }
}