1use std::fmt::Display;
16use std::sync::Arc;
17
18use datafusion::arrow::array::{Float64Array, Float64Builder, TimestampMillisecondArray};
19use datafusion::arrow::datatypes::TimeUnit;
20use datafusion::common::DataFusionError;
21use datafusion::logical_expr::{ScalarUDF, Volatility};
22use datafusion::physical_plan::ColumnarValue;
23use datafusion_expr::create_udf;
24use datatypes::arrow::array::Array;
25use datatypes::arrow::datatypes::DataType;
26
27use crate::error;
28use crate::functions::extract_array;
29use crate::range_array::RangeArray;
30
/// PromQL instant delta / instant rate range function.
///
/// The const parameter picks the variant at compile time:
/// - `IS_RATE = false`: registered as `prom_idelta`; yields the difference
///   between the last two samples of each range.
/// - `IS_RATE = true`: registered as `prom_irate`; yields that difference
///   divided by the time between the two samples, in seconds.
///
/// The type carries no data; all functionality lives in associated functions.
#[derive(Debug)]
pub struct IDelta<const IS_RATE: bool> {}
35
36impl<const IS_RATE: bool> IDelta<IS_RATE> {
37 pub const fn name() -> &'static str {
38 if IS_RATE { "prom_irate" } else { "prom_idelta" }
39 }
40
41 pub fn scalar_udf() -> ScalarUDF {
42 create_udf(
43 Self::name(),
44 Self::input_type(),
45 Self::return_type(),
46 Volatility::Volatile,
47 Arc::new(Self::calc) as _,
48 )
49 }
50
51 fn input_type() -> Vec<DataType> {
53 vec![
54 RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
55 RangeArray::convert_data_type(DataType::Float64),
56 ]
57 }
58
59 fn return_type() -> DataType {
60 DataType::Float64
61 }
62
63 fn calc(input: &[ColumnarValue]) -> Result<ColumnarValue, DataFusionError> {
64 assert_eq!(input.len(), 2);
66 let ts_array = extract_array(&input[0])?;
67 let value_array = extract_array(&input[1])?;
68
69 let ts_range: RangeArray = RangeArray::try_new(ts_array.to_data().into())?;
70 let value_range: RangeArray = RangeArray::try_new(value_array.to_data().into())?;
71 error::ensure(
72 ts_range.len() == value_range.len(),
73 DataFusionError::Execution(format!(
74 "{}: input arrays should have the same length, found {} and {}",
75 Self::name(),
76 ts_range.len(),
77 value_range.len()
78 )),
79 )?;
80 error::ensure(
81 ts_range.value_type() == DataType::Timestamp(TimeUnit::Millisecond, None),
82 DataFusionError::Execution(format!(
83 "{}: expect TimestampMillisecond as time index array's type, found {}",
84 Self::name(),
85 ts_range.value_type()
86 )),
87 )?;
88 error::ensure(
89 value_range.value_type() == DataType::Float64,
90 DataFusionError::Execution(format!(
91 "{}: expect Float64 as value array's type, found {}",
92 Self::name(),
93 value_range.value_type()
94 )),
95 )?;
96
97 let ts_values = ts_range.values();
98 let ts_values = ts_values
99 .as_any()
100 .downcast_ref::<TimestampMillisecondArray>()
101 .unwrap()
102 .values();
103
104 let value_values = value_range.values();
105 let value_values = value_values
106 .as_any()
107 .downcast_ref::<Float64Array>()
108 .unwrap()
109 .values();
110
111 let mut result_builder = Float64Builder::with_capacity(ts_range.len());
112
113 for index in 0..ts_range.len() {
114 let (ts_offset, len) = ts_range.get_offset_length(index).unwrap();
115 let (value_offset, value_len) = value_range.get_offset_length(index).unwrap();
116 error::ensure(
117 len == value_len,
118 DataFusionError::Execution(format!(
119 "{}: input arrays should have the same length, found {} and {}",
120 Self::name(),
121 len,
122 value_len
123 )),
124 )?;
125 if len < 2 {
126 result_builder.append_null();
127 continue;
128 }
129
130 let last_offset = ts_offset + len - 1;
131 let prev_offset = last_offset - 1;
132 let sampled_interval =
133 (ts_values[last_offset] - ts_values[prev_offset]) as f64 / 1000.0;
134
135 let last_value_offset = value_offset + len - 1;
136 let prev_value_offset = last_value_offset - 1;
137 let last_value = value_values[last_value_offset];
138 let prev_value = value_values[prev_value_offset];
139
140 if !IS_RATE {
141 result_builder.append_value(last_value - prev_value);
142 continue;
143 }
144
145 let result_value = if last_value < prev_value {
146 last_value
148 } else {
149 last_value - prev_value
150 };
151
152 result_builder.append_value(result_value / sampled_interval);
153 }
154
155 let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
156 Ok(result)
157 }
158}
159
160impl<const IS_RATE: bool> Display for IDelta<IS_RATE> {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 write!(f, "PromQL Idelta Function (is_rate: {IS_RATE})",)
163 }
164}
165
#[cfg(test)]
mod test {

    use super::*;
    use crate::functions::test_util::simple_range_udf_runner;

    /// Runs both variants over the same windows: a two-sample window, a
    /// window whose last value drops, single-sample windows and an empty one.
    #[test]
    fn basic_idelta_and_irate() {
        // Nine samples, 2000 ms apart.
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [1000i64, 3000, 5000, 7000, 9000, 11000, 13000, 15000, 17000]
                .into_iter()
                .map(Some),
        ));
        let samples = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 5.0, 0.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        // (offset, length) windows, identical for the timestamp and value columns.
        let windows = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];

        // One entry per variant: the UDF under test and its expected output.
        let cases = [
            (
                IDelta::<false>::scalar_udf(),
                vec![Some(1.0), Some(-5.0), None, Some(6.0), None, None],
            ),
            (
                IDelta::<true>::scalar_udf(),
                vec![Some(0.5), Some(0.0), None, Some(3.0), None, None],
            ),
        ];

        for (udf, expected) in cases {
            let ts_range_array = RangeArray::from_ranges(timestamps.clone(), windows).unwrap();
            let value_range_array = RangeArray::from_ranges(samples.clone(), windows).unwrap();
            simple_range_udf_runner(udf, ts_range_array, value_range_array, vec![], expected);
        }
    }
}