promql/functions/
extrapolate_rate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// This file also contains some code from prometheus project.
16
17// Copyright 2015 The Prometheus Authors
18// Licensed under the Apache License, Version 2.0 (the "License");
19// you may not use this file except in compliance with the License.
20// You may obtain a copy of the License at
21//
22// http://www.apache.org/licenses/LICENSE-2.0
23//
24// Unless required by applicable law or agreed to in writing, software
25// distributed under the License is distributed on an "AS IS" BASIS,
26// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
27// See the License for the specific language governing permissions and
28// limitations under the License.
29
30//! Implementations of `rate`, `increase` and `delta` functions in PromQL.
31
32use std::fmt::Display;
33use std::sync::Arc;
34
35use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
36use datafusion::arrow::datatypes::TimeUnit;
37use datafusion::common::{DataFusionError, Result as DfResult};
38use datafusion::logical_expr::{ScalarUDF, Volatility};
39use datafusion::physical_plan::ColumnarValue;
40use datafusion_expr::create_udf;
41use datatypes::arrow::array::{Array, Int64Array};
42use datatypes::arrow::datatypes::DataType;
43
44use crate::extension_plan::Millisecond;
45use crate::functions::extract_array;
46use crate::range_array::RangeArray;
47
48pub type Delta = ExtrapolatedRate<false, false>;
49pub type Rate = ExtrapolatedRate<true, true>;
50pub type Increase = ExtrapolatedRate<true, false>;
51
/// Part of the `extrapolatedRate` in PromQL,
/// from <https://github.com/prometheus/prometheus/blob/v0.40.1/promql/functions.go#L66>
///
/// The two const parameters select the concrete function:
/// * `IS_COUNTER`: when `true`, decreases between consecutive samples are
///   treated as counter resets and compensated for.
/// * `IS_RATE`: when `true`, the extrapolated result is divided by the range
///   length in seconds.
#[derive(Debug)]
pub struct ExtrapolatedRate<const IS_COUNTER: bool, const IS_RATE: bool> {
    /// Range length in milliseconds.
    range_length: i64,
}
59
60impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, IS_RATE> {
61    /// Constructor. Other public usage should use [scalar_udf()](ExtrapolatedRate::scalar_udf()) instead.
62    fn new(range_length: i64) -> Self {
63        Self { range_length }
64    }
65
66    fn scalar_udf_with_name(name: &str) -> ScalarUDF {
67        let input_types = vec![
68            // timestamp range vector
69            RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
70            // value range vector
71            RangeArray::convert_data_type(DataType::Float64),
72            // timestamp vector
73            DataType::Timestamp(TimeUnit::Millisecond, None),
74            // range length
75            DataType::Int64,
76        ];
77
78        create_udf(
79            name,
80            input_types,
81            DataType::Float64,
82            Volatility::Volatile,
83            Arc::new(move |input: &_| Self::create_function(input)?.calc(input)) as _,
84        )
85    }
86
87    fn create_function(inputs: &[ColumnarValue]) -> DfResult<Self> {
88        if inputs.len() != 4 {
89            return Err(DataFusionError::Plan(
90                "ExtrapolatedRate function should have 4 inputs".to_string(),
91            ));
92        }
93
94        let range_length_array = extract_array(&inputs[3])?;
95        let range_length = range_length_array
96            .as_any()
97            .downcast_ref::<Int64Array>()
98            .unwrap()
99            .value(0) as i64;
100
101        Ok(Self::new(range_length))
102    }
103
104    /// Input parameters:
105    /// * 0: timestamp range vector
106    /// * 1: value range vector
107    /// * 2: timestamp vector
108    /// * 3: range length. Range duration in millisecond. Not used here
109    fn calc(&self, input: &[ColumnarValue]) -> DfResult<ColumnarValue> {
110        assert_eq!(input.len(), 4);
111
112        // construct matrix from input
113        let ts_array = extract_array(&input[0])?;
114        let ts_range = RangeArray::try_new(ts_array.to_data().into())?;
115        let value_array = extract_array(&input[1])?;
116        let value_range = RangeArray::try_new(value_array.to_data().into())?;
117        let ts = extract_array(&input[2])?;
118        let ts = ts
119            .as_any()
120            .downcast_ref::<TimestampMillisecondArray>()
121            .unwrap();
122
123        // calculation
124        let mut result_array = Vec::with_capacity(ts_range.len());
125
126        let all_timestamps = ts_range
127            .values()
128            .as_any()
129            .downcast_ref::<TimestampMillisecondArray>()
130            .unwrap()
131            .values();
132        let all_values = value_range
133            .values()
134            .as_any()
135            .downcast_ref::<Float64Array>()
136            .unwrap()
137            .values();
138        for index in 0..ts_range.len() {
139            // Safety: we are inside `ts_range`'s iterator which guarantees the index is valid.
140            let (offset, length) = ts_range.get_offset_length(index).unwrap();
141
142            let timestamps = &all_timestamps[offset..offset + length];
143            let end_ts = ts.value(index);
144            let values = &all_values[offset..offset + length];
145
146            if values.len() < 2 {
147                result_array.push(None);
148                continue;
149            }
150
151            // refer to functions.go L83-L110
152            let mut result_value = values.last().unwrap() - values.first().unwrap();
153            if IS_COUNTER {
154                for window in values.windows(2) {
155                    let prev = window[0];
156                    let curr = window[1];
157                    if curr < prev {
158                        result_value += prev
159                    }
160                }
161            }
162
163            let mut factor = Self::extrapolate_factor(
164                timestamps,
165                end_ts,
166                self.range_length,
167                *values.first().unwrap(),
168                result_value,
169            );
170
171            if IS_RATE {
172                // safety: range_length is checked to be non-zero in the planner.
173                factor /= self.range_length as f64 / 1000.0;
174            }
175
176            result_array.push(Some(result_value * factor));
177        }
178
179        let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
180        Ok(result)
181    }
182
183    fn extrapolate_factor(
184        timestamps: &[Millisecond],
185        range_end: Millisecond,
186        range_length: Millisecond,
187        // the following two parameters are for counters.
188        // see functions.go L121 - L127
189        first_value: f64,
190        result_value: f64,
191    ) -> f64 {
192        // result_value
193        // refer to functions.go extrapolatedRate fn
194        // assume offset is processed (and it should be processed in normalize plan)
195        let range_start = range_end - range_length;
196        let mut duration_to_start = (timestamps.first().unwrap() - range_start) as f64 / 1000.0;
197        let duration_to_end = (range_end - timestamps.last().unwrap()) as f64 / 1000.0;
198        let sampled_interval =
199            (timestamps.last().unwrap() - timestamps.first().unwrap()) as f64 / 1000.0;
200        let average_duration_between_samples = sampled_interval / (timestamps.len() - 1) as f64;
201
202        // functions.go L122 - L134. quote:
203        // Counters cannot be negative. If we have any slope at
204        // all (i.e. resultValue went up), we can extrapolate
205        // the zero point of the counter. If the duration to the
206        // zero point is shorter than the durationToStart, we
207        // take the zero point as the start of the series,
208        // thereby avoiding extrapolation to negative counter
209        // values.
210        if IS_COUNTER && result_value > 0.0 && first_value >= 0.0 {
211            let duration_to_zero = sampled_interval * (first_value / result_value);
212            if duration_to_zero < duration_to_start {
213                duration_to_start = duration_to_zero;
214            }
215        }
216
217        let extrapolation_threshold = average_duration_between_samples * 1.1;
218        let mut extrapolate_to_interval = sampled_interval;
219
220        if duration_to_start < extrapolation_threshold {
221            extrapolate_to_interval += duration_to_start;
222        } else {
223            extrapolate_to_interval += average_duration_between_samples / 2.0;
224        }
225        if duration_to_end < extrapolation_threshold {
226            extrapolate_to_interval += duration_to_end;
227        } else {
228            extrapolate_to_interval += average_duration_between_samples / 2.0;
229        }
230
231        extrapolate_to_interval / sampled_interval
232    }
233}
234
235// delta
236impl ExtrapolatedRate<false, false> {
237    pub const fn name() -> &'static str {
238        "prom_delta"
239    }
240
241    pub fn scalar_udf() -> ScalarUDF {
242        Self::scalar_udf_with_name(Self::name())
243    }
244}
245
246// rate
247impl ExtrapolatedRate<true, true> {
248    pub const fn name() -> &'static str {
249        "prom_rate"
250    }
251
252    pub fn scalar_udf() -> ScalarUDF {
253        Self::scalar_udf_with_name(Self::name())
254    }
255}
256
257// increase
258impl ExtrapolatedRate<true, false> {
259    pub const fn name() -> &'static str {
260        "prom_increase"
261    }
262
263    pub fn scalar_udf() -> ScalarUDF {
264        Self::scalar_udf_with_name(Self::name())
265    }
266}
267
268impl Display for ExtrapolatedRate<false, false> {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        f.write_str("PromQL Delta Function")
271    }
272}
273
274impl Display for ExtrapolatedRate<true, true> {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        f.write_str("PromQL Rate Function")
277    }
278}
279
280impl Display for ExtrapolatedRate<true, false> {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        f.write_str("PromQL Increase Function")
283    }
284}
285
#[cfg(test)]
mod test {

    use datafusion::arrow::array::ArrayRef;

    use super::*;

    /// Strictly increasing sample values.
    const MONOTONIC_VALUES: [f64; 9] = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0];

    /// Sample values with a counter reset after the fourth sample.
    /// For counters this series should be treated like
    /// [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0].
    const RESET_VALUES: [f64; 9] = [1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0];

    /// Flattened sample timestamps (1 to 9) shared by every test.
    fn sample_timestamps() -> Arc<TimestampMillisecondArray> {
        Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ))
    }

    /// Wraps the flattened sample values into an array.
    fn sample_values(values: [f64; 9]) -> Arc<Float64Array> {
        Arc::new(Float64Array::from_iter(values))
    }

    /// Range length is fixed to 5
    fn extrapolated_rate_runner<const IS_COUNTER: bool, const IS_RATE: bool>(
        ts_range: RangeArray,
        value_range: RangeArray,
        timestamps: ArrayRef,
        expected: Vec<f64>,
    ) {
        let input = [
            ColumnarValue::Array(Arc::new(ts_range.into_dict())),
            ColumnarValue::Array(Arc::new(value_range.into_dict())),
            ColumnarValue::Array(timestamps),
            ColumnarValue::Array(Arc::new(Int64Array::from(vec![5]))),
        ];

        let function = ExtrapolatedRate::<IS_COUNTER, IS_RATE>::new(5);
        let evaluated = function.calc(&input).unwrap();
        let evaluated = extract_array(&evaluated).unwrap();
        let actual = evaluated
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap()
            .values()
            .to_vec();

        assert_eq!(actual, expected);
    }

    /// Runs the function over eight two-sample windows that slide by one
    /// sample each, evaluated at the timestamp of each window's last sample.
    fn sliding_window_runner<const IS_COUNTER: bool, const IS_RATE: bool>(
        values: [f64; 9],
        expected: Vec<f64>,
    ) {
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(sample_timestamps(), ranges).unwrap();
        let value_range = RangeArray::from_ranges(sample_values(values), ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;

        extrapolated_rate_runner::<IS_COUNTER, IS_RATE>(
            ts_range,
            value_range,
            timestamps,
            expected,
        );
    }

    #[test]
    fn increase_abnormal_input() {
        let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];
        let ts_range = RangeArray::from_ranges(sample_timestamps(), ranges).unwrap();
        let value_range =
            RangeArray::from_ranges(sample_values(MONOTONIC_VALUES), ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter([
            Some(2),
            Some(5),
            Some(2),
            Some(6),
            Some(9),
            None,
        ])) as _;

        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            vec![2.0, 5.0, 0.0, 2.5, 0.0, 0.0],
        );
    }

    #[test]
    fn increase_normal_input() {
        sliding_window_runner::<true, false>(
            MONOTONIC_VALUES,
            // `2.0` is because that `duration_to_zero` less than `extrapolation_threshold`
            vec![2.0, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5],
        );
    }

    #[test]
    fn increase_short_input() {
        let ranges = [
            (0, 1),
            (1, 0),
            (2, 1),
            (3, 0),
            (4, 3),
            (5, 1),
            (6, 0),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(sample_timestamps(), ranges).unwrap();
        let value_range =
            RangeArray::from_ranges(sample_values(MONOTONIC_VALUES), ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter([
            Some(1),
            None,
            Some(3),
            None,
            Some(7),
            Some(6),
            None,
            Some(9),
        ])) as _;

        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            vec![0.0, 0.0, 0.0, 0.0, 2.5, 0.0, 0.0, 1.5],
        );
    }

    #[test]
    fn increase_counter_reset() {
        sliding_window_runner::<true, false>(
            RESET_VALUES,
            // that two `2.0` is because `duration_to_start` are shrunk to
            // `duration_to_zero`, and causes `duration_to_zero` less than
            // `extrapolation_threshold`.
            vec![2.0, 1.5, 1.5, 1.5, 2.0, 1.5, 1.5, 1.5],
        );
    }

    #[test]
    fn rate_counter_reset() {
        sliding_window_runner::<true, true>(
            RESET_VALUES,
            vec![400.0, 300.0, 300.0, 300.0, 400.0, 300.0, 300.0, 300.0],
        );
    }

    #[test]
    fn rate_normal_input() {
        sliding_window_runner::<true, true>(
            MONOTONIC_VALUES,
            vec![400.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0],
        );
    }

    #[test]
    fn delta_counter_reset() {
        sliding_window_runner::<false, false>(
            RESET_VALUES,
            // delta doesn't handle counter reset, thus there is a negative value
            vec![1.5, 1.5, 1.5, -4.5, 1.5, 1.5, 1.5, 1.5],
        );
    }

    #[test]
    fn delta_normal_input() {
        sliding_window_runner::<false, false>(
            MONOTONIC_VALUES,
            vec![1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5],
        );
    }
}
576        );
577    }
578}