promql/
functions.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod aggr_over_time;
16mod changes;
17mod deriv;
18mod extrapolate_rate;
19mod holt_winters;
20mod idelta;
21mod predict_linear;
22mod quantile;
23mod quantile_aggr;
24mod resets;
25mod round;
26#[cfg(test)]
27mod test_util;
28
29pub use aggr_over_time::{
30    AbsentOverTime, AvgOverTime, CountOverTime, LastOverTime, MaxOverTime, MinOverTime,
31    PresentOverTime, StddevOverTime, StdvarOverTime, SumOverTime,
32};
33pub use changes::Changes;
34use datafusion::arrow::array::{ArrayRef, Float64Array, TimestampMillisecondArray};
35use datafusion::error::DataFusionError;
36use datafusion::physical_plan::ColumnarValue;
37pub use deriv::Deriv;
38pub use extrapolate_rate::{Delta, Increase, Rate};
39pub use holt_winters::HoltWinters;
40pub use idelta::IDelta;
41pub use predict_linear::PredictLinear;
42pub use quantile::QuantileOverTime;
43pub use quantile_aggr::{quantile_udaf, QUANTILE_NAME};
44pub use resets::Resets;
45pub use round::Round;
46
47/// Extracts an array from a `ColumnarValue`.
48///
49/// If the `ColumnarValue` is a scalar, it converts it to an array of size 1.
50pub(crate) fn extract_array(columnar_value: &ColumnarValue) -> Result<ArrayRef, DataFusionError> {
51    match columnar_value {
52        ColumnarValue::Array(array) => Ok(array.clone()),
53        ColumnarValue::Scalar(scalar) => Ok(scalar.to_array_of_size(1)?),
54    }
55}
56
/// Performs one step of compensated (Kahan) summation, a technique for reducing
/// the numerical error of floating-point addition.
///
/// Adds `inc` to the running `sum` and returns the updated
/// `(sum, compensation)` pair; the final result of a summation is obtained by
/// the caller as `sum + compensation`.
///
/// The algorithm includes the "Neumaier improvement": the roles of `sum` and
/// `inc` are swapped when `inc` has the larger magnitude, which reduces the
/// error further when the summed numbers differ greatly in magnitude.
///
/// If the new running sum is infinite the compensation is reset to zero.
/// Otherwise the correction term would evaluate `inf - inf` (NaN) or the
/// opposite infinity, and `sum + compensation` would become NaN instead of the
/// infinite sum.
///
/// Prometheus's implementation:
/// <https://github.com/prometheus/prometheus/blob/f55ab2217984770aa1eecd0f2d5f54580029b1c0/promql/functions.go#L782>
pub(crate) fn compensated_sum_inc(inc: f64, sum: f64, mut compensation: f64) -> (f64, f64) {
    let new_sum = sum + inc;
    if new_sum.is_infinite() {
        // No finite correction is meaningful once the sum has saturated.
        compensation = 0.0;
    } else if sum.abs() >= inc.abs() {
        compensation += (sum - new_sum) + inc;
    } else {
        // Neumaier: the increment dominates, so recover the low bits of `sum`.
        compensation += (inc - new_sum) + sum;
    }
    (new_sum, compensation)
}
72
73/// linear_regression performs a least-square linear regression analysis on the
74/// times and values. It return the slope and intercept based on times and values.
75/// Prometheus's implementation: <https://github.com/prometheus/prometheus/blob/90b2f7a540b8a70d8d81372e6692dcbb67ccbaaa/promql/functions.go#L793-L837>
76pub(crate) fn linear_regression(
77    times: &TimestampMillisecondArray,
78    values: &Float64Array,
79    intercept_time: i64,
80) -> (Option<f64>, Option<f64>) {
81    let mut count: f64 = 0.0;
82    let mut sum_x: f64 = 0.0;
83    let mut sum_y: f64 = 0.0;
84    let mut sum_xy: f64 = 0.0;
85    let mut sum_x2: f64 = 0.0;
86    let mut comp_x: f64 = 0.0;
87    let mut comp_y: f64 = 0.0;
88    let mut comp_xy: f64 = 0.0;
89    let mut comp_x2: f64 = 0.0;
90
91    let mut const_y = true;
92    let init_y: f64 = values.value(0);
93
94    for (i, value) in values.iter().enumerate() {
95        let time = times.value(i) as f64;
96        if value.is_none() {
97            continue;
98        }
99        let value = value.unwrap();
100        if const_y && i > 0 && value != init_y {
101            const_y = false;
102        }
103        count += 1.0;
104        let x = (time - intercept_time as f64) / 1e3f64;
105        (sum_x, comp_x) = compensated_sum_inc(x, sum_x, comp_x);
106        (sum_y, comp_y) = compensated_sum_inc(value, sum_y, comp_y);
107        (sum_xy, comp_xy) = compensated_sum_inc(x * value, sum_xy, comp_xy);
108        (sum_x2, comp_x2) = compensated_sum_inc(x * x, sum_x2, comp_x2);
109    }
110
111    if count < 2.0 {
112        return (None, None);
113    }
114
115    if const_y {
116        if !init_y.is_finite() {
117            return (None, None);
118        }
119        return (Some(0.0), Some(init_y));
120    }
121
122    sum_x += comp_x;
123    sum_y += comp_y;
124    sum_xy += comp_xy;
125    sum_x2 += comp_x2;
126
127    let cov_xy = sum_xy - sum_x * sum_y / count;
128    let var_x = sum_x2 - sum_x * sum_x / count;
129
130    let slope = cov_xy / var_x;
131    let intercept = sum_y / count - slope * sum_x / count;
132
133    (Some(slope), Some(intercept))
134}
135
#[cfg(test)]
mod test {
    use super::*;

    /// Eleven timestamps spaced 300 ms apart: 0, 300, ..., 3000.
    fn evenly_spaced_timestamps() -> TimestampMillisecondArray {
        TimestampMillisecondArray::from_iter((0..=10i64).map(|step| Some(step * 300)))
    }

    /// The sawtooth series shared by the non-constant regression tests.
    const SAWTOOTH: [f64; 11] = [
        0.0, 10.0, 20.0, 30.0, 40.0, 0.0, 10.0, 20.0, 30.0, 40.0, 50.0,
    ];

    #[test]
    fn calculate_linear_regression_none() {
        let timestamps = evenly_spaced_timestamps();
        // Ten infinite samples; the eleventh timestamp has no matching value.
        let samples = Float64Array::from_iter([f64::INFINITY; 10]);

        let fit = linear_regression(&timestamps, &samples, timestamps.value(0));
        assert_eq!(fit, (None, None));
    }

    #[test]
    fn calculate_linear_regression_value_is_const() {
        let timestamps = evenly_spaced_timestamps();
        // Ten identical samples; the eleventh timestamp has no matching value.
        let samples = Float64Array::from_iter([10.0f64; 10]);

        let fit = linear_regression(&timestamps, &samples, timestamps.value(0));
        assert_eq!(fit, (Some(0.0), Some(10.0)));
    }

    #[test]
    fn calculate_linear_regression() {
        let timestamps = evenly_spaced_timestamps();
        let samples = Float64Array::from_iter(SAWTOOTH);

        // Intercept evaluated at the first timestamp.
        let fit = linear_regression(&timestamps, &samples, timestamps.value(0));
        assert_eq!(fit, (Some(10.606060606060607), Some(6.818181818181815)));

        // Same slope, intercept evaluated at the last timestamp.
        let fit = linear_regression(&timestamps, &samples, 3000);
        assert_eq!(fit, (Some(10.606060606060607), Some(38.63636363636364)));
    }

    #[test]
    fn calculate_linear_regression_value_have_none() {
        // The sawtooth series with two extra rows (1350 ms and 2550 ms) whose
        // values are null and must be skipped.
        let timestamps = TimestampMillisecondArray::from_iter(
            [
                0i64, 300, 600, 900, 1200, 1350, 1500, 1800, 2100, 2400, 2550, 2700, 3000,
            ]
            .into_iter()
            .map(Some),
        );
        let samples = Float64Array::from_iter([
            Some(0.0),
            Some(10.0),
            Some(20.0),
            Some(30.0),
            Some(40.0),
            None,
            Some(0.0),
            Some(10.0),
            Some(20.0),
            Some(30.0),
            None,
            Some(40.0),
            Some(50.0),
        ]);

        let fit = linear_regression(&timestamps, &samples, timestamps.value(0));
        assert_eq!(fit, (Some(10.606060606060607), Some(6.818181818181815)));
    }

    #[test]
    fn calculate_linear_regression_value_all_none() {
        let timestamps = TimestampMillisecondArray::from_iter([0i64, 300, 600].map(Some));
        let samples = Float64Array::from_iter([None::<f64>; 3]);

        let fit = linear_regression(&timestamps, &samples, timestamps.value(0));
        assert_eq!(fit, (None, None));
    }

    // From prometheus `promql/functions_test.go` case `TestKahanSum`
    #[test]
    fn test_kahan_sum() {
        let huge = 10.0f64.powf(100.0);

        let (total, correction) = [1.0, huge, 1.0, -1.0 * huge]
            .into_iter()
            .fold((0.0f64, 0f64), |(sum, c), v| compensated_sum_inc(v, sum, c));

        assert_eq!(total + correction, 2.0)
    }
}