1use std::fmt::Display;
33use std::sync::Arc;
34
35use datafusion::arrow::array::{
36 DictionaryArray, Float64Array, Float64Builder, TimestampMillisecondArray,
37};
38use datafusion::arrow::datatypes::TimeUnit;
39use datafusion::common::{DataFusionError, Result as DfResult};
40use datafusion::logical_expr::{ScalarUDF, Volatility};
41use datafusion::physical_plan::ColumnarValue;
42use datafusion_expr::create_udf;
43use datatypes::arrow::array::{Array, Int64Array};
44use datatypes::arrow::datatypes::{DataType, Int64Type};
45
46use crate::functions::extract_array;
47use crate::range_array::{RangeArray, unpack};
48
/// Gauge variant (`IS_COUNTER = false`, `IS_RATE = false`); its function name is `prom_delta`.
pub type Delta = ExtrapolatedRate<false, false>;
/// Counter variant whose result is divided by the range length in seconds
/// (`IS_COUNTER = true`, `IS_RATE = true`); its function name is `prom_rate`.
pub type Rate = ExtrapolatedRate<true, true>;
/// Counter variant without the per-second division (`IS_COUNTER = true`, `IS_RATE = false`);
/// its function name is `prom_increase`.
pub type Increase = ExtrapolatedRate<true, false>;
52
/// Shared implementation behind the `prom_delta`, `prom_increase` and `prom_rate` functions.
///
/// * `IS_COUNTER` turns on counter-reset compensation and the zero-point clamp applied
///   while extrapolating.
/// * `IS_RATE` divides the extrapolated result by the range length in seconds.
///
/// The combination `IS_COUNTER = false, IS_RATE = true` is not supported.
#[derive(Debug)]
pub struct ExtrapolatedRate<const IS_COUNTER: bool, const IS_RATE: bool> {
    /// Length of the range window, in milliseconds (it is divided by 1000 to obtain seconds).
    range_length: i64,
}
60
61impl<const IS_COUNTER: bool, const IS_RATE: bool> ExtrapolatedRate<IS_COUNTER, IS_RATE> {
    /// Creates an instance for the given range length (milliseconds).
    ///
    /// No validation is performed on `range_length` here.
    fn new(range_length: i64) -> Self {
        Self { range_length }
    }
66
67 fn func_name() -> &'static str {
68 match (IS_COUNTER, IS_RATE) {
69 (true, true) => "prom_rate",
70 (true, false) => "prom_increase",
71 (false, false) => "prom_delta",
72 (false, true) => {
73 unreachable!("gauge rate is not supported by ExtrapolatedRate")
74 }
75 }
76 }
77
78 fn scalar_udf_with_name(name: &str) -> ScalarUDF {
79 let input_types = vec![
80 RangeArray::convert_data_type(DataType::Timestamp(TimeUnit::Millisecond, None)),
82 RangeArray::convert_data_type(DataType::Float64),
84 DataType::Timestamp(TimeUnit::Millisecond, None),
86 DataType::Int64,
88 ];
89
90 create_udf(
91 name,
92 input_types,
93 DataType::Float64,
94 Volatility::Volatile,
95 Arc::new(move |input: &_| Self::create_function(input)?.calc(input)) as _,
96 )
97 }
98
99 fn create_function(inputs: &[ColumnarValue]) -> DfResult<Self> {
100 if inputs.len() != 4 {
101 return Err(DataFusionError::Plan(
102 "ExtrapolatedRate function should have 4 inputs".to_string(),
103 ));
104 }
105
106 let range_length_array = extract_array(&inputs[3])?;
107 let range_length_array = range_length_array
108 .as_any()
109 .downcast_ref::<Int64Array>()
110 .ok_or_else(|| {
111 DataFusionError::Execution(format!(
112 "{}: expect Int64 as range length type, found {}",
113 Self::func_name(),
114 range_length_array.data_type()
115 ))
116 })?;
117 if range_length_array.is_empty() || range_length_array.is_null(0) {
118 return Err(DataFusionError::Execution(format!(
119 "{}: range length must contain a non-null Int64 value",
120 Self::func_name()
121 )));
122 }
123 let range_length = range_length_array.value(0);
124
125 Ok(Self::new(range_length))
126 }
127
128 fn calc(&self, input: &[ColumnarValue]) -> DfResult<ColumnarValue> {
134 if input.len() != 4 {
135 return Err(DataFusionError::Plan(
136 "ExtrapolatedRate function should have 4 inputs".to_string(),
137 ));
138 }
139
140 let ts_dict = extract_range_dict(
141 &input[0],
142 Self::func_name(),
143 "timestamp range vector",
144 &DataType::Timestamp(TimeUnit::Millisecond, None),
145 )?;
146 let value_dict = extract_range_dict(
147 &input[1],
148 Self::func_name(),
149 "value range vector",
150 &DataType::Float64,
151 )?;
152 let eval_ts_array = extract_eval_timestamps(&input[2], Self::func_name())?;
153
154 let keys = ts_dict.keys().values();
155 let num_windows = keys.len();
156 if value_dict.keys().len() != num_windows {
157 return Err(DataFusionError::Execution(format!(
158 "{}: timestamp and value ranges should have the same number of windows, found {} and {}",
159 Self::func_name(),
160 num_windows,
161 value_dict.keys().len()
162 )));
163 }
164 if value_dict.keys().values() != keys {
165 return Err(DataFusionError::Execution(format!(
166 "{}: timestamp and value ranges should have the same window layout",
167 Self::func_name()
168 )));
169 }
170 if eval_ts_array.len() != num_windows {
171 return Err(DataFusionError::Execution(format!(
172 "{}: evaluation timestamp vector should have the same number of rows as range inputs, found {} and {}",
173 Self::func_name(),
174 eval_ts_array.len(),
175 num_windows
176 )));
177 }
178
179 let all_timestamps = ts_dict
180 .values()
181 .as_any()
182 .downcast_ref::<TimestampMillisecondArray>()
183 .expect("validated by extract_range_dict")
184 .values();
185 let all_values = value_dict
186 .values()
187 .as_any()
188 .downcast_ref::<Float64Array>()
189 .expect("validated by extract_range_dict")
190 .values();
191 let eval_ts = eval_ts_array.values();
192
193 let mut result_builder = Float64Builder::with_capacity(num_windows);
194 let range_length = self.range_length;
195 let range_length_secs = range_length as f64 / 1000.0;
196
197 let mut counter_correction = 0.0;
198 let mut prev_offset = usize::MAX;
199 let mut prev_length = 0usize;
200
201 for index in 0..num_windows {
202 let (raw_offset, raw_length) = unpack(keys[index]);
203 let offset = raw_offset as usize;
204 let length = raw_length as usize;
205
206 if length < 2 {
207 result_builder.append_null();
208 prev_offset = usize::MAX;
209 continue;
210 }
211
212 let end = offset + length;
213 let first_value = all_values[offset];
214 let last_value = all_values[end - 1];
215
216 let result_value = if IS_COUNTER {
217 if prev_offset != usize::MAX && offset == prev_offset + 1 && length == prev_length {
221 if all_values[prev_offset + 1] < all_values[prev_offset] {
222 counter_correction -= all_values[prev_offset];
223 }
224 if all_values[end - 1] < all_values[end - 2] {
225 counter_correction += all_values[end - 2];
226 }
227 } else {
228 counter_correction = 0.0;
229 for pair in all_values[offset..end].windows(2) {
230 if pair[1] < pair[0] {
231 counter_correction += pair[0];
232 }
233 }
234 }
235 last_value - first_value + counter_correction
236 } else {
237 last_value - first_value
238 };
239
240 prev_offset = offset;
241 prev_length = length;
242
243 let first_ts = all_timestamps[offset];
244 let last_ts = all_timestamps[end - 1];
245 let range_end = eval_ts[index];
246 let range_start = range_end - range_length;
247 let sampled_interval_ms = (last_ts - first_ts) as f64;
248 let average_interval_ms = sampled_interval_ms / (length - 1) as f64;
249 let mut duration_to_start_ms = (first_ts - range_start) as f64;
250 let duration_to_end_ms = (range_end - last_ts) as f64;
251
252 if IS_COUNTER && result_value > 0.0 && first_value >= 0.0 {
255 let duration_to_zero = sampled_interval_ms * (first_value / result_value);
256 if duration_to_zero < duration_to_start_ms {
257 duration_to_start_ms = duration_to_zero;
258 }
259 }
260
261 let extrapolation_threshold = average_interval_ms * 1.1;
262 let mut extrapolated_interval_ms = sampled_interval_ms;
263
264 if duration_to_start_ms < extrapolation_threshold {
267 extrapolated_interval_ms += duration_to_start_ms;
268 } else {
269 extrapolated_interval_ms += average_interval_ms / 2.0;
270 }
271 if duration_to_end_ms < extrapolation_threshold {
272 extrapolated_interval_ms += duration_to_end_ms;
273 } else {
274 extrapolated_interval_ms += average_interval_ms / 2.0;
275 }
276
277 let mut factor = extrapolated_interval_ms / sampled_interval_ms;
278
279 if IS_RATE {
280 factor /= range_length_secs;
281 }
282
283 result_builder.append_value(result_value * factor);
284 }
285
286 let result = ColumnarValue::Array(Arc::new(result_builder.finish()));
287 Ok(result)
288 }
289}
290
291fn extract_range_dict(
292 columnar_value: &ColumnarValue,
293 func_name: &str,
294 arg_name: &str,
295 expected_value_type: &DataType,
296) -> DfResult<DictionaryArray<Int64Type>> {
297 let array = extract_array(columnar_value)?;
298 let dict = array
299 .as_any()
300 .downcast_ref::<DictionaryArray<Int64Type>>()
301 .ok_or_else(|| {
302 DataFusionError::Execution(format!(
303 "{func_name}: expect {arg_name} as DictionaryArray<Int64>, found {}",
304 array.data_type()
305 ))
306 })?
307 .clone();
308
309 if &dict.value_type() != expected_value_type {
310 return Err(DataFusionError::Execution(format!(
311 "{func_name}: expect {arg_name} values of type {expected_value_type}, found {}",
312 dict.value_type()
313 )));
314 }
315
316 RangeArray::try_new(dict.clone()).map_err(DataFusionError::from)?;
317 Ok(dict)
318}
319
320fn extract_eval_timestamps(
321 columnar_value: &ColumnarValue,
322 func_name: &str,
323) -> DfResult<TimestampMillisecondArray> {
324 let array = extract_array(columnar_value)?;
325 let timestamps = array
326 .as_any()
327 .downcast_ref::<TimestampMillisecondArray>()
328 .ok_or_else(|| {
329 DataFusionError::Execution(format!(
330 "{func_name}: expect evaluation timestamp vector as Timestamp(Millisecond), found {}",
331 array.data_type()
332 ))
333 })?;
334 Ok(timestamps.clone())
335}
336
impl ExtrapolatedRate<false, false> {
    /// Name given to the delta UDF.
    pub const fn name() -> &'static str {
        "prom_delta"
    }

    /// Builds the delta [`ScalarUDF`], named after [`Self::name`].
    pub fn scalar_udf() -> ScalarUDF {
        Self::scalar_udf_with_name(Self::name())
    }
}
347
impl ExtrapolatedRate<true, true> {
    /// Name given to the rate UDF.
    pub const fn name() -> &'static str {
        "prom_rate"
    }

    /// Builds the rate [`ScalarUDF`], named after [`Self::name`].
    pub fn scalar_udf() -> ScalarUDF {
        Self::scalar_udf_with_name(Self::name())
    }
}
358
impl ExtrapolatedRate<true, false> {
    /// Name given to the increase UDF.
    pub const fn name() -> &'static str {
        "prom_increase"
    }

    /// Builds the increase [`ScalarUDF`], named after [`Self::name`].
    pub fn scalar_udf() -> ScalarUDF {
        Self::scalar_udf_with_name(Self::name())
    }
}
369
370impl Display for ExtrapolatedRate<false, false> {
371 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
372 f.write_str("PromQL Delta Function")
373 }
374}
375
376impl Display for ExtrapolatedRate<true, true> {
377 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
378 f.write_str("PromQL Rate Function")
379 }
380}
381
382impl Display for ExtrapolatedRate<true, false> {
383 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
384 f.write_str("PromQL Increase Function")
385 }
386}
387
388#[cfg(test)]
389mod test {
390
391 use datafusion::arrow::array::ArrayRef;
392 use datafusion_common::ScalarValue;
393
394 use super::*;
395
396 fn extrapolated_rate_runner<const IS_COUNTER: bool, const IS_RATE: bool>(
398 ts_range: RangeArray,
399 value_range: RangeArray,
400 timestamps: ArrayRef,
401 expected: Vec<f64>,
402 ) {
403 let input = vec![
404 ColumnarValue::Array(Arc::new(ts_range.into_dict())),
405 ColumnarValue::Array(Arc::new(value_range.into_dict())),
406 ColumnarValue::Array(timestamps),
407 ColumnarValue::Array(Arc::new(Int64Array::from(vec![5]))),
408 ];
409 let output = extract_array(
410 &ExtrapolatedRate::<IS_COUNTER, IS_RATE>::new(5)
411 .calc(&input)
412 .unwrap(),
413 )
414 .unwrap()
415 .as_any()
416 .downcast_ref::<Float64Array>()
417 .unwrap()
418 .values()
419 .to_vec();
420 assert_eq!(output, expected);
421 }
422
423 fn sample_range_inputs() -> (ColumnarValue, ColumnarValue, ColumnarValue) {
424 let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
425 [1, 2, 3].into_iter().map(Some),
426 ));
427 let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0]));
428 let ranges = [(0, 2), (1, 2)];
429
430 let ts_range = RangeArray::from_ranges(ts_values, ranges).unwrap();
431 let value_range = RangeArray::from_ranges(value_values, ranges).unwrap();
432 let eval_ts = Arc::new(TimestampMillisecondArray::from_iter(
433 [2, 3].into_iter().map(Some),
434 )) as _;
435
436 (
437 ColumnarValue::Array(Arc::new(ts_range.into_dict())),
438 ColumnarValue::Array(Arc::new(value_range.into_dict())),
439 ColumnarValue::Array(eval_ts),
440 )
441 }
442
    // `calc` must report an error instead of panicking when it is not given four inputs.
    #[test]
    fn rate_rejects_wrong_input_arity() {
        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[])
            .unwrap_err();

        assert!(err.to_string().contains("should have 4 inputs"));
    }
451
    // A range length that is not Int64 (here a Float64 scalar) is rejected by
    // `create_function`.
    #[test]
    fn rate_rejects_non_int64_range_length() {
        let (ts_range, value_range, eval_ts) = sample_range_inputs();

        let err = ExtrapolatedRate::<true, true>::create_function(&[
            ts_range,
            value_range,
            eval_ts,
            ColumnarValue::Scalar(ScalarValue::Float64(Some(5.0))),
        ])
        .unwrap_err();

        assert!(err.to_string().contains("range length type"));
    }
466
    // An empty Int64 range length array has no first element to read and is rejected.
    #[test]
    fn rate_rejects_empty_range_length() {
        let (ts_range, value_range, eval_ts) = sample_range_inputs();

        let err = ExtrapolatedRate::<true, true>::create_function(&[
            ts_range,
            value_range,
            eval_ts,
            ColumnarValue::Array(Arc::new(Int64Array::from(Vec::<i64>::new()))),
        ])
        .unwrap_err();

        assert!(err.to_string().contains("range length must contain"));
    }
481
    // A range length array whose first element is null is rejected.
    #[test]
    fn rate_rejects_null_range_length() {
        let (ts_range, value_range, eval_ts) = sample_range_inputs();

        let err = ExtrapolatedRate::<true, true>::create_function(&[
            ts_range,
            value_range,
            eval_ts,
            ColumnarValue::Array(Arc::new(Int64Array::from(vec![None]))),
        ])
        .unwrap_err();

        assert!(err.to_string().contains("range length must contain"));
    }
496
    // Irregular window layout: overlapping windows, a repeated offset, and windows with
    // one or zero samples. Windows with fewer than two samples produce a null entry,
    // which shows up as 0.0 in the compared value buffer.
    #[test]
    fn increase_abnormal_input() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        let ranges = [(0, 2), (0, 5), (1, 1), (3, 3), (8, 1), (9, 0)];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter([
            Some(2),
            Some(5),
            Some(2),
            Some(6),
            Some(9),
            None,
        ])) as _;
        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            vec![2.0, 5.0, 0.0, 2.5, 0.0, 0.0],
        );
    }
523
    // Monotonically increasing counter evaluated over two-sample windows that slide by
    // one sample each.
    #[test]
    fn increase_normal_input() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            // The first window is clamped at the counter's zero point, hence 2.0.
            vec![2.0, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5],
        );
    }
555
    // Mostly too-short windows (zero or one sample), which produce null entries; only
    // the windows `(4, 3)` and `(7, 2)` hold enough samples to be evaluated.
    #[test]
    fn increase_short_input() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        let ranges = [
            (0, 1),
            (1, 0),
            (2, 1),
            (3, 0),
            (4, 3),
            (5, 1),
            (6, 0),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter([
            Some(1),
            None,
            Some(3),
            None,
            Some(7),
            Some(6),
            None,
            Some(9),
        ])) as _;
        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            vec![0.0, 0.0, 0.0, 0.0, 2.5, 0.0, 0.0, 1.5],
        );
    }
593
    // The counter drops from 4.0 to 1.0 between the fourth and fifth sample. The window
    // spanning the drop is compensated by adding back the value seen before the reset.
    #[test]
    fn increase_counter_reset() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            // Windows starting at a sample equal to 1.0 are clamped at the zero point
            // and yield 2.0.
            vec![2.0, 1.5, 1.5, 1.5, 2.0, 1.5, 1.5, 1.5],
        );
    }
628
    // Four-sample windows sliding by one sample over a series with two resets, so that
    // resets both enter and leave the window as it advances.
    #[test]
    fn increase_counter_reset_wide_windows() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0, 1.0, 2.0, 1.0, 2.0]));
        let ranges = [(0, 4), (1, 4), (2, 4), (3, 4)];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [4, 5, 6, 7].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<true, false>(
            ts_range,
            value_range,
            timestamps,
            vec![4.0, 3.5, 3.5, 4.0],
        );
    }
648
    // The first input must be a dictionary-encoded range vector; an Int64 scalar is
    // rejected with an error naming the timestamp range vector.
    #[test]
    fn rate_rejects_non_array_timestamp_ranges() {
        let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0]));
        let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap();
        let eval_ts = Arc::new(TimestampMillisecondArray::from_iter([Some(2)]));

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ColumnarValue::Scalar(ScalarValue::Int64(Some(0))),
                ColumnarValue::Array(Arc::new(value_range.into_dict())),
                ColumnarValue::Array(eval_ts),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(err.to_string().contains("timestamp range vector"));
    }
666
    // The timestamp range vector must wrap timestamp values; Int64 values are rejected.
    #[test]
    fn rate_rejects_non_timestamp_timestamp_range_values() {
        let ts_values = Arc::new(Int64Array::from_iter([1, 2]));
        let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0]));
        let ts_range = RangeArray::from_ranges(ts_values, [(0, 2)]).unwrap();
        let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap();
        let eval_ts = Arc::new(TimestampMillisecondArray::from_iter([Some(2)]));

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ColumnarValue::Array(Arc::new(ts_range.into_dict())),
                ColumnarValue::Array(Arc::new(value_range.into_dict())),
                ColumnarValue::Array(eval_ts),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(err.to_string().contains("values of type Timestamp"));
    }
686
    // The value range vector must wrap Float64 values; Int64 values are rejected.
    #[test]
    fn rate_rejects_non_float_value_range_values() {
        let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2].into_iter().map(Some),
        ));
        let value_values = Arc::new(Int64Array::from_iter([1, 2]));
        let ts_range = RangeArray::from_ranges(ts_values, [(0, 2)]).unwrap();
        let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap();
        let eval_ts = Arc::new(TimestampMillisecondArray::from_iter([Some(2)]));

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ColumnarValue::Array(Arc::new(ts_range.into_dict())),
                ColumnarValue::Array(Arc::new(value_range.into_dict())),
                ColumnarValue::Array(eval_ts),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("value range vector values of type Float64")
        );
    }
711
    // The timestamp range vector has two windows while the value range vector has one;
    // the mismatch in window count is rejected.
    #[test]
    fn rate_rejects_mismatched_range_counts() {
        let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3].into_iter().map(Some),
        ));
        let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0]));
        let ts_range = RangeArray::from_ranges(ts_values, [(0, 2), (1, 2)]).unwrap();
        let value_range = RangeArray::from_ranges(value_values, [(0, 2)]).unwrap();
        let eval_ts = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3].into_iter().map(Some),
        ));

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ColumnarValue::Array(Arc::new(ts_range.into_dict())),
                ColumnarValue::Array(Arc::new(value_range.into_dict())),
                ColumnarValue::Array(eval_ts),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(err.to_string().contains("same number of windows"));
    }
735
    // Both range vectors have two windows, but the second window starts at a different
    // offset in each; the differing layout is rejected.
    #[test]
    fn rate_rejects_mismatched_range_layouts() {
        let ts_values = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4].into_iter().map(Some),
        ));
        let value_values = Arc::new(Float64Array::from_iter([1.0, 2.0, 3.0, 4.0]));
        let ts_range = RangeArray::from_ranges(ts_values, [(0, 2), (1, 2)]).unwrap();
        let value_range = RangeArray::from_ranges(value_values, [(0, 2), (2, 2)]).unwrap();
        let eval_ts = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 4].into_iter().map(Some),
        ));

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ColumnarValue::Array(Arc::new(ts_range.into_dict())),
                ColumnarValue::Array(Arc::new(value_range.into_dict())),
                ColumnarValue::Array(eval_ts),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(err.to_string().contains("same window layout"));
    }
759
    // The evaluation timestamps must be a millisecond timestamp array; a Float64 array
    // is rejected.
    #[test]
    fn rate_rejects_non_timestamp_eval_vector() {
        let (ts_range, value_range, _) = sample_range_inputs();

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ts_range,
                value_range,
                ColumnarValue::Array(Arc::new(Float64Array::from_iter([2.0, 3.0]))),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(err.to_string().contains("evaluation timestamp vector"));
    }
775
    // Two windows but only one evaluation timestamp; the row count mismatch is rejected.
    #[test]
    fn rate_rejects_mismatched_eval_timestamp_rows() {
        let (ts_range, value_range, _) = sample_range_inputs();

        let err = ExtrapolatedRate::<true, true>::new(5)
            .calc(&[
                ts_range,
                value_range,
                ColumnarValue::Array(Arc::new(TimestampMillisecondArray::from_iter([Some(2)]))),
                ColumnarValue::Scalar(ScalarValue::Int64(Some(5))),
            ])
            .unwrap_err();

        assert!(err.to_string().contains("same number of rows"));
    }
791
    // Same series as `increase_counter_reset`, evaluated as a rate: each expected value
    // is the corresponding increase divided by the 0.005 s range length.
    #[test]
    fn rate_counter_reset() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<true, true>(
            ts_range,
            value_range,
            timestamps,
            vec![400.0, 300.0, 300.0, 300.0, 400.0, 300.0, 300.0, 300.0],
        );
    }
823
    // Monotonically increasing counter evaluated as a rate over two-sample windows with
    // a 0.005 s range length.
    #[test]
    fn rate_normal_input() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<true, true>(
            ts_range,
            value_range,
            timestamps,
            vec![400.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0, 300.0],
        );
    }
854
    // The delta (gauge) variant applies no reset compensation, so the drop from 4.0 to
    // 1.0 is reported as a negative delta.
    #[test]
    fn delta_counter_reset() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 1.0, 2.0, 3.0, 4.0, 5.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<false, false>(
            ts_range,
            value_range,
            timestamps,
            vec![1.5, 1.5, 1.5, -4.5, 1.5, 1.5, 1.5, 1.5],
        );
    }
887
    // Steadily increasing series evaluated with the delta (gauge) variant; without the
    // counter zero-point clamp every window yields the same value.
    #[test]
    fn delta_normal_input() {
        let ts_array = Arc::new(TimestampMillisecondArray::from_iter(
            [1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        ));
        let values_array = Arc::new(Float64Array::from_iter([
            1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
        ]));
        let ranges = [
            (0, 2),
            (1, 2),
            (2, 2),
            (3, 2),
            (4, 2),
            (5, 2),
            (6, 2),
            (7, 2),
        ];
        let ts_range = RangeArray::from_ranges(ts_array, ranges).unwrap();
        let value_range = RangeArray::from_ranges(values_array, ranges).unwrap();
        let timestamps = Arc::new(TimestampMillisecondArray::from_iter(
            [2, 3, 4, 5, 6, 7, 8, 9].into_iter().map(Some),
        )) as _;
        extrapolated_rate_runner::<false, false>(
            ts_range,
            value_range,
            timestamps,
            vec![1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5],
        );
    }
918}