promql/functions/
idelta.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16use std::sync::Arc;
17
18use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
19use datafusion::arrow::datatypes::TimeUnit;
20use datafusion::common::DataFusionError;
21use datafusion::logical_expr::{ScalarUDF, Volatility};
22use datafusion::physical_plan::ColumnarValue;
23use datafusion_expr::create_udf;
24use datatypes::arrow::array::Array;
25use datatypes::arrow::datatypes::DataType;
26
27use crate::error;
28use crate::functions::extract_array;
29use crate::range_array::RangeArray;
30
/// PromQL's `idelta` (`IS_RATE = false`) and `irate` (`IS_RATE = true`) range functions.
///
/// Both only look at the last two samples of each range: `idelta` returns their
/// difference, `irate` returns their per-second rate (treating a decrease as a
/// counter reset). Ported from Prometheus' `funcIdelta` / `funcIrate`:
/// <https://github.com/prometheus/prometheus/blob/6bdecf377cea8e856509914f35234e948c4fcb80/promql/functions.go#L235>
#[derive(Debug)]
pub struct IDelta<const IS_RATE: bool> {}
35
36impl<const IS_RATE: bool> IDelta<IS_RATE> {
37    pub const fn name() -> &'static str {
38        if IS_RATE {
39            "prom_irate"
40        } else {
41            "prom_idelta"
42        }
43    }
44
45    pub fn scalar_udf() -> ScalarUDF {
46        create_udf(
47            Self::name(),
48            Self::input_type(),
49            Self::return_type(),
50            Volatility::Volatile,
51            Arc::new(Self::calc) as _,
52        )
53    }
54
55    // time index column and value column
56    fn input_type() -> Vec<DataType> {
57        vec![
58            RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
59            RangeArray::convert_data_type(DataType::Float64),
60        ]
61    }
62
63    fn return_type() -> DataType {
64        DataType::Float64
65    }
66
67    fn calc(input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
68        // construct matrix from input
69        assert_eq!(input.len(), 2);
70        let ts_array = extract_array(&input[0])?;
71        let value_array = extract_array(&input[1])?;
72
73        let ts_range: RangeArray = RangeArray::try_new(ts_array.to_data().into())?;
74        let value_range: RangeArray = RangeArray::try_new(value_array.to_data().into())?;
75        error::ensure(
76            ts_range.len() == value_range.len(),
77            DataFusionError::Execution(format!(
78                "{}: input arrays should have the same length, found {} and {}",
79                Self::name(),
80                ts_range.len(),
81                value_range.len()
82            )),
83        )?;
84        error::ensure(
85            ts_range.value_type() == DataType::Timestamp(TimeUnit::Millisecond, None),
86            DataFusionError::Execution(format!(
87                "{}: expect TimestampMillisecond as time index array's type, found {}",
88                Self::name(),
89                ts_range.value_type()
90            )),
91        )?;
92        error::ensure(
93            value_range.value_type() == DataType::Float64,
94            DataFusionError::Execution(format!(
95                "{}: expect Float64 as value array's type, found {}",
96                Self::name(),
97                value_range.value_type()
98            )),
99        )?;
100
101        // calculation
102        let mut result_array = Vec::with_capacity(ts_range.len());
103
104        for index in 0..ts_range.len() {
105            let timestamps = ts_range.get(index).unwrap();
106            let timestamps = timestamps
107                .as_any()
108                .downcast_ref::<TimestampMillisecondArray>()
109                .unwrap()
110                .values();
111
112            let values = value_range.get(index).unwrap();
113            let values = values
114                .as_any()
115                .downcast_ref::<Float64Array>()
116                .unwrap()
117                .values();
118            error::ensure(
119                timestamps.len() == values.len(),
120                DataFusionError::Execution(format!(
121                    "{}: input arrays should have the same length, found {} and {}",
122                    Self::name(),
123                    timestamps.len(),
124                    values.len()
125                )),
126            )?;
127
128            let len = timestamps.len();
129            if len < 2 {
130                result_array.push(None);
131                continue;
132            }
133
134            // if is delta
135            if !IS_RATE {
136                result_array.push(Some(values[len - 1] - values[len - 2]));
137                continue;
138            }
139
140            // else is rate
141            let sampled_interval = (timestamps[len - 1] - timestamps[len - 2]) as f64 / 1000.0;
142            let last_value = values[len - 1];
143            let prev_value = values[len - 2];
144            let result_value = if last_value < prev_value {
145                // counter reset
146                last_value
147            } else {
148                last_value - prev_value
149            };
150
151            result_array.push(Some(result_value / sampled_interval as f64));
152        }
153
154        let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
155        Ok(result)
156    }
157}
158
159impl<const IS_RATE: bool> Display for IDelta<IS_RATE> {
160    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161        write!(f, "PromQL Idelta Function (is_rate: {IS_RATE})",)
162    }
163}
164
#[cfg(test)]
mod test {

    use super::*;
    use crate::functions::test_util::simple_range_udf_runner;

    #[test]
    fn basic_idelta_and_irate() {
        // Nine samples spaced 2s apart. The drop from 5.0 to 0.0 (sample index 4)
        // is what irate treats as a counter reset.
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000]
                .into_iter()
                .map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 5.0, 0.0, 6.0, 7.0, 8.0, 9.0,
        ]));

        // (offset, length) windows shared by the time index and the value column;
        // the length-1 and length-0 windows must produce nulls.
        let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];

        let cases = [
            // idelta: plain difference of the last two samples
            (
                IDelta::<false>::scalar_udf(),
                vec![Some(1.0), Some(-5.0), None, Some(6.0), None, None],
            ),
            // irate: per-second rate; the second window ends on a counter reset
            (
                IDelta::<true>::scalar_udf(),
                vec![Some(0.5), Some(0.0), None, Some(3.0), None, None],
            ),
        ];

        for (udf, expected) in cases {
            let ts_range_array = RangeArray::from_ranges(ts_array.clone(), ranges).unwrap();
            let value_range_array =
                RangeArray::from_ranges(values_array.clone(), ranges).unwrap();
            simple_range_udf_runner(udf, ts_range_array, value_range_array, vec![], expected);
        }
    }
}