promql/functions/aggr_over_time.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_macro::range_fn;
18use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
19use datafusion::common::DataFusionError;
20use datafusion::logical_expr::{ScalarUDF, Volatility};
21use datafusion::physical_plan::ColumnarValue;
22use datatypes::arrow::array::Array;
23use datatypes::arrow::compute;
24use datatypes::arrow::datatypes::DataType;
25
26use crate::functions::{compensated_sum_inc, extract_array};
27use crate::range_array::RangeArray;
28
29/// The average value of all points in the specified interval.
30#[range_fn(
31    name = AvgOverTime,
32    ret = Float64Array,
33    display_name = prom_avg_over_time
34)]
35pub fn avg_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
36    compute::sum(values).map(|result| result / values.len() as f64)
37}
38
39/// The minimum value of all points in the specified interval.
40#[range_fn(
41    name = MinOverTime,
42    ret = Float64Array,
43    display_name = prom_min_over_time
44)]
45pub fn min_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
46    compute::min(values)
47}
48
49/// The maximum value of all points in the specified interval.
50#[range_fn(
51    name = MaxOverTime,
52    ret = Float64Array,
53    display_name = prom_max_over_time
54)]
55pub fn max_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
56    compute::max(values)
57}
58
/// The sum of all values in the specified interval.
///
/// Returns `None` for an empty range. Null handling is delegated to arrow's
/// `compute::sum`, which is documented to skip null slots and to return `None` when
/// every slot is null.
#[range_fn(
    name = SumOverTime,
    ret = Float64Array,
    display_name = prom_sum_over_time
)]
pub fn sum_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
    // NOTE(review): this is a plain (uncompensated) arrow sum, whereas `stddev_over_time`
    // accumulates with `compensated_sum_inc`. Prometheus' `sum_over_time` is believed to
    // use compensated summation as well, so results may differ in the last bits — confirm
    // whether exact parity matters; any change here alters floating-point rounding.
    compute::sum(values)
}
68
69/// The count of all values in the specified interval.
70#[range_fn(
71    name = CountOverTime,
72    ret = Float64Array,
73    display_name = prom_count_over_time
74)]
75pub fn count_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
76    if values.is_empty() {
77        None
78    } else {
79        Some(values.len() as f64)
80    }
81}
82
83/// The most recent point value in specified interval.
84#[range_fn(
85    name = LastOverTime,
86    ret = Float64Array,
87    display_name = prom_last_over_time
88)]
89pub fn last_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
90    values.values().last().copied()
91}
92
93/// absent_over_time returns an empty vector if the range vector passed to it has any
94/// elements (floats or native histograms) and a 1-element vector with the value 1 if
95/// the range vector passed to it has no elements.
96#[range_fn(
97    name = AbsentOverTime,
98    ret = Float64Array,
99    display_name = prom_absent_over_time
100)]
101pub fn absent_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
102    if values.is_empty() {
103        Some(1.0)
104    } else {
105        None
106    }
107}
108
109/// the value 1 for any series in the specified interval.
110#[range_fn(
111    name = PresentOverTime,
112    ret = Float64Array,
113    display_name = prom_present_over_time
114)]
115pub fn present_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
116    if values.is_empty() {
117        None
118    } else {
119        Some(1.0)
120    }
121}
122
123/// the population standard variance of the values in the specified interval.
124/// DataFusion's implementation:
125/// <https://github.com/apache/arrow-datafusion/blob/292eb954fc0bad3a1febc597233ba26cb60bda3e/datafusion/physical-expr/src/aggregate/variance.rs#L224-#L241>
126#[range_fn(
127    name = StdvarOverTime,
128    ret = Float64Array,
129    display_name = prom_stdvar_over_time
130)]
131pub fn stdvar_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
132    if values.is_empty() {
133        None
134    } else {
135        let mut count = 0;
136        let mut mean: f64 = 0.0;
137        let mut result: f64 = 0.0;
138        for value in values {
139            let value = value.unwrap();
140            let new_count = count + 1;
141            let delta1 = value - mean;
142            let new_mean = delta1 / new_count as f64 + mean;
143            let delta2 = value - new_mean;
144            let new_result = result + delta1 * delta2;
145
146            count += 1;
147            mean = new_mean;
148            result = new_result;
149        }
150        Some(result / count as f64)
151    }
152}
153
154/// the population standard deviation of the values in the specified interval.
155/// Prometheus's implementation: <https://github.com/prometheus/prometheus/blob/f55ab2217984770aa1eecd0f2d5f54580029b1c0/promql/functions.go#L556-L569>
156#[range_fn(
157    name = StddevOverTime,
158    ret = Float64Array,
159    display_name = prom_stddev_over_time
160)]
161pub fn stddev_over_time(_: &TimestampMillisecondArray, values: &Float64Array) -> Option<f64> {
162    if values.is_empty() {
163        None
164    } else {
165        let mut count = 0.0;
166        let mut mean = 0.0;
167        let mut comp_mean = 0.0;
168        let mut deviations_sum_sq = 0.0;
169        let mut comp_deviations_sum_sq = 0.0;
170        for v in values {
171            count += 1.0;
172            let current_value = v.unwrap();
173            let delta = current_value - (mean + comp_mean);
174            let (new_mean, new_comp_mean) = compensated_sum_inc(delta / count, mean, comp_mean);
175            mean = new_mean;
176            comp_mean = new_comp_mean;
177            let (new_deviations_sum_sq, new_comp_deviations_sum_sq) = compensated_sum_inc(
178                delta * (current_value - (mean + comp_mean)),
179                deviations_sum_sq,
180                comp_deviations_sum_sq,
181            );
182            deviations_sum_sq = new_deviations_sum_sq;
183            comp_deviations_sum_sq = new_comp_deviations_sum_sq;
184        }
185        Some(((deviations_sum_sq + comp_deviations_sum_sq) / count).sqrt())
186    }
187}
188
#[cfg(test)]
mod test {
    use super::*;
    use crate::functions::test_util::simple_range_udf_runner;

    /// Builds the shared fixture: 11 samples and 10 `(offset, length)` ranges over them.
    ///
    /// The same ranges are applied to both the timestamp and the value array, so element
    /// `i` of every expected-output vector in the tests below is the result for
    /// `ranges[i]`. The ranges deliberately include single-element windows, repeated
    /// empty windows, and overlapping windows.
    fn build_test_range_arrays() -> (RangeArray, RangeArray) {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [
                1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000, 200000, 500000,
            ]
            .into_iter()
            .map(Some),
        ));
        let ranges = [
            (0, 2),
            (0, 5),
            (1, 1), // only 1 element
            (2, 0), // empty range
            (2, 0), // empty range
            (3, 3),
            (4, 3),
            (5, 3),
            (8, 1), // only 1 element
            (9, 0), // empty range
        ];

        let values_array = Arc::new(Float64Array::from_iter([
            12.345678, 87.654321, 31.415927, 27.182818, 70.710678, 41.421356, 57.735027, 69.314718,
            98.019802, 1.98019802, 61.803399,
        ]));

        let ts_range_array = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range_array = RangeArray::from_ranges(values_array, ranges).unwrap();

        (ts_range_array, value_range_array)
    }

    #[test]
    fn calculate_avg_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        // Empty ranges (indices 3, 4, 9) produce `None`; e.g. range (0, 2) averages
        // 12.345678 and 87.654321.
        simple_range_udf_runner(
            AvgOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(49.9999995),
                Some(45.8618844),
                Some(87.654321),
                None,
                None,
                Some(46.438284),
                Some(56.62235366666667),
                Some(56.15703366666667),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_min_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            MinOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(12.345678),
                Some(12.345678),
                Some(87.654321),
                None,
                None,
                Some(27.182818),
                Some(41.421356),
                Some(41.421356),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_max_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            MaxOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(87.654321),
                Some(87.654321),
                Some(87.654321),
                None,
                None,
                Some(70.710678),
                Some(70.710678),
                Some(69.314718),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_sum_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            SumOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(99.999999),
                Some(229.309422),
                Some(87.654321),
                None,
                None,
                Some(139.314852),
                Some(169.867061),
                Some(168.471101),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_count_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        // The expected counts equal the `length` component of each fixture range, with
        // empty ranges mapped to `None` instead of 0.
        simple_range_udf_runner(
            CountOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(2.0),
                Some(5.0),
                Some(1.0),
                None,
                None,
                Some(3.0),
                Some(3.0),
                Some(3.0),
                Some(1.0),
                None,
            ],
        );
    }

    #[test]
    fn calculate_last_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            LastOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(87.654321),
                Some(70.710678),
                Some(87.654321),
                None,
                None,
                Some(41.421356),
                Some(57.735027),
                Some(69.314718),
                Some(98.019802),
                None,
            ],
        );
    }

    #[test]
    fn calculate_absent_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        // `Some(1.0)` exactly at the empty ranges (indices 3, 4, 9); `None` elsewhere.
        simple_range_udf_runner(
            AbsentOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                None,
                None,
                None,
                Some(1.0),
                Some(1.0),
                None,
                None,
                None,
                None,
                Some(1.0),
            ],
        );
    }

    #[test]
    fn calculate_present_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        // The complement of `absent_over_time`: `None` only at the empty ranges.
        simple_range_udf_runner(
            PresentOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(1.0),
                Some(1.0),
                Some(1.0),
                None,
                None,
                Some(1.0),
                Some(1.0),
                Some(1.0),
                Some(1.0),
                None,
            ],
        );
    }

    #[test]
    fn calculate_stdvar_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        // Single-element ranges must give a variance of exactly 0.
        simple_range_udf_runner(
            StdvarOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(1417.8479276253622),
                Some(808.999919713209),
                Some(0.0),
                None,
                None,
                Some(328.3638826418587),
                Some(143.5964181766362),
                Some(130.91830542386285),
                Some(0.0),
                None,
            ],
        );

        // Hand-checkable cases: three identical samples must give exactly 0, and the
        // population variance of [0, 8, 8, 2, 3] (mean 4.2) is 52.8 / 5 = 10.56; the
        // expectation below carries the floating-point rounding of that result.
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000]
                .into_iter()
                .map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.5990505637277868,
            1.5990505637277868,
            1.5990505637277868,
            0.0,
            8.0,
            8.0,
            2.0,
            3.0,
        ]));
        let ranges = [(0, 3), (3, 5)];
        simple_range_udf_runner(
            StdvarOverTime::scalar_udf(),
            RangeArray::from_ranges(ts_array, ranges).unwrap(),
            RangeArray::from_ranges(values_array, ranges).unwrap(),
            vec![],
            vec![Some(0.0), Some(10.559999999999999)],
        );
    }

    #[test]
    fn calculate_std_dev_over_time() {
        let (ts_array, value_array) = build_test_range_arrays();
        simple_range_udf_runner(
            StddevOverTime::scalar_udf(),
            ts_array,
            value_array,
            vec![],
            vec![
                Some(37.6543215),
                Some(28.442923895289123),
                Some(0.0),
                None,
                None,
                Some(18.12081352042062),
                Some(11.983172291869804),
                Some(11.441953741554055),
                Some(0.0),
                None,
            ],
        );

        // Same hand-checkable inputs as the stdvar test: the expected deviations are
        // 0 and sqrt(10.56) ≈ 3.2496.
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000]
                .into_iter()
                .map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.5990505637277868,
            1.5990505637277868,
            1.5990505637277868,
            0.0,
            8.0,
            8.0,
            2.0,
            3.0,
        ]));
        let ranges = [(0, 3), (3, 5)];
        simple_range_udf_runner(
            StddevOverTime::scalar_udf(),
            RangeArray::from_ranges(ts_array, ranges).unwrap(),
            RangeArray::from_ranges(values_array, ranges).unwrap(),
            vec![],
            vec![Some(0.0), Some(3.249615361854384)],
        );
    }
}