1use std::fmt::Display;
16use std::sync::Arc;
17
18use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
19use datafusion::arrow::datatypes::TimeUnit;
20use datafusion::common::DataFusionError;
21use datafusion::logical_expr::{ScalarUDF, Volatility};
22use datafusion::physical_plan::ColumnarValue;
23use datafusion_expr::create_udf;
24use datatypes::arrow::array::Array;
25use datatypes::arrow::datatypes::DataType;
26
27use crate::error;
28use crate::functions::extract_array;
29use crate::range_array::RangeArray;
30
/// Field-less marker type for the PromQL instant delta / instant rate function.
///
/// The `IS_RATE` const parameter selects the variant at compile time:
/// `true` is registered as `prom_irate`, `false` as `prom_idelta`.
#[derive(Debug)]
pub struct IDelta<const IS_RATE: bool> {}
35
36impl<const IS_RATE: bool> IDelta<IS_RATE> {
37 pub const fn name() -> &'static str {
38 if IS_RATE {
39 "prom_irate"
40 } else {
41 "prom_idelta"
42 }
43 }
44
45 pub fn scalar_udf() -> ScalarUDF {
46 create_udf(
47 Self::name(),
48 Self::input_type(),
49 Self::return_type(),
50 Volatility::Volatile,
51 Arc::new(Self::calc) as _,
52 )
53 }
54
55 fn input_type() -> Vec<DataType> {
57 vec![
58 RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
59 RangeArray::convert_data_type(DataType::Float64),
60 ]
61 }
62
63 fn return_type() -> DataType {
64 DataType::Float64
65 }
66
67 fn calc(input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
68 assert_eq!(input.len(), 2);
70 let ts_array = extract_array(&input[0])?;
71 let value_array = extract_array(&input[1])?;
72
73 let ts_range: RangeArray = RangeArray::try_new(ts_array.to_data().into())?;
74 let value_range: RangeArray = RangeArray::try_new(value_array.to_data().into())?;
75 error::ensure(
76 ts_range.len() == value_range.len(),
77 DataFusionError::Execution(format!(
78 "{}: input arrays should have the same length, found {} and {}",
79 Self::name(),
80 ts_range.len(),
81 value_range.len()
82 )),
83 )?;
84 error::ensure(
85 ts_range.value_type() == DataType::Timestamp(TimeUnit::Millisecond, None),
86 DataFusionError::Execution(format!(
87 "{}: expect TimestampMillisecond as time index array's type, found {}",
88 Self::name(),
89 ts_range.value_type()
90 )),
91 )?;
92 error::ensure(
93 value_range.value_type() == DataType::Float64,
94 DataFusionError::Execution(format!(
95 "{}: expect Float64 as value array's type, found {}",
96 Self::name(),
97 value_range.value_type()
98 )),
99 )?;
100
101 let mut result_array = Vec::with_capacity(ts_range.len());
103
104 for index in 0..ts_range.len() {
105 let timestamps = ts_range.get(index).unwrap();
106 let timestamps = timestamps
107 .as_any()
108 .downcast_ref::<TimestampMillisecondArray>()
109 .unwrap()
110 .values();
111
112 let values = value_range.get(index).unwrap();
113 let values = values
114 .as_any()
115 .downcast_ref::<Float64Array>()
116 .unwrap()
117 .values();
118 error::ensure(
119 timestamps.len() == values.len(),
120 DataFusionError::Execution(format!(
121 "{}: input arrays should have the same length, found {} and {}",
122 Self::name(),
123 timestamps.len(),
124 values.len()
125 )),
126 )?;
127
128 let len = timestamps.len();
129 if len < 2 {
130 result_array.push(None);
131 continue;
132 }
133
134 if !IS_RATE {
136 result_array.push(Some(values[len - 1] - values[len - 2]));
137 continue;
138 }
139
140 let sampled_interval = (timestamps[len - 1] - timestamps[len - 2]) as f64 / 1000.0;
142 let last_value = values[len - 1];
143 let prev_value = values[len - 2];
144 let result_value = if last_value < prev_value {
145 last_value
147 } else {
148 last_value - prev_value
149 };
150
151 result_array.push(Some(result_value / sampled_interval as f64));
152 }
153
154 let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
155 Ok(result)
156 }
157}
158
159impl<const IS_RATE: bool> Display for IDelta<IS_RATE> {
160 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
161 write!(f, "PromQL Idelta Function (is_rate: {IS_RATE})",)
162 }
163}
164
#[cfg(test)]
mod test {

    use super::*;
    use crate::functions::test_util::simple_range_udf_runner;

    /// Runs both the delta and the rate variant over the same six windows and
    /// compares each against its expected per-window output.
    #[test]
    fn basic_idelta_and_irate() {
        // Nine samples spaced 2000 ms apart: 1000, 3000, ..., 17000.
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            (0..9i64).map(|step| Some(1000 + 2000 * step)),
        ));
        let samples = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 5.0, 0.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        // The same windows are applied to timestamps and values. Windows that
        // end up with fewer than two samples are expected to produce `None`.
        let windows = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];

        // Fresh range arrays are needed for every run because the runner
        // takes them by value.
        let build_inputs = || {
            let ts_ranges = RangeArray::from_ranges(timestamps.clone(), windows).unwrap();
            let value_ranges = RangeArray::from_ranges(samples.clone(), windows).unwrap();
            (ts_ranges, value_ranges)
        };

        let cases = [
            (
                IDelta::<false>::scalar_udf(),
                vec![Some(1.0), Some(-5.0), None, Some(6.0), None, None],
            ),
            (
                IDelta::<true>::scalar_udf(),
                vec![Some(0.5), Some(0.0), None, Some(3.0), None, None],
            ),
        ];

        for (udf, expected) in cases {
            let (ts_ranges, value_ranges) = build_inputs();
            simple_range_udf_runner(udf, ts_ranges, value_ranges, vec![], expected);
        }
    }
}