Skip to main content

promql/functions/
idelta.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16use std::sync::Arc;
17
18use datafusion::arrow::array::{Float64Array, Float64Builder, TimestampMillisecondArray};
19use datafusion::arrow::datatypes::TimeUnit;
20use datafusion::common::DataFusionError;
21use datafusion::logical_expr::{ScalarUDF, Volatility};
22use datafusion::physical_plan::ColumnarValue;
23use datafusion_expr::create_udf;
24use datatypes::arrow::array::Array;
25use datatypes::arrow::datatypes::DataType;
26
27use crate::error;
28use crate::functions::extract_array;
29use crate::range_array::RangeArray;
30
31/// The `funcIdelta` in Promql,
32/// from <https://github.com/prometheus/prometheus/blob/6bdecf377cea8e856509914f35234e948c4fcb80/promql/functions.go#L235>
33#[derive(Debug)]
34pub struct IDelta<const IS_RATE: bool> {}
35
36impl<const IS_RATE: bool> IDelta<IS_RATE> {
37    pub const fn name() -> &'static str {
38        if IS_RATE { "prom_irate" } else { "prom_idelta" }
39    }
40
41    pub fn scalar_udf() -> ScalarUDF {
42        create_udf(
43            Self::name(),
44            Self::input_type(),
45            Self::return_type(),
46            Volatility::Volatile,
47            Arc::new(Self::calc) as _,
48        )
49    }
50
51    // time index column and value column
52    fn input_type() -> Vec<DataType> {
53        vec![
54            RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
55            RangeArray::convert_data_type(DataType::Float64),
56        ]
57    }
58
59    fn return_type() -> DataType {
60        DataType::Float64
61    }
62
63    fn calc(input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
64        // construct matrix from input
65        assert_eq!(input.len(), 2);
66        let ts_array = extract_array(&input[0])?;
67        let value_array = extract_array(&input[1])?;
68
69        let ts_range: RangeArray = RangeArray::try_new(ts_array.to_data().into())?;
70        let value_range: RangeArray = RangeArray::try_new(value_array.to_data().into())?;
71        error::ensure(
72            ts_range.len() == value_range.len(),
73            DataFusionError::Execution(format!(
74                "{}: input arrays should have the same length, found {} and {}",
75                Self::name(),
76                ts_range.len(),
77                value_range.len()
78            )),
79        )?;
80        error::ensure(
81            ts_range.value_type() == DataType::Timestamp(TimeUnit::Millisecond, None),
82            DataFusionError::Execution(format!(
83                "{}: expect TimestampMillisecond as time index array's type, found {}",
84                Self::name(),
85                ts_range.value_type()
86            )),
87        )?;
88        error::ensure(
89            value_range.value_type() == DataType::Float64,
90            DataFusionError::Execution(format!(
91                "{}: expect Float64 as value array's type, found {}",
92                Self::name(),
93                value_range.value_type()
94            )),
95        )?;
96
97        let ts_values = ts_range.values();
98        let ts_values = ts_values
99            .as_any()
100            .downcast_ref::<TimestampMillisecondArray>()
101            .unwrap()
102            .values();
103
104        let value_values = value_range.values();
105        let value_values = value_values
106            .as_any()
107            .downcast_ref::<Float64Array>()
108            .unwrap()
109            .values();
110
111        let mut result_builder = Float64Builder::with_capacity(ts_range.len());
112
113        for index in 0..ts_range.len() {
114            let (ts_offset, len) = ts_range.get_offset_length(index).unwrap();
115            let (value_offset, value_len) = value_range.get_offset_length(index).unwrap();
116            error::ensure(
117                len == value_len,
118                DataFusionError::Execution(format!(
119                    "{}: input arrays should have the same length, found {} and {}",
120                    Self::name(),
121                    len,
122                    value_len
123                )),
124            )?;
125            if len < 2 {
126                result_builder.append_null();
127                continue;
128            }
129
130            let last_offset = ts_offset + len - 1;
131            let prev_offset = last_offset - 1;
132            let sampled_interval =
133                (ts_values[last_offset] - ts_values[prev_offset]) as f64 / 1000.0;
134
135            let last_value_offset = value_offset + len - 1;
136            let prev_value_offset = last_value_offset - 1;
137            let last_value = value_values[last_value_offset];
138            let prev_value = value_values[prev_value_offset];
139
140            if !IS_RATE {
141                result_builder.append_value(last_value - prev_value);
142                continue;
143            }
144
145            let result_value = if last_value < prev_value {
146                // counter reset
147                last_value
148            } else {
149                last_value - prev_value
150            };
151
152            result_builder.append_value(result_value / sampled_interval);
153        }
154
155        let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
156        Ok(result)
157    }
158}
159
160impl<const IS_RATE: bool> Display for IDelta<IS_RATE> {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        write!(f, "PromQL Idelta Function (is_rate: {IS_RATE})",)
163    }
164}
165
166#[cfg(test)]
167mod test {
168
169    use super::*;
170    use crate::functions::test_util::simple_range_udf_runner;
171
172    #[test]
173    fn basic_idelta_and_irate() {
174        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
175            [1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000]
176                .into_iter()
177                .map(Some),
178        ));
179        let ts_ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];
180
181        let values_array = Arc::new(Float64Array::from_iter([
182            1.0, 2.0, 3.0, 5.0, 0.0, 6.0, 7.0, 8.0, 9.0,
183        ]));
184        let values_ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];
185
186        // test idelta
187        let ts_range_array = RangeArray::from_ranges(ts_array.clone(), ts_ranges).unwrap();
188        let value_range_array =
189            RangeArray::from_ranges(values_array.clone(), values_ranges).unwrap();
190        simple_range_udf_runner(
191            IDelta::<false>::scalar_udf(),
192            ts_range_array,
193            value_range_array,
194            vec![],
195            vec![Some(1.0), Some(-5.0), None, Some(6.0), None, None],
196        );
197
198        // test irate
199        let ts_range_array = RangeArray::from_ranges(ts_array, ts_ranges).unwrap();
200        let value_range_array = RangeArray::from_ranges(values_array, values_ranges).unwrap();
201        simple_range_udf_runner(
202            IDelta::<true>::scalar_udf(),
203            ts_range_array,
204            value_range_array,
205            vec![],
206            // the second point represent counter reset
207            vec![Some(0.5), Some(0.0), None, Some(3.0), None, None],
208        );
209    }
210}