Skip to main content

promql/
functions.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod aggr_over_time;
16mod changes;
17mod deriv;
18mod double_exponential_smoothing;
19mod extrapolate_rate;
20mod idelta;
21mod predict_linear;
22mod quantile;
23mod quantile_aggr;
24mod resets;
25mod round;
26#[cfg(test)]
27mod test_util;
28
29pub use aggr_over_time::{
30    AbsentOverTime, AvgOverTime, CountOverTime, LastOverTime, MaxOverTime, MinOverTime,
31    PresentOverTime, StddevOverTime, StdvarOverTime, SumOverTime,
32};
33pub use changes::Changes;
34use datafusion::arrow::array::{
35    ArrayRef, DictionaryArray, Float64Array, TimestampMillisecondArray,
36};
37use datafusion::error::DataFusionError;
38use datafusion::physical_plan::ColumnarValue;
39use datatypes::arrow::array::Array;
40use datatypes::arrow::datatypes::Int64Type;
41pub use deriv::Deriv;
42pub use double_exponential_smoothing::DoubleExponentialSmoothing;
43pub use extrapolate_rate::{Delta, Increase, Rate};
44pub use idelta::IDelta;
45pub use predict_linear::PredictLinear;
46pub use quantile::QuantileOverTime;
47pub use quantile_aggr::{QUANTILE_NAME, quantile_udaf};
48pub use resets::Resets;
49pub use round::Round;
50
51use crate::range_array::RangeArray;
52
53/// Extracts an array from a `ColumnarValue`.
54///
55/// If the `ColumnarValue` is a scalar, it converts it to an array of size 1.
56pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
57    match columnar_value {
58        ColumnarValue::Array(array) => Ok(array.clone()),
59        ColumnarValue::Scalar(scalar) => Ok(scalar.to_array_of_size(1)?),
60    }
61}
62
63/// Extracts a validated [RangeArray] from a [ColumnarValue].
64pub(crate) fn extract_range_array(
65    columnar_value: &ColumnarValue,
66) -> Result<RangeArray, DataFusionError> {
67    let array = extract_array(columnar_value)?;
68    let dict = array
69        .as_any()
70        .downcast_ref::<DictionaryArray<Int64Type>>()
71        .ok_or_else(|| {
72            DataFusionError::Execution(format!(
73                "expected DictionaryArray<Int64>, found {}",
74                array.data_type()
75            ))
76        })?
77        .clone();
78    RangeArray::try_new(dict).map_err(DataFusionError::from)
79}
80
/// One step of compensated (Kahan) summation, a technique for reducing the numerical
/// error of floating-point addition. It includes the "Neumaier improvement", which
/// reduces the error further when the numbers being summed differ greatly in magnitude.
///
/// Adds `inc` to the running `sum` and returns `(new_sum, new_compensation)`.
/// The caller threads both values through every step and obtains the final
/// total as `sum + compensation`.
///
/// When the new running sum is infinite the compensation is reset to `0.0`.
/// Without that guard the correction term evaluates `inf - inf`, turning the
/// compensation into NaN and the final total into NaN instead of the infinity.
///
/// Prometheus's implementation:
/// <https://github.com/prometheus/prometheus/blob/f55ab2217984770aa1eecd0f2d5f54580029b1c0/promql/functions.go#L782>
pub(crate) fn compensated_sum_inc(inc: f64, sum: f64, mut compensation: f64) -> (f64, f64) {
    let new_sum = sum + inc;
    if new_sum.is_infinite() {
        // No meaningful rounding error exists once the sum is infinite; keeping the
        // compensation at zero lets `sum + compensation` stay infinite.
        compensation = 0.0;
    } else if sum.abs() >= inc.abs() {
        // The low-order bits of `inc` were lost in the addition; recover them.
        compensation += (sum - new_sum) + inc;
    } else {
        // Neumaier improvement: `inc` dominates, so the bits of `sum` were lost.
        compensation += (inc - new_sum) + sum;
    }
    (new_sum, compensation)
}
96
97/// linear_regression performs a least-square linear regression analysis on the
98/// times and values. It return the slope and intercept based on times and values.
99/// Prometheus's implementation: <https://github.com/prometheus/prometheus/blob/90b2f7a540b8a70d8d81372e6692dcbb67ccbaaa/promql/functions.go#L793-L837>
100pub(crate) fn linear_regression(
101    times: &TimestampMillisecondArray,
102    values: &Float64Array,
103    intercept_time: i64,
104) -> (Option<f64>, Option<f64>) {
105    linear_regression_slice(times.values(), values, 0, values.len(), intercept_time)
106}
107
108pub(crate) fn linear_regression_slice(
109    times: &[i64],
110    values: &Float64Array,
111    offset: usize,
112    len: usize,
113    intercept_time: i64,
114) -> (Option<f64>, Option<f64>) {
115    linear_regression_slices(times, offset, values, offset, len, intercept_time)
116}
117
118pub(crate) fn linear_regression_slices(
119    times: &[i64],
120    time_offset: usize,
121    values: &Float64Array,
122    value_offset: usize,
123    len: usize,
124    intercept_time: i64,
125) -> (Option<f64>, Option<f64>) {
126    let raw_values = values.values();
127    let has_nulls = values.null_count() > 0;
128    let mut count: f64 = 0.0;
129    let mut sum_x: f64 = 0.0;
130    let mut sum_y: f64 = 0.0;
131    let mut sum_xy: f64 = 0.0;
132    let mut sum_x2: f64 = 0.0;
133    let mut comp_x: f64 = 0.0;
134    let mut comp_y: f64 = 0.0;
135    let mut comp_xy: f64 = 0.0;
136    let mut comp_x2: f64 = 0.0;
137
138    let mut const_y = true;
139    let mut init_y = None;
140
141    for i in 0..len {
142        let time_idx = time_offset + i;
143        let value_idx = value_offset + i;
144        if has_nulls && values.is_null(value_idx) {
145            continue;
146        }
147        let value = raw_values[value_idx];
148        let time = times[time_idx] as f64;
149        let initial = init_y.get_or_insert(value);
150        if const_y && count > 0.0 && value != *initial {
151            const_y = false;
152        }
153        count += 1.0;
154        let x = (time - intercept_time as f64) / 1e3f64;
155        (sum_x, comp_x) = compensated_sum_inc(x, sum_x, comp_x);
156        (sum_y, comp_y) = compensated_sum_inc(value, sum_y, comp_y);
157        (sum_xy, comp_xy) = compensated_sum_inc(x * value, sum_xy, comp_xy);
158        (sum_x2, comp_x2) = compensated_sum_inc(x * x, sum_x2, comp_x2);
159    }
160
161    if count < 2.0 {
162        return (None, None);
163    }
164
165    if const_y {
166        let init_y = init_y.unwrap();
167        if !init_y.is_finite() {
168            return (None, None);
169        }
170        return (Some(0.0), Some(init_y));
171    }
172
173    sum_x += comp_x;
174    sum_y += comp_y;
175    sum_xy += comp_xy;
176    sum_x2 += comp_x2;
177
178    let cov_xy = sum_xy - sum_x * sum_y / count;
179    let var_x = sum_x2 - sum_x * sum_x / count;
180
181    let slope = cov_xy / var_x;
182    let intercept = sum_y / count - slope * sum_x / count;
183
184    (Some(slope), Some(intercept))
185}
186
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use datafusion::physical_plan::ColumnarValue;
    use datatypes::arrow::array::Int64Array;
    use datatypes::arrow::datatypes::Int64Type;

    use super::*;
    use crate::range_array::RangeArray;

    /// Eleven timestamps spaced 300 apart, shared by most regression tests.
    const EVEN_TIMESTAMPS: [i64; 11] = [
        0, 300, 600, 900, 1200, 1500, 1800, 2100, 2400, 2700, 3000,
    ];

    /// Builds a non-null timestamp column from raw values.
    fn timestamp_column(raw: &[i64]) -> TimestampMillisecondArray {
        TimestampMillisecondArray::from_iter(raw.iter().copied().map(Some))
    }

    // A constant series of infinities has no usable slope or intercept.
    #[test]
    fn calculate_linear_regression_none() {
        let ts_array = timestamp_column(&EVEN_TIMESTAMPS);
        // Ten values only: the regression is driven by the number of values.
        let values_array = Float64Array::from_iter([f64::INFINITY; 10]);

        let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));

        assert_eq!(slope, None);
        assert_eq!(intercept, None);
    }

    // A constant finite series is a flat line through that value.
    #[test]
    fn calculate_linear_regression_value_is_const() {
        let ts_array = timestamp_column(&EVEN_TIMESTAMPS);
        let values_array = Float64Array::from_iter([10.0f64; 10]);

        let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));

        assert_eq!(slope, Some(0.0));
        assert_eq!(intercept, Some(10.0));
    }

    // The slope does not depend on the intercept time, the intercept does.
    #[test]
    fn calculate_linear_regression() {
        let ts_array = timestamp_column(&EVEN_TIMESTAMPS);
        let values_array = Float64Array::from_iter([
            0.0f64, 10.0, 20.0, 30.0, 40.0, 0.0, 10.0, 20.0, 30.0, 40.0, 50.0,
        ]);

        let at_start = linear_regression(&ts_array, &values_array, ts_array.value(0));
        assert_eq!(at_start.0, Some(10.606060606060607));
        assert_eq!(at_start.1, Some(6.818181818181815));

        let at_end = linear_regression(&ts_array, &values_array, 3000);
        assert_eq!(at_end.0, Some(10.606060606060607));
        assert_eq!(at_end.1, Some(38.63636363636364));
    }

    // Null values are skipped along with their timestamps, so the result
    // matches the fully populated series above.
    #[test]
    fn calculate_linear_regression_value_have_none() {
        let ts_array = timestamp_column(&[
            0, 300, 600, 900, 1200, 1350, 1500, 1800, 2100, 2400, 2550, 2700, 3000,
        ]);
        let values_array = Float64Array::from_iter([
            Some(0.0f64),
            Some(10.0),
            Some(20.0),
            Some(30.0),
            Some(40.0),
            None,
            Some(0.0),
            Some(10.0),
            Some(20.0),
            Some(30.0),
            None,
            Some(40.0),
            Some(50.0),
        ]);

        let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));

        assert_eq!(slope, Some(10.606060606060607));
        assert_eq!(intercept, Some(6.818181818181815));
    }

    // With every value null there are no samples to fit.
    #[test]
    fn calculate_linear_regression_value_all_none() {
        let ts_array = timestamp_column(&[0, 300, 600]);
        let values_array = Float64Array::from_iter([None::<f64>; 3]);

        let (slope, intercept) = linear_regression(&ts_array, &values_array, ts_array.value(0));

        assert_eq!(slope, None);
        assert_eq!(intercept, None);
    }

    // From prometheus `promql/functions_test.go` case `TestKahanSum`
    #[test]
    fn test_kahan_sum() {
        let huge = 10.0f64.powf(100.0);

        let (sum, c) = [1.0, huge, 1.0, -huge]
            .into_iter()
            .fold((0.0, 0f64), |(sum, c), v| compensated_sum_inc(v, sum, c));

        assert_eq!(sum + c, 2.0)
    }

    // A dictionary with a null key must be rejected by the validation.
    #[test]
    fn extract_range_array_rejects_external_dictionary_with_null_keys() {
        let keys = Int64Array::from_iter([Some(0), None]);
        let values = Arc::new(Float64Array::from_iter([1.0, 2.0]));
        let dict = DictionaryArray::<Int64Type>::try_new(keys, values).unwrap();
        let input = ColumnarValue::Array(Arc::new(dict));

        let err = extract_range_array(&input).unwrap_err();

        assert!(err.to_string().contains("Empty range is not expected"));
    }

    // A dictionary produced by `RangeArray` itself round-trips with its ranges intact.
    #[test]
    fn extract_range_array_accepts_internal_packed_ranges() {
        let values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0]));
        let range_array = RangeArray::from_ranges(values, [(0, 2), (1, 2)]).unwrap();
        let input = ColumnarValue::Array(Arc::new(range_array.into_dict()));

        let extracted = extract_range_array(&input).unwrap();

        assert_eq!(extracted.get_offset_length(0), Some((0, 2)));
        assert_eq!(extracted.get_offset_length(1), Some((1, 2)));
    }
}