Skip to main content

promql/functions/
extrapolate_rate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// This file also contains some code from prometheus project.
16
17// Copyright 2015 The Prometheus Authors
18// Licensed under the Apache License, Version 2.0 (the "License");
19// you may not use this file except in compliance with the License.
20// You may obtain a copy of the License at
21//
22// http://www.apache.org/licenses/LICENSE-2.0
23//
24// Unless required by applicable law or agreed to in writing, software
25// distributed under the License is distributed on an "AS IS" BASIS,
26// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
27// See the License for the specific language governing permissions and
28// limitations under the License.
29
30//! Implementations of `rate`, `increase` and `delta` functions in PromQL.
31
32use std::fmt::Display;
33use std::sync::Arc;
34
35use datafusion::arrow::array::{
36    DictionaryArray, Float64Array, Float64Builder, TimestampMillisecondArray,
37};
38use datafusion::arrow::datatypes::TimeUnit;
39use datafusion::common::{DataFusionError, Result as DfResult};
40use datafusion::logical_expr::{ScalarUDF, Volatility};
41use datafusion::physical_plan::ColumnarValue;
42use datafusion_expr::create_udf;
43use datatypes::arrow::array::{Array, Int64Array};
44use datatypes::arrow::datatypes::{DataType, Int64Type};
45
46use crate::functions::extract_array;
47use crate::range_array::{RangeArray, unpack};
48
49pub type Delta = ExtrapolatedRate<false, false>;
50pub type Rate = ExtrapolatedRate<true, true>;
51pub type Increase = ExtrapolatedRate<true, false>;
52
/// Part of the `extrapolatedRate` in Promql,
/// from <https://github.com/prometheus/prometheus/blob/v0.40.1/promql/functions.go#L66>
///
/// `IS_COUNTER` enables counter-reset correction and lets the extrapolated range snap back to
/// the inferred zero point of the counter; `IS_RATE` additionally divides the extrapolated
/// result by the range length in seconds. The `(false, true)` combination is not supported.
#[derive(Debug)]
pub struct ExtrapolatedRate<const IS_COUNTER: bool, const IS_RATE: bool> {
    /// Range length in milliseconds.
    range_length: i64,
}
60
61impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, IS_RATE> {
62    /// Constructor. Other public usage should use [scalar_udf()](ExtrapolatedRate::scalar_udf()) instead.
63    fn new(range_length: i64) -> Self {
64        Self { range_length }
65    }
66
67    fn func_name() -> &'static str {
68        match (IS_COUNTER, IS_RATE) {
69            (true, true) => "prom_rate",
70            (true, false) => "prom_increase",
71            (false, false) => "prom_delta",
72            (false, true) => {
73                unreachable!("gauge rate is not supported by ExtrapolatedRate")
74            }
75        }
76    }
77
78    fn scalar_udf_with_name(name: &str) -> ScalarUDF {
79        let input_types = vec![
80            // timestamp range vector
81            RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
82            // value range vector
83            RangeArray::convert_data_type(DataType::Float64),
84            // timestamp vector
85            DataType::Timestamp(TimeUnit::Millisecond, None),
86            // range length
87            DataType::Int64,
88        ];
89
90        create_udf(
91            name,
92            input_types,
93            DataType::Float64,
94            Volatility::Volatile,
95            Arc::new(move |input: &_| Self::create_function(input)?.calc(input)) as _,
96        )
97    }
98
99    fn create_function(inputs: &[ColumnarValue]) -> DfResult<Self> {
100        if inputs.len() != 4 {
101            return Err(DataFusionError::Plan(
102                "ExtrapolatedRate function should have 4 inputs".to_string(),
103            ));
104        }
105
106        let range_length_array = extract_array(&inputs[3])?;
107        let range_length_array = range_length_array
108            .as_any()
109            .downcast_ref::<Int64Array>()
110            .ok_or_else(|| {
111                DataFusionError::Execution(format!(
112                    "{}: expect Int64 as range length type, found {}",
113                    Self::func_name(),
114                    range_length_array.data_type()
115                ))
116            })?;
117        if range_length_array.is_empty() || range_length_array.is_null(0) {
118            return Err(DataFusionError::Execution(format!(
119                "{}: range length must contain a non-null Int64 value",
120                Self::func_name()
121            )));
122        }
123        let range_length = range_length_array.value(0);
124
125        Ok(Self::new(range_length))
126    }
127
128    /// Input parameters:
129    /// * 0: timestamp range vector
130    /// * 1: value range vector
131    /// * 2: timestamp vector
132    /// * 3: range length. Range duration in milliseconds
133    fn calc(&self, input: &[ColumnarValue]) -> DfResult<ColumnarValue> {
134        if input.len() != 4 {
135            return Err(DataFusionError::Plan(
136                "ExtrapolatedRate function should have 4 inputs".to_string(),
137            ));
138        }
139
140        let ts_dict = extract_range_dict(
141            &input[0],
142            Self::func_name(),
143            "timestamp range vector",
144            &DataType::Timestamp(TimeUnit::Millisecond, None),
145        )?;
146        let value_dict = extract_range_dict(
147            &input[1],
148            Self::func_name(),
149            "value range vector",
150            &DataType::Float64,
151        )?;
152        let eval_ts_array = extract_eval_timestamps(&input[2], Self::func_name())?;
153
154        let keys = ts_dict.keys().values();
155        let num_windows = keys.len();
156        if value_dict.keys().len() != num_windows {
157            return Err(DataFusionError::Execution(format!(
158                "{}: timestamp and value ranges should have the same number of windows, found {} and {}",
159                Self::func_name(),
160                num_windows,
161                value_dict.keys().len()
162            )));
163        }
164        if value_dict.keys().values() != keys {
165            return Err(DataFusionError::Execution(format!(
166                "{}: timestamp and value ranges should have the same window layout",
167                Self::func_name()
168            )));
169        }
170        if eval_ts_array.len() != num_windows {
171            return Err(DataFusionError::Execution(format!(
172                "{}: evaluation timestamp vector should have the same number of rows as range inputs, found {} and {}",
173                Self::func_name(),
174                eval_ts_array.len(),
175                num_windows
176            )));
177        }
178
179        let all_timestamps = ts_dict
180            .values()
181            .as_any()
182            .downcast_ref::<TimestampMillisecondArray>()
183            .expect("validated by extract_range_dict")
184            .values();
185        let all_values = value_dict
186            .values()
187            .as_any()
188            .downcast_ref::<Float64Array>()
189            .expect("validated by extract_range_dict")
190            .values();
191        let eval_ts = eval_ts_array.values();
192
193        let mut result_builder = Float64Builder::with_capacity(num_windows);
194        let range_length = self.range_length;
195        let range_length_secs = range_length as f64 / 1000.0;
196
197        let mut counter_correction = 0.0;
198        let mut prev_offset = usize::MAX;
199        let mut prev_length = 0usize;
200
201        for index in 0..num_windows {
202            let (raw_offset, raw_length) = unpack(keys[index]);
203            let offset = raw_offset as usize;
204            let length = raw_length as usize;
205
206            if length < 2 {
207                result_builder.append_null();
208                prev_offset = usize::MAX;
209                continue;
210            }
211
212            let end = offset + length;
213            let first_value = all_values[offset];
214            let last_value = all_values[end - 1];
215
216            let result_value = if IS_COUNTER {
217                // Adjacent normalized windows usually slide forward by one sample. Reuse the
218                // previous window's accumulated reset correction and adjust only the dropped and
219                // newly added edges, falling back to a full scan when the layout changes.
220                if prev_offset != usize::MAX && offset == prev_offset + 1 && length == prev_length {
221                    if all_values[prev_offset + 1] < all_values[prev_offset] {
222                        counter_correction -= all_values[prev_offset];
223                    }
224                    if all_values[end - 1] < all_values[end - 2] {
225                        counter_correction += all_values[end - 2];
226                    }
227                } else {
228                    counter_correction = 0.0;
229                    for pair in all_values[offset..end].windows(2) {
230                        if pair[1] < pair[0] {
231                            counter_correction += pair[0];
232                        }
233                    }
234                }
235                last_value - first_value + counter_correction
236            } else {
237                last_value - first_value
238            };
239
240            prev_offset = offset;
241            prev_length = length;
242
243            let first_ts = all_timestamps[offset];
244            let last_ts = all_timestamps[end - 1];
245            let range_end = eval_ts[index];
246            let range_start = range_end - range_length;
247            let sampled_interval_ms = (last_ts - first_ts) as f64;
248            let average_interval_ms = sampled_interval_ms / (length - 1) as f64;
249            let mut duration_to_start_ms = (first_ts - range_start) as f64;
250            let duration_to_end_ms = (range_end - last_ts) as f64;
251
252            // Counters cannot be negative, so Prometheus allows the extrapolation window to snap
253            // back to the inferred zero point instead of extending into negative values.
254            if IS_COUNTER && result_value > 0.0 && first_value >= 0.0 {
255                let duration_to_zero = sampled_interval_ms * (first_value / result_value);
256                if duration_to_zero < duration_to_start_ms {
257                    duration_to_start_ms = duration_to_zero;
258                }
259            }
260
261            let extrapolation_threshold = average_interval_ms * 1.1;
262            let mut extrapolated_interval_ms = sampled_interval_ms;
263
264            // Mirror Prometheus extrapolation: extend to the real range boundary when a sample is
265            // close enough, otherwise add half an average sampling interval on that side.
266            if duration_to_start_ms < extrapolation_threshold {
267                extrapolated_interval_ms += duration_to_start_ms;
268            } else {
269                extrapolated_interval_ms += average_interval_ms / 2.0;
270            }
271            if duration_to_end_ms < extrapolation_threshold {
272                extrapolated_interval_ms += duration_to_end_ms;
273            } else {
274                extrapolated_interval_ms += average_interval_ms / 2.0;
275            }
276
277            let mut factor = extrapolated_interval_ms / sampled_interval_ms;
278
279            if IS_RATE {
280                factor /= range_length_secs;
281            }
282
283            result_builder.append_value(result_value * factor);
284        }
285
286        let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
287        Ok(result)
288    }
289}
290
291fn extract_range_dict(
292    columnar_value: &ColumnarValue,
293    func_name: &str,
294    arg_name: &str,
295    expected_value_type: &DataType,
296) -> DfResult<DictionaryArray<Int64Type>> {
297    let array = extract_array(columnar_value)?;
298    let dict = array
299        .as_any()
300        .downcast_ref::<DictionaryArray<Int64Type>>()
301        .ok_or_else(|| {
302            DataFusionError::Execution(format!(
303                "{func_name}: expect {arg_name} as DictionaryArray<Int64>, found {}",
304                array.data_type()
305            ))
306        })?
307        .clone();
308
309    if &dict.value_type() != expected_value_type {
310        return Err(DataFusionError::Execution(format!(
311            "{func_name}: expect {arg_name} values of type {expected_value_type}, found {}",
312            dict.value_type()
313        )));
314    }
315
316    RangeArray::try_new(dict.clone()).map_err(DataFusionError::from)?;
317    Ok(dict)
318}
319
320fn extract_eval_timestamps(
321    columnar_value: &ColumnarValue,
322    func_name: &str,
323) -> DfResult<TimestampMillisecondArray> {
324    let array = extract_array(columnar_value)?;
325    let timestamps = array
326        .as_any()
327        .downcast_ref::<TimestampMillisecondArray>()
328        .ok_or_else(|| {
329            DataFusionError::Execution(format!(
330                "{func_name}: expect evaluation timestamp vector as Timestamp(Millisecond), found {}",
331                array.data_type()
332            ))
333        })?;
334    Ok(timestamps.clone())
335}
336
337// delta
338impl ExtrapolatedRate<false, false> {
339    pub const fn name() -> &'static str {
340        "prom_delta"
341    }
342
343    pub fn scalar_udf() -> ScalarUDF {
344        Self::scalar_udf_with_name(Self::name())
345    }
346}
347
348// rate
349impl ExtrapolatedRate<true, true> {
350    pub const fn name() -> &'static str {
351        "prom_rate"
352    }
353
354    pub fn scalar_udf() -> ScalarUDF {
355        Self::scalar_udf_with_name(Self::name())
356    }
357}
358
359// increase
360impl ExtrapolatedRate<true, false> {
361    pub const fn name() -> &'static str {
362        "prom_increase"
363    }
364
365    pub fn scalar_udf() -> ScalarUDF {
366        Self::scalar_udf_with_name(Self::name())
367    }
368}
369
370impl Display for ExtrapolatedRate<false, false> {
371    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
372        f.write_str("PromQL Delta Function")
373    }
374}
375
376impl Display for ExtrapolatedRate<true, true> {
377    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
378        f.write_str("PromQL Rate Function")
379    }
380}
381
382impl Display for ExtrapolatedRate<true, false> {
383    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384        f.write_str("PromQL Increase Function")
385    }
386}
387
#[cfg(test)]
mod test {
    // Unit tests for the extrapolated rate UDFs. Timestamps are in milliseconds and every
    // evaluator below is built with a range length of 5.

    use datafusion::arrow::array::ArrayRef;
    use datafusion_common::ScalarValue;

    use super::*;

    /// Runs `ExtrapolatedRate::<IS_COUNTER, IS_RATE>` with the range length fixed to 5 and
    /// asserts that the raw output value buffer equals `expected`.
    ///
    /// `values()` returns the raw buffer, so null outputs (e.g. windows with fewer than two
    /// samples) are compared through their backing value, which these fixtures expect to be
    /// `0.0`.
    fn extrapolated_rate_runner<const IS_COUNTER: bool, const IS_RATE: bool>(
        ts_range: RangeArray,
        value_range: RangeArray,
        timestamps: ArrayRef,
        expected: Vec<f64>,
    ) {
        let input = vec![
            ColumnarValue::Array(Arc::new(ts_range.into_dict())),
            ColumnarValue::Array(Arc::new(value_range.into_dict())),
            ColumnarValue::Array(timestamps),
            // `calc` only counts this argument; the range length actually used comes from `new(5)`.
            ColumnarValue::Array(Arc::new(Int64Array::from(vec![5]))),
        ];
        let output = extract_array(
            &ExtrapolatedRate::<IS_COUNTER, IS_RATE>::new(5)
                .calc(&input)
                .unwrap(),
        )
        .unwrap()
        .as_any()
        .downcast_ref::<Float64Array>()
        .unwrap()
        .values()
        .to_vec();
        assert_eq!(output, expected);
    }

    /// Two two-sample windows over timestamps/values 1..=3 with evaluation timestamps 2 and 3.
    /// Shared by the input-validation tests.
    fn sample_range_inputs() -> (ColumnarValue, ColumnarValue, ColumnarValue) {
        let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3].into_iter().map(Some),
        ));
        let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0]));
        let ranges = [(0, 2), (1, 2)];

        let ts_range = RangeArray::from_ranges(ts_values, ranges).unwrap();
        let value_range = RangeArray::from_ranges(value_values, ranges).unwrap();
        let eval_ts = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3].into_iter().map(Some),
        )) as _;

        (
            ColumnarValue::Array(Arc::new(ts_range.into_dict())),
            ColumnarValue::Array(Arc::new(value_range.into_dict())),
            ColumnarValue::Array(eval_ts),
        )
    }

    #[test]
    fn rate_rejects_wrong_input_arity() {
        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[])
            .unwrap_err();

        assert!(err.to_string().contains("should have 4 inputs"));
    }

    #[test]
    fn rate_rejects_non_int64_range_length() {
        let (ts_range, value_range, eval_ts) = sample_range_inputs();

        // The range length is parsed by `create_function`, not by `calc`.
        let err = ExtrapolatedRate::<true, true>::create_function(&[
            ts_range,
            value_range,
            eval_ts,
            ColumnarValue::Scalar(ScalarValue::Float64(Some(5.0))),
        ])
        .unwrap_err();

        assert!(err.to_string().contains("range length type"));
    }

    #[test]
    fn rate_rejects_empty_range_length() {
        let (ts_range, value_range, eval_ts) = sample_range_inputs();

        let err = ExtrapolatedRate::<true, true>::create_function(&[
            ts_range,
            value_range,
            eval_ts,
            ColumnarValue::Array(Arc::new(Int64Array::from(Vec::<i64>::new()))),
        ])
        .unwrap_err();

        assert!(err.to_string().contains("range length must contain"));
    }

    #[test]
    fn rate_rejects_null_range_length() {
        let (ts_range, value_range, eval_ts) = sample_range_inputs();

        let err = ExtrapolatedRate::<true, true>::create_function(&[
            ts_range,
            value_range,
            eval_ts,
            ColumnarValue::Array(Arc::new(Int64Array::from(vec![None]))),
        ])
        .unwrap_err();

        assert!(err.to_string().contains("range length must contain"));
    }

    // Overlapping, single-sample and empty windows (one empty window starts at the end of the
    // data). Windows with fewer than two samples yield null, i.e. 0.0 in the raw buffer.
    #[test]
    fn increase_abnormal_input() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter([
            Some(2),
            Some(5),
            Some(2),
            Some(6),
            Some(9),
            None,
        ])) as _;
        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            vec![2.0, 5.0, 0.0, 2.5, 0.0, 0.0],
        );
    }

    // Two-sample windows sliding by one sample over a steadily increasing counter.
    #[test]
    fn increase_normal_input() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            // `2.0` is because that `duration_to_zero` less than `extrapolation_threshold`
            vec![2.0, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5],
        );
    }

    // Windows with zero or one sample yield null (0.0 in the raw buffer); the null evaluation
    // timestamps here only appear on such windows.
    #[test]
    fn increase_short_input() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        let ranges = [
            (0, 1),
            (1, 0),
            (2, 1),
            (3, 0),
            (4, 3),
            (5, 1),
            (6, 0),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter([
            Some(1),
            None,
            Some(3),
            None,
            Some(7),
            Some(6),
            None,
            Some(9),
        ])) as _;
        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            vec![0.0, 0.0, 0.0, 0.0, 2.5, 0.0, 0.0, 1.5],
        );
    }

    #[test]
    fn increase_counter_reset() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        // this series should be treated like [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            // that two `2.0` is because `duration_to_start` are shrunk to
            // `duration_to_zero`, and causes `duration_to_zero` less than
            // `extrapolation_threshold`.
            vec![2.0, 1.5, 1.5, 1.5, 2.0, 1.5, 1.5, 1.5],
        );
    }

    // Four-sample windows sliding by one sample, so resets enter and later leave the window
    // across consecutive rows. This exercises the path in `calc` that reuses the previous
    // window's reset correction instead of rescanning.
    #[test]
    fn increase_counter_reset_wide_windows() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0, 1.0, 2.0, 1.0, 2.0]));
        let ranges = [(0, 4), (1, 4), (2, 4), (3, 4)];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [4, 5, 6, 7].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            vec![4.0, 3.5, 3.5, 4.0],
        );
    }

    // A scalar where the timestamp range dictionary is expected must be rejected.
    #[test]
    fn rate_rejects_non_array_timestamp_ranges() {
        let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0]));
        let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap();
        let eval_ts = Arc::new(TimestampMillisecondArray::from_iter([Some(2)]));

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
                ColumnarValue::Array(Arc::new(value_range.into_dict())),
                ColumnarValue::Array(eval_ts),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(err.to_string().contains("timestamp range vector"));
    }

    // A range dictionary over Int64 instead of millisecond timestamps must be rejected.
    #[test]
    fn rate_rejects_non_timestamp_timestamp_range_values() {
        let ts_values = Arc::new(Int64Array::from_iter([1, 2]));
        let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0]));
        let ts_range = RangeArray::from_ranges(ts_values, [(0, 2)]).unwrap();
        let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap();
        let eval_ts = Arc::new(TimestampMillisecondArray::from_iter([Some(2)]));

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ColumnarValue::Array(Arc::new(ts_range.into_dict())),
                ColumnarValue::Array(Arc::new(value_range.into_dict())),
                ColumnarValue::Array(eval_ts),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(err.to_string().contains("values of type Timestamp"));
    }

    // A value range dictionary over Int64 instead of Float64 must be rejected.
    #[test]
    fn rate_rejects_non_float_value_range_values() {
        let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2].into_iter().map(Some),
        ));
        let value_values = Arc::new(Int64Array::from_iter([1, 2]));
        let ts_range = RangeArray::from_ranges(ts_values, [(0, 2)]).unwrap();
        let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap();
        let eval_ts = Arc::new(TimestampMillisecondArray::from_iter([Some(2)]));

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ColumnarValue::Array(Arc::new(ts_range.into_dict())),
                ColumnarValue::Array(Arc::new(value_range.into_dict())),
                ColumnarValue::Array(eval_ts),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("value range vector values of type Float64")
        );
    }

    // Timestamp and value ranges with a different number of windows must be rejected.
    #[test]
    fn rate_rejects_mismatched_range_counts() {
        let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3].into_iter().map(Some),
        ));
        let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0]));
        let ts_range = RangeArray::from_ranges(ts_values, [(0, 2), (1, 2)]).unwrap();
        let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap();
        let eval_ts = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3].into_iter().map(Some),
        ));

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ColumnarValue::Array(Arc::new(ts_range.into_dict())),
                ColumnarValue::Array(Arc::new(value_range.into_dict())),
                ColumnarValue::Array(eval_ts),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(err.to_string().contains("same number of windows"));
    }

    // Same window count but different offsets: the layouts must match exactly.
    #[test]
    fn rate_rejects_mismatched_range_layouts() {
        let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4].into_iter().map(Some),
        ));
        let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0, 4.0]));
        let ts_range = RangeArray::from_ranges(ts_values, [(0, 2), (1, 2)]).unwrap();
        let value_range = RangeArray::from_ranges(value_values, [(0, 2), (2, 2)]).unwrap();
        let eval_ts = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 4].into_iter().map(Some),
        ));

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ColumnarValue::Array(Arc::new(ts_range.into_dict())),
                ColumnarValue::Array(Arc::new(value_range.into_dict())),
                ColumnarValue::Array(eval_ts),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(err.to_string().contains("same window layout"));
    }

    // Evaluation timestamps must be a Timestamp(Millisecond) array.
    #[test]
    fn rate_rejects_non_timestamp_eval_vector() {
        let (ts_range, value_range, _) = sample_range_inputs();

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ts_range,
                value_range,
                ColumnarValue::Array(Arc::new(Float64Array::from_iter([2.0, 3.0]))),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(err.to_string().contains("evaluation timestamp vector"));
    }

    // One evaluation timestamp for two windows must be rejected.
    #[test]
    fn rate_rejects_mismatched_eval_timestamp_rows() {
        let (ts_range, value_range, _) = sample_range_inputs();

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ts_range,
                value_range,
                ColumnarValue::Array(Arc::new(TimestampMillisecondArray::from_iter([Some(2)]))),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(err.to_string().contains("same number of rows"));
    }

    // Same series as `increase_counter_reset`: with a 5 ms range, rate = increase / 0.005 s.
    #[test]
    fn rate_counter_reset() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        // this series should be treated like [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<true, true>(
            ts_range,
            value_range,
            timestamps,
            vec![400.0, 300.0, 300.0, 300.0, 400.0, 300.0, 300.0, 300.0],
        );
    }

    // Same series as `increase_normal_input`, scaled by 1 / 0.005 s.
    #[test]
    fn rate_normal_input() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<true, true>(
            ts_range,
            value_range,
            timestamps,
            vec![400.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0],
        );
    }

    #[test]
    fn delta_counter_reset() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        // this series should be treated like [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<false, false>(
            ts_range,
            value_range,
            timestamps,
            // delta doesn't handle counter reset, thus there is a negative value
            vec![1.5, 1.5, 1.5, -4.5, 1.5, 1.5, 1.5, 1.5],
        );
    }

    // Delta never applies the counter zero-point clamp, so even the first window is 1.5.
    #[test]
    fn delta_normal_input() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<false, false>(
            ts_range,
            value_range,
            timestamps,
            vec![1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5],
        );
    }
}