1use std::any::Any;
16use std::cmp::Ordering;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
22use datafusion::arrow::datatypes::SchemaRef;
23use datafusion::arrow::record_batch::RecordBatch;
24use datafusion::common::stats::Precision;
25use datafusion::common::{ColumnStatistics, DFSchema, DFSchemaRef};
26use datafusion::error::{DataFusionError, Result as DataFusionResult};
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
29use datafusion::physical_plan::metrics::{
30 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
31};
32use datafusion::physical_plan::{
33 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
34 SendableRecordBatchStream, Statistics,
35};
36use datatypes::arrow::compute;
37use datatypes::arrow::error::Result as ArrowResult;
38use futures::{Stream, StreamExt, ready};
39use greptime_proto::substrait_extension as pb;
40use prost::Message;
41use snafu::ResultExt;
42
43use crate::error::{DeserializeSnafu, Result};
44use crate::extension_plan::{METRIC_NUM_SERIES, Millisecond};
45use crate::metrics::PROMQL_SERIES_COUNT;
46
/// Logical plan node for the PromQL instant manipulation step.
///
/// It carries the parameters that [`InstantManipulateExec`] uses at execution
/// time: output timestamps run from `start` to `end` (both inclusive) in steps
/// of `interval`, and for each of them the most recent input row that is at
/// most `lookback_delta` older than the output timestamp is selected.
///
/// Field order is significant: the derived `PartialOrd` compares fields in
/// declaration order.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct InstantManipulate {
    /// First output timestamp, in milliseconds.
    start: Millisecond,
    /// Upper bound (inclusive) of the output timestamps, in milliseconds.
    end: Millisecond,
    /// How far back, in milliseconds, a row may lie behind an output timestamp
    /// and still be selected for it.
    lookback_delta: Millisecond,
    /// Distance between two consecutive output timestamps, in milliseconds.
    interval: Millisecond,
    /// Name of the time index column in the input schema.
    time_index_column: String,
    /// Optional name of a value column used for the staleness check: rows whose
    /// value in this column is NaN are never selected.
    field_column: Option<String>,
    /// The child plan producing the rows to align.
    input: LogicalPlan,
}
63
64impl UserDefinedLogicalNodeCore for InstantManipulate {
65 fn name(&self) -> &str {
66 Self::name()
67 }
68
69 fn inputs(&self) -> Vec<&LogicalPlan> {
70 vec![&self.input]
71 }
72
73 fn schema(&self) -> &DFSchemaRef {
74 self.input.schema()
75 }
76
77 fn expressions(&self) -> Vec<Expr> {
78 vec![]
79 }
80
81 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
82 write!(
83 f,
84 "PromInstantManipulate: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
85 self.start, self.end, self.lookback_delta, self.interval, self.time_index_column
86 )
87 }
88
89 fn with_exprs_and_inputs(
90 &self,
91 _exprs: Vec<Expr>,
92 inputs: Vec<LogicalPlan>,
93 ) -> DataFusionResult<Self> {
94 if inputs.len() != 1 {
95 return Err(DataFusionError::Internal(
96 "InstantManipulate should have exact one input".to_string(),
97 ));
98 }
99
100 Ok(Self {
101 start: self.start,
102 end: self.end,
103 lookback_delta: self.lookback_delta,
104 interval: self.interval,
105 time_index_column: self.time_index_column.clone(),
106 field_column: self.field_column.clone(),
107 input: inputs.into_iter().next().unwrap(),
108 })
109 }
110}
111
112impl InstantManipulate {
113 pub fn new(
114 start: Millisecond,
115 end: Millisecond,
116 lookback_delta: Millisecond,
117 interval: Millisecond,
118 time_index_column: String,
119 field_column: Option<String>,
120 input: LogicalPlan,
121 ) -> Self {
122 Self {
123 start,
124 end,
125 lookback_delta,
126 interval,
127 time_index_column,
128 field_column,
129 input,
130 }
131 }
132
133 pub const fn name() -> &'static str {
134 "InstantManipulate"
135 }
136
137 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
138 Arc::new(InstantManipulateExec {
139 start: self.start,
140 end: self.end,
141 lookback_delta: self.lookback_delta,
142 interval: self.interval,
143 time_index_column: self.time_index_column.clone(),
144 field_column: self.field_column.clone(),
145 input: exec_input,
146 metric: ExecutionPlanMetricsSet::new(),
147 })
148 }
149
150 pub fn serialize(&self) -> Vec<u8> {
151 pb::InstantManipulate {
152 start: self.start,
153 end: self.end,
154 interval: self.interval,
155 lookback_delta: self.lookback_delta,
156 time_index: self.time_index_column.clone(),
157 field_index: self.field_column.clone().unwrap_or_default(),
158 }
159 .encode_to_vec()
160 }
161
162 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
163 let pb_instant_manipulate =
164 pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
165 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
166 produce_one_row: false,
167 schema: Arc::new(DFSchema::empty()),
168 });
169 let field_column = if pb_instant_manipulate.field_index.is_empty() {
170 None
171 } else {
172 Some(pb_instant_manipulate.field_index)
173 };
174 Ok(Self {
175 start: pb_instant_manipulate.start,
176 end: pb_instant_manipulate.end,
177 lookback_delta: pb_instant_manipulate.lookback_delta,
178 interval: pb_instant_manipulate.interval,
179 time_index_column: pb_instant_manipulate.time_index,
180 field_column,
181 input: placeholder_plan,
182 })
183 }
184}
185
/// Physical plan for [`InstantManipulate`].
///
/// It holds the same parameters as the logical node plus the child execution
/// plan and the metrics collected while executing.
#[derive(Debug)]
pub struct InstantManipulateExec {
    /// First output timestamp, in milliseconds.
    start: Millisecond,
    /// Upper bound (inclusive) of the output timestamps, in milliseconds.
    end: Millisecond,
    /// Maximum age, in milliseconds, of a row relative to the output timestamp
    /// it is selected for.
    lookback_delta: Millisecond,
    /// Distance between two consecutive output timestamps, in milliseconds.
    interval: Millisecond,
    /// Name of the time index column; resolved to a column index in `execute`.
    time_index_column: String,
    /// Optional name of the value column used to skip NaN rows.
    field_column: Option<String>,

    /// Child plan producing the rows to align.
    input: Arc<dyn ExecutionPlan>,
    /// Metrics registered by `execute` and exposed through `metrics`.
    metric: ExecutionPlanMetricsSet,
}
198
199impl ExecutionPlan for InstantManipulateExec {
200 fn as_any(&self) -> &dyn Any {
201 self
202 }
203
204 fn schema(&self) -> SchemaRef {
205 self.input.schema()
206 }
207
208 fn properties(&self) -> &PlanProperties {
209 self.input.properties()
210 }
211
212 fn required_input_distribution(&self) -> Vec<Distribution> {
213 self.input.required_input_distribution()
214 }
215
216 fn maintains_input_order(&self) -> Vec<bool> {
218 vec![false; self.children().len()]
219 }
220
221 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
222 vec![&self.input]
223 }
224
225 fn with_new_children(
226 self: Arc<Self>,
227 children: Vec<Arc<dyn ExecutionPlan>>,
228 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
229 assert!(!children.is_empty());
230 Ok(Arc::new(Self {
231 start: self.start,
232 end: self.end,
233 lookback_delta: self.lookback_delta,
234 interval: self.interval,
235 time_index_column: self.time_index_column.clone(),
236 field_column: self.field_column.clone(),
237 input: children[0].clone(),
238 metric: self.metric.clone(),
239 }))
240 }
241
242 fn execute(
243 &self,
244 partition: usize,
245 context: Arc<TaskContext>,
246 ) -> DataFusionResult<SendableRecordBatchStream> {
247 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
248 let metrics_builder = MetricBuilder::new(&self.metric);
249 let num_series = Count::new();
250 metrics_builder
251 .with_partition(partition)
252 .build(MetricValue::Count {
253 name: METRIC_NUM_SERIES.into(),
254 count: num_series.clone(),
255 });
256
257 let input = self.input.execute(partition, context)?;
258 let schema = input.schema();
259 let time_index = schema
260 .column_with_name(&self.time_index_column)
261 .expect("time index column not found")
262 .0;
263 let field_index = self
264 .field_column
265 .as_ref()
266 .and_then(|name| schema.column_with_name(name))
267 .map(|x| x.0);
268 Ok(Box::pin(InstantManipulateStream {
269 start: self.start,
270 end: self.end,
271 lookback_delta: self.lookback_delta,
272 interval: self.interval,
273 time_index,
274 field_index,
275 schema,
276 input,
277 metric: baseline_metric,
278 num_series,
279 }))
280 }
281
282 fn metrics(&self) -> Option<MetricsSet> {
283 Some(self.metric.clone_inner())
284 }
285
286 fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
287 let input_stats = self.input.partition_statistics(partition)?;
288
289 let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
290 let estimated_total_bytes = input_stats
291 .total_byte_size
292 .get_value()
293 .zip(input_stats.num_rows.get_value())
294 .map(|(size, rows)| {
295 Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
296 })
297 .unwrap_or(Precision::Absent);
298
299 Ok(Statistics {
300 num_rows: Precision::Inexact(estimated_row_num.floor() as _),
301 total_byte_size: estimated_total_bytes,
302 column_statistics: vec![
304 ColumnStatistics::new_unknown();
305 self.schema().flattened_fields().len()
306 ],
307 })
308 }
309
310 fn name(&self) -> &str {
311 "InstantManipulateExec"
312 }
313}
314
315impl DisplayAs for InstantManipulateExec {
316 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
317 match t {
318 DisplayFormatType::Default
319 | DisplayFormatType::Verbose
320 | DisplayFormatType::TreeRender => {
321 write!(
322 f,
323 "PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
324 self.start,
325 self.end,
326 self.lookback_delta,
327 self.interval,
328 self.time_index_column
329 )
330 }
331 }
332 }
333}
334
/// Stream that applies [`InstantManipulateStream::manipulate`] to every
/// non-empty batch of its input stream.
pub struct InstantManipulateStream {
    /// First output timestamp, in milliseconds.
    start: Millisecond,
    /// Upper bound (inclusive) of the output timestamps, in milliseconds.
    end: Millisecond,
    /// Maximum age, in milliseconds, of a row relative to the output timestamp
    /// it is selected for.
    lookback_delta: Millisecond,
    /// Distance between two consecutive output timestamps, in milliseconds.
    interval: Millisecond,
    /// Index of the time index column within each batch.
    time_index: usize,
    /// Index of the value column used to skip NaN rows, if any.
    field_index: Option<usize>,

    /// Schema reported for the output batches.
    schema: SchemaRef,
    /// The upstream batches to align.
    input: SendableRecordBatchStream,
    /// Baseline metrics (compute time, poll results) for this stream.
    metric: BaselineMetrics,
    /// Incremented once per processed non-empty batch.
    num_series: Count,
}
350
351impl RecordBatchStream for InstantManipulateStream {
352 fn schema(&self) -> SchemaRef {
353 self.schema.clone()
354 }
355}
356
357impl Stream for InstantManipulateStream {
358 type Item = DataFusionResult<RecordBatch>;
359
360 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
361 let poll = match ready!(self.input.poll_next_unpin(cx)) {
362 Some(Ok(batch)) => {
363 if batch.num_rows() == 0 {
364 return Poll::Pending;
365 }
366 let timer = std::time::Instant::now();
367 self.num_series.add(1);
368 let result = Ok(batch).and_then(|batch| self.manipulate(batch));
369 self.metric.elapsed_compute().add_elapsed(timer);
370 Poll::Ready(Some(result))
371 }
372 None => {
373 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
374 Poll::Ready(None)
375 }
376 Some(Err(e)) => Poll::Ready(Some(Err(e))),
377 };
378 self.metric.record_poll(poll)
379 }
380}
381
382impl InstantManipulateStream {
383 pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
386 let mut take_indices = vec![];
387
388 let ts_column = input
389 .column(self.time_index)
390 .as_any()
391 .downcast_ref::<TimestampMillisecondArray>()
392 .ok_or_else(|| {
393 DataFusionError::Execution(
394 "Time index Column downcast to TimestampMillisecondArray failed".into(),
395 )
396 })?;
397
398 if ts_column.is_empty() {
400 return Ok(input);
401 }
402
403 let field_column = self
405 .field_index
406 .and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());
407
408 let first_ts = ts_column.value(0);
410 let last_ts = ts_column.value(ts_column.len() - 1);
411 let last_useful = last_ts + self.lookback_delta;
412
413 let max_start = first_ts.max(self.start);
414 let min_end = last_useful.min(self.end);
415
416 let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
417 let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
418
419 let mut cursor = 0;
420
421 let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
422 let mut aligned_ts = vec![];
423
424 'next: for expected_ts in aligned_ts_iter {
426 while cursor < ts_column.len() {
428 let curr = ts_column.value(cursor);
429 match curr.cmp(&expected_ts) {
430 Ordering::Equal => {
431 if let Some(field_column) = &field_column
432 && field_column.value(cursor).is_nan()
433 {
434 } else {
436 take_indices.push(cursor as u64);
437 aligned_ts.push(expected_ts);
438 }
439 continue 'next;
440 }
441 Ordering::Greater => break,
442 Ordering::Less => {}
443 }
444 cursor += 1;
445 }
446 if cursor == ts_column.len() {
447 cursor -= 1;
448 if ts_column.value(cursor) + self.lookback_delta < expected_ts {
450 break;
451 }
452 }
453
454 let curr_ts = ts_column.value(cursor);
456 if curr_ts + self.lookback_delta < expected_ts {
457 continue;
458 }
459 if curr_ts > expected_ts {
460 if let Some(prev_cursor) = cursor.checked_sub(1) {
462 let prev_ts = ts_column.value(prev_cursor);
463 if prev_ts + self.lookback_delta >= expected_ts {
464 if let Some(field_column) = &field_column
466 && field_column.value(prev_cursor).is_nan()
467 {
468 continue;
470 }
471 take_indices.push(prev_cursor as u64);
473 aligned_ts.push(expected_ts);
474 }
475 }
476 } else if let Some(field_column) = &field_column
477 && field_column.value(cursor).is_nan()
478 {
479 } else {
481 take_indices.push(cursor as u64);
483 aligned_ts.push(expected_ts);
484 }
485 }
486
487 self.take_record_batch_optional(input, take_indices, aligned_ts)
489 }
490
491 fn take_record_batch_optional(
493 &self,
494 record_batch: RecordBatch,
495 take_indices: Vec<u64>,
496 aligned_ts: Vec<Millisecond>,
497 ) -> DataFusionResult<RecordBatch> {
498 assert_eq!(take_indices.len(), aligned_ts.len());
499
500 let indices_array = UInt64Array::from(take_indices);
501 let mut arrays = record_batch
502 .columns()
503 .iter()
504 .map(|array| compute::take(array, &indices_array, None))
505 .collect::<ArrowResult<Vec<_>>>()?;
506 arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));
507
508 let result = RecordBatch::try_new(record_batch.schema(), arrays)
509 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
510 Ok(result)
511 }
512}
513
/// Snapshot tests for [`InstantManipulateExec`]: each test runs the plan over
/// the shared test data and compares the pretty-printed result with an
/// expected table.
#[cfg(test)]
mod test {
    use datafusion::prelude::SessionContext;

    use super::*;
    use crate::extension_plan::test_util::{
        TIME_INDEX_COLUMN, prepare_test_data, prepare_test_data_with_nan,
    };

    /// Runs an `InstantManipulateExec` with the given parameters and asserts
    /// that the pretty-printed output equals `expected`.
    ///
    /// `contains_nan` selects the NaN-containing test data instead of the
    /// default one. The field column is always `"value"`.
    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        lookback_delta: Millisecond,
        interval: Millisecond,
        expected: String,
        contains_nan: bool,
    ) {
        let memory_exec = if contains_nan {
            Arc::new(prepare_test_data_with_nan())
        } else {
            Arc::new(prepare_test_data())
        };
        let normalize_exec = Arc::new(InstantManipulateExec {
            start,
            end,
            lookback_delta,
            interval,
            time_index_column: TIME_INDEX_COLUMN.to_string(),
            field_column: Some("value".to_string()),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        assert_eq!(result_literal, expected);
    }

    /// Range 0..=310s, lookback 10s, interval 30s.
    #[tokio::test]
    async fn lookback_10s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 310_000, 10_000, 30_000, expected, false).await;
    }

    /// Range 0..=300s, lookback 10s, interval 10s.
    #[tokio::test]
    async fn lookback_10s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:10 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:00:40 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:10 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:40 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:10 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:40 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 10_000, 10_000, expected, false).await;
    }

    /// Range 0..=300s, lookback 30s, interval 30s.
    #[tokio::test]
    async fn lookback_30s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:30 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 30_000, 30_000, expected, false).await;
    }

    /// Range 0..=300s, lookback 30s, interval 10s.
    #[tokio::test]
    async fn lookback_30s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:10 | 1.0 | foo |\
            \n| 1970-01-01T00:00:20 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:00:40 | 1.0 | foo |\
            \n| 1970-01-01T00:00:50 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:10 | 1.0 | foo |\
            \n| 1970-01-01T00:01:20 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:40 | 1.0 | foo |\
            \n| 1970-01-01T00:01:50 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:10 | 1.0 | foo |\
            \n| 1970-01-01T00:02:20 | 1.0 | foo |\
            \n| 1970-01-01T00:02:30 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:10 | 1.0 | foo |\
            \n| 1970-01-01T00:03:20 | 1.0 | foo |\
            \n| 1970-01-01T00:03:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:20 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:40 | 1.0 | foo |\
            \n| 1970-01-01T00:04:50 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 30_000, 10_000, expected, false).await;
    }

    /// Range 0..=300s, lookback 60s, interval 10s.
    #[tokio::test]
    async fn lookback_60s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:10 | 1.0 | foo |\
            \n| 1970-01-01T00:00:20 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:00:40 | 1.0 | foo |\
            \n| 1970-01-01T00:00:50 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:10 | 1.0 | foo |\
            \n| 1970-01-01T00:01:20 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:40 | 1.0 | foo |\
            \n| 1970-01-01T00:01:50 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:10 | 1.0 | foo |\
            \n| 1970-01-01T00:02:20 | 1.0 | foo |\
            \n| 1970-01-01T00:02:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:40 | 1.0 | foo |\
            \n| 1970-01-01T00:02:50 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:10 | 1.0 | foo |\
            \n| 1970-01-01T00:03:20 | 1.0 | foo |\
            \n| 1970-01-01T00:03:30 | 1.0 | foo |\
            \n| 1970-01-01T00:03:40 | 1.0 | foo |\
            \n| 1970-01-01T00:03:50 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:20 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:40 | 1.0 | foo |\
            \n| 1970-01-01T00:04:50 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 60_000, 10_000, expected, false).await;
    }

    /// Range 0..=300s, lookback 60s, interval 30s.
    #[tokio::test]
    async fn lookback_60s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:30 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 60_000, 30_000, expected, false).await;
    }

    /// Range 230s..=245s, no lookback, interval 1s.
    #[tokio::test]
    async fn small_range_lookback_0s_interval_1s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:01 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(230_000, 245_000, 0, 1_000, expected, false).await;
    }

    /// Range 0..=30s, lookback 10s, interval 10s.
    #[tokio::test]
    async fn small_range_lookback_10s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:10 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 30_000, 10_000, 10_000, expected, false).await;
    }

    /// Range -900s..=900s (wider than the expected output), lookback 30s,
    /// interval 60s.
    #[tokio::test]
    async fn large_range_lookback_30s_interval_60s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected, false).await;
    }

    /// Range 190s..=300s, lookback 30s.
    ///
    /// NOTE(review): despite the test name, the interval passed is 10s, not 30s.
    #[tokio::test]
    async fn small_range_lookback_30s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:03:10 | 1.0 | foo |\
            \n| 1970-01-01T00:03:20 | 1.0 | foo |\
            \n| 1970-01-01T00:03:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:20 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:40 | 1.0 | foo |\
            \n| 1970-01-01T00:04:50 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
    }

    /// Range 0..=300s, lookback 10s, interval 10s, on the NaN-containing data.
    #[tokio::test]
    async fn lookback_10s_interval_10s_with_nan() {
        let expected = String::from(
            "+---------------------+-------+\
            \n| timestamp | value |\
            \n+---------------------+-------+\
            \n| 1970-01-01T00:00:00 | 0.0 |\
            \n| 1970-01-01T00:00:10 | 0.0 |\
            \n| 1970-01-01T00:01:00 | 6.0 |\
            \n| 1970-01-01T00:01:10 | 6.0 |\
            \n| 1970-01-01T00:02:00 | 12.0 |\
            \n| 1970-01-01T00:02:10 | 12.0 |\
            \n+---------------------+-------+",
        );
        do_normalize_test(0, 300_000, 10_000, 10_000, expected, true).await;
    }

    /// Same as the NaN test above, but with the range shifted by 1ms
    /// (1ms..=300.001s) so output timestamps are offset by 1ms.
    #[tokio::test]
    async fn lookback_10s_interval_10s_with_nan_unaligned() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| timestamp | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00.001 | 0.0 |\
            \n| 1970-01-01T00:01:00.001 | 6.0 |\
            \n| 1970-01-01T00:02:00.001 | 12.0 |\
            \n+-------------------------+-------+",
        );
        do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
    }

    /// A very wide query range around the data, on the NaN-containing data.
    /// The expected output is the same as in the unaligned NaN test.
    #[tokio::test]
    async fn ultra_large_range() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| timestamp | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00.001 | 0.0 |\
            \n| 1970-01-01T00:01:00.001 | 6.0 |\
            \n| 1970-01-01T00:02:00.001 | 12.0 |\
            \n+-------------------------+-------+",
        );
        do_normalize_test(
            -900_000_000_000_000 + 1,
            900_000_000_000_000,
            10_000,
            10_000,
            expected,
            true,
        )
        .await;
    }
}