1use std::fmt::Display;
33use std::sync::Arc;
34
35use datafusion::arrow::array::{Float64Array, TimestampMillisecondArray};
36use datafusion::arrow::datatypes::TimeUnit;
37use datafusion::common::{DataFusionError, Result as DfResult};
38use datafusion::logical_expr::{ScalarUDF, Volatility};
39use datafusion::physical_plan::ColumnarValue;
40use datafusion_expr::create_udf;
41use datatypes::arrow::array::{Array, Int64Array};
42use datatypes::arrow::datatypes::DataType;
43
44use crate::extension_plan::Millisecond;
45use crate::functions::extract_array;
46use crate::range_array::RangeArray;
47
48pub type Delta = ExtrapolatedRate<false, false>;
49pub type Rate = ExtrapolatedRate<true, true>;
50pub type Increase = ExtrapolatedRate<true, false>;
51
/// Shared implementation behind the PromQL `delta`, `rate` and `increase`
/// functions.
///
/// * `IS_COUNTER` - when `true`, drops between consecutive samples are treated
///   as counter resets and compensated for.
/// * `IS_RATE` - when `true`, the extrapolated result is divided by the range
///   length expressed in seconds.
#[derive(Debug)]
pub struct ExtrapolatedRate<const IS_COUNTER: bool, const IS_RATE: bool> {
    /// Length of the evaluated range, in milliseconds.
    range_length: i64,
}
59
60impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, IS_RATE> {
61 fn new(range_length: i64) -> Self {
63 Self { range_length }
64 }
65
66 fn scalar_udf_with_name(name: &str) -> ScalarUDF {
67 let input_types = vec![
68 RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
70 RangeArray::convert_data_type(DataType::Float64),
72 DataType::Timestamp(TimeUnit::Millisecond, None),
74 DataType::Int64,
76 ];
77
78 create_udf(
79 name,
80 input_types,
81 DataType::Float64,
82 Volatility::Volatile,
83 Arc::new(move |input: &_| Self::create_function(input)?.calc(input)) as _,
84 )
85 }
86
87 fn create_function(inputs: &[ColumnarValue]) -> DfResult<Self> {
88 if inputs.len() != 4 {
89 return Err(DataFusionError::Plan(
90 "ExtrapolatedRate function should have 4 inputs".to_string(),
91 ));
92 }
93
94 let range_length_array = extract_array(&inputs[3])?;
95 let range_length = range_length_array
96 .as_any()
97 .downcast_ref::<Int64Array>()
98 .unwrap()
99 .value(0) as i64;
100
101 Ok(Self::new(range_length))
102 }
103
104 fn calc(&self, input: &[ColumnarValue]) -> DfResult<ColumnarValue> {
110 assert_eq!(input.len(), 4);
111
112 let ts_array = extract_array(&input[0])?;
114 let ts_range = RangeArray::try_new(ts_array.to_data().into())?;
115 let value_array = extract_array(&input[1])?;
116 let value_range = RangeArray::try_new(value_array.to_data().into())?;
117 let ts = extract_array(&input[2])?;
118 let ts = ts
119 .as_any()
120 .downcast_ref::<TimestampMillisecondArray>()
121 .unwrap();
122
123 let mut result_array = Vec::with_capacity(ts_range.len());
125
126 let all_timestamps = ts_range
127 .values()
128 .as_any()
129 .downcast_ref::<TimestampMillisecondArray>()
130 .unwrap()
131 .values();
132 let all_values = value_range
133 .values()
134 .as_any()
135 .downcast_ref::<Float64Array>()
136 .unwrap()
137 .values();
138 for index in 0..ts_range.len() {
139 let (offset, length) = ts_range.get_offset_length(index).unwrap();
141
142 let timestamps = &all_timestamps[offset..offset + length];
143 let end_ts = ts.value(index);
144 let values = &all_values[offset..offset + length];
145
146 if values.len() < 2 {
147 result_array.push(None);
148 continue;
149 }
150
151 let mut result_value = values.last().unwrap() - values.first().unwrap();
153 if IS_COUNTER {
154 for window in values.windows(2) {
155 let prev = window[0];
156 let curr = window[1];
157 if curr < prev {
158 result_value += prev
159 }
160 }
161 }
162
163 let mut factor = Self::extrapolate_factor(
164 timestamps,
165 end_ts,
166 self.range_length,
167 *values.first().unwrap(),
168 result_value,
169 );
170
171 if IS_RATE {
172 factor /= self.range_length as f64 / 1000.0;
174 }
175
176 result_array.push(Some(result_value * factor));
177 }
178
179 let result = ColumnarValue::Array(Arc::new(Float64Array::from_iter(result_array)));
180 Ok(result)
181 }
182
183 fn extrapolate_factor(
184 timestamps: &[Millisecond],
185 range_end: Millisecond,
186 range_length: Millisecond,
187 first_value: f64,
190 result_value: f64,
191 ) -> f64 {
192 let range_start = range_end - range_length;
196 let mut duration_to_start = (timestamps.first().unwrap() - range_start) as f64 / 1000.0;
197 let duration_to_end = (range_end - timestamps.last().unwrap()) as f64 / 1000.0;
198 let sampled_interval =
199 (timestamps.last().unwrap() - timestamps.first().unwrap()) as f64 / 1000.0;
200 let average_duration_between_samples = sampled_interval / (timestamps.len() - 1) as f64;
201
202 if IS_COUNTER && result_value > 0.0 && first_value >= 0.0 {
211 let duration_to_zero = sampled_interval * (first_value / result_value);
212 if duration_to_zero < duration_to_start {
213 duration_to_start = duration_to_zero;
214 }
215 }
216
217 let extrapolation_threshold = average_duration_between_samples * 1.1;
218 let mut extrapolate_to_interval = sampled_interval;
219
220 if duration_to_start < extrapolation_threshold {
221 extrapolate_to_interval += duration_to_start;
222 } else {
223 extrapolate_to_interval += average_duration_between_samples / 2.0;
224 }
225 if duration_to_end < extrapolation_threshold {
226 extrapolate_to_interval += duration_to_end;
227 } else {
228 extrapolate_to_interval += average_duration_between_samples / 2.0;
229 }
230
231 extrapolate_to_interval / sampled_interval
232 }
233}
234
235impl ExtrapolatedRate<false, false> {
237 pub const fn name() -> &'static str {
238 "prom_delta"
239 }
240
241 pub fn scalar_udf() -> ScalarUDF {
242 Self::scalar_udf_with_name(Self::name())
243 }
244}
245
246impl ExtrapolatedRate<true, true> {
248 pub const fn name() -> &'static str {
249 "prom_rate"
250 }
251
252 pub fn scalar_udf() -> ScalarUDF {
253 Self::scalar_udf_with_name(Self::name())
254 }
255}
256
257impl ExtrapolatedRate<true, false> {
259 pub const fn name() -> &'static str {
260 "prom_increase"
261 }
262
263 pub fn scalar_udf() -> ScalarUDF {
264 Self::scalar_udf_with_name(Self::name())
265 }
266}
267
268impl Display for ExtrapolatedRate<false, false> {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 f.write_str("PromQL Delta Function")
271 }
272}
273
274impl Display for ExtrapolatedRate<true, true> {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 f.write_str("PromQL Rate Function")
277 }
278}
279
280impl Display for ExtrapolatedRate<true, false> {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 f.write_str("PromQL Increase Function")
283 }
284}
285
#[cfg(test)]
mod test {

    use datafusion::arrow::array::ArrayRef;

    use super::*;

    /// Range length, in milliseconds, used by every case in this module.
    const RANGE_LENGTH: i64 = 5;

    /// Sample timestamps `1..=9` (milliseconds) shared by all cases.
    fn sample_timestamps() -> Arc<TimestampMillisecondArray> {
        Arc::new(TimestampMillisecondArray::from_iter(
            (1_i64..=9).map(Some),
        ))
    }

    /// Sample values that grow by one at every step.
    fn monotonic_values() -> Arc<Float64Array> {
        Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
        ]))
    }

    /// Sample values that drop back to `1.0` at the fifth sample.
    fn values_with_reset() -> Arc<Float64Array> {
        Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0,
        ]))
    }

    /// Builds the column holding the end timestamp of each evaluated range.
    fn eval_timestamps(points: impl IntoIterator<Item = Option<i64>>) -> ArrayRef {
        Arc::new(TimestampMillisecondArray::from_iter(points)) as ArrayRef
    }

    /// Runs `calc` for the chosen variant and compares the raw output values
    /// with `expected`.
    fn extrapolated_rate_runner<const IS_COUNTER: bool, const IS_RATE: bool>(
        ts_range: RangeArray,
        value_range: RangeArray,
        timestamps: ArrayRef,
        expected: Vec<f64>,
    ) {
        let input = vec![
            ColumnarValue::Array(Arc::new(ts_range.into_dict())),
            ColumnarValue::Array(Arc::new(value_range.into_dict())),
            ColumnarValue::Array(timestamps),
            ColumnarValue::Array(Arc::new(Int64Array::from(vec![RANGE_LENGTH]))),
        ];

        let function = ExtrapolatedRate::<IS_COUNTER, IS_RATE>::new(RANGE_LENGTH);
        let computed = function.calc(&input).unwrap();
        let computed = extract_array(&computed).unwrap();
        let actual = computed
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap()
            .values()
            .to_vec();

        assert_eq!(actual, expected);
    }

    /// Evaluates eight two-sample ranges that slide over `values` one sample
    /// at a time, each ending at the timestamp of its second sample.
    fn assert_sliding_pairs<const IS_COUNTER: bool, const IS_RATE: bool>(
        values: Arc<Float64Array>,
        expected: Vec<f64>,
    ) {
        let ranges = [(0, 2), (1, 2), (2, 2), (3, 2), (4, 2), (5, 2), (6, 2), (7, 2)];
        let ts_range = RangeArray::from_ranges(sample_timestamps(), ranges).unwrap();
        let value_range = RangeArray::from_ranges(values, ranges).unwrap();
        let timestamps = eval_timestamps((2_i64..=9).map(Some));

        extrapolated_rate_runner::<IS_COUNTER, IS_RATE>(
            ts_range,
            value_range,
            timestamps,
            expected,
        );
    }

    #[test]
    fn increase_abnormal_input() {
        let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];
        let ts_range = RangeArray::from_ranges(sample_timestamps(), ranges).unwrap();
        let value_range = RangeArray::from_ranges(monotonic_values(), ranges).unwrap();
        let timestamps = eval_timestamps([Some(2), Some(5), Some(2), Some(6), Some(9), None]);

        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            vec![2.0, 5.0, 0.0, 2.5, 0.0, 0.0],
        );
    }

    #[test]
    fn increase_normal_input() {
        assert_sliding_pairs::<true, false>(
            monotonic_values(),
            vec![2.0, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5],
        );
    }

    #[test]
    fn increase_short_input() {
        let ranges = [(0, 1), (1, 0), (2, 1), (3, 0), (4, 3), (5, 1), (6, 0), (7, 2)];
        let ts_range = RangeArray::from_ranges(sample_timestamps(), ranges).unwrap();
        let value_range = RangeArray::from_ranges(monotonic_values(), ranges).unwrap();
        let timestamps = eval_timestamps([
            Some(1),
            None,
            Some(3),
            None,
            Some(7),
            Some(6),
            None,
            Some(9),
        ]);

        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            vec![0.0, 0.0, 0.0, 0.0, 2.5, 0.0, 0.0, 1.5],
        );
    }

    #[test]
    fn increase_counter_reset() {
        assert_sliding_pairs::<true, false>(
            values_with_reset(),
            vec![2.0, 1.5, 1.5, 1.5, 2.0, 1.5, 1.5, 1.5],
        );
    }

    #[test]
    fn rate_counter_reset() {
        assert_sliding_pairs::<true, true>(
            values_with_reset(),
            vec![400.0, 300.0, 300.0, 300.0, 400.0, 300.0, 300.0, 300.0],
        );
    }

    #[test]
    fn rate_normal_input() {
        assert_sliding_pairs::<true, true>(
            monotonic_values(),
            vec![400.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0],
        );
    }

    #[test]
    fn delta_counter_reset() {
        assert_sliding_pairs::<false, false>(
            values_with_reset(),
            vec![1.5, 1.5, 1.5, -4.5, 1.5, 1.5, 1.5, 1.5],
        );
    }

    #[test]
    fn delta_normal_input() {
        assert_sliding_pairs::<false, false>(
            monotonic_values(),
            vec![1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5],
        );
    }
}