1use std::any::Any;
16use std::cmp::Ordering;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
22use datafusion::arrow::datatypes::SchemaRef;
23use datafusion::arrow::record_batch::RecordBatch;
24use datafusion::common::stats::Precision;
25use datafusion::common::{DFSchema, DFSchemaRef};
26use datafusion::error::{DataFusionError, Result as DataFusionResult};
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
29use datafusion::physical_plan::metrics::{
30 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
31};
32use datafusion::physical_plan::{
33 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
34 SendableRecordBatchStream, Statistics,
35};
36use datafusion_expr::col;
37use datatypes::arrow::compute;
38use datatypes::arrow::error::Result as ArrowResult;
39use futures::{Stream, StreamExt, ready};
40use greptime_proto::substrait_extension as pb;
41use prost::Message;
42use snafu::ResultExt;
43
44use crate::error::{DeserializeSnafu, Result};
45use crate::extension_plan::{
46 METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
47};
48use crate::metrics::PROMQL_SERIES_COUNT;
49
/// Logical plan node that aligns an input time series to the query's
/// evaluation grid for PromQL instant selectors: for every step between
/// `start` and `end` (spaced by `interval`), the newest sample within
/// `lookback_delta` is selected (see `InstantManipulateStream::manipulate`).
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct InstantManipulate {
    /// Inclusive range start, in milliseconds.
    start: Millisecond,
    /// Inclusive range end, in milliseconds.
    end: Millisecond,
    /// How far back a sample may be and still match a step, in milliseconds.
    lookback_delta: Millisecond,
    /// Step width between evaluation timestamps, in milliseconds.
    interval: Millisecond,
    /// Name of the timestamp column in the input schema.
    time_index_column: String,
    /// Optional value column used for NaN (stale-marker) filtering.
    field_column: Option<String>,
    input: LogicalPlan,
    /// Set only on a freshly deserialized node; holds column *indices* that
    /// still need to be resolved into names (see [`UnfixIndices`]).
    unfix: Option<UnfixIndices>,
}
67
/// Column indices carried by a deserialized [`InstantManipulate`] before they
/// are resolved back into column names against the real input schema in
/// `with_exprs_and_inputs`.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    /// Index of the time index column in the input schema.
    pub time_index_idx: u64,
    /// Index of the field column; `u64::MAX` encodes "no field column".
    pub field_index_idx: u64,
}
73
impl UserDefinedLogicalNodeCore for InstantManipulate {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    /// Output schema equals the input schema: rows are filtered/duplicated
    /// and timestamps rewritten, but no column is added or removed.
    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        // A freshly deserialized node only knows column *indices*; names are
        // resolved later in `with_exprs_and_inputs`, so nothing can be
        // reported yet.
        if self.unfix.is_some() {
            return vec![];
        }

        let mut exprs = vec![col(&self.time_index_column)];
        if let Some(field) = &self.field_column {
            exprs.push(col(field));
        }
        exprs
    }

    /// Tells projection pushdown which input columns must survive: the
    /// requested output columns plus the time index and (optional) field
    /// column this node itself reads.
    fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
        // Indices not yet resolved into names: don't constrain the optimizer.
        if self.unfix.is_some() {
            return None;
        }

        let input_schema = self.input.schema();
        if output_columns.is_empty() {
            // No explicit projection requested: require every input column.
            let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
            return Some(vec![indices]);
        }

        let mut required = output_columns.to_vec();
        // `?` bails out (returning None) when a column is unknown.
        required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?);
        if let Some(field) = &self.field_column {
            required.push(input_schema.index_of_column_by_name(None, field)?);
        }

        required.sort_unstable();
        required.dedup();
        Some(vec![required])
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "PromInstantManipulate: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
            self.start, self.end, self.lookback_delta, self.interval, self.time_index_column
        )
    }

    /// Rebuilds this node with a new input. For a deserialized node (`unfix`
    /// set) the stored column indices are resolved into names against the new
    /// input's schema; either way the result has `unfix: None`.
    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.len() != 1 {
            return Err(DataFusionError::Internal(
                "InstantManipulate should have exact one input".to_string(),
            ));
        }

        let input: LogicalPlan = inputs.into_iter().next().unwrap();
        let input_schema = input.schema();

        if let Some(unfix) = &self.unfix {
            let time_index_column = resolve_column_name(
                unfix.time_index_idx,
                input_schema,
                "InstantManipulate",
                "time index",
            )?;

            // `u64::MAX` is the serialization sentinel for "no field column".
            let field_column = if unfix.field_index_idx == u64::MAX {
                None
            } else {
                Some(resolve_column_name(
                    unfix.field_index_idx,
                    input_schema,
                    "InstantManipulate",
                    "field",
                )?)
            };

            Ok(Self {
                start: self.start,
                end: self.end,
                lookback_delta: self.lookback_delta,
                interval: self.interval,
                time_index_column,
                field_column,
                input,
                unfix: None,
            })
        } else {
            Ok(Self {
                start: self.start,
                end: self.end,
                lookback_delta: self.lookback_delta,
                interval: self.interval,
                time_index_column: self.time_index_column.clone(),
                field_column: self.field_column.clone(),
                input,
                unfix: None,
            })
        }
    }
}
187
impl InstantManipulate {
    /// Creates a node with already-resolved column names (`unfix` is `None`).
    pub fn new(
        start: Millisecond,
        end: Millisecond,
        lookback_delta: Millisecond,
        interval: Millisecond,
        time_index_column: String,
        field_column: Option<String>,
        input: LogicalPlan,
    ) -> Self {
        Self {
            start,
            end,
            lookback_delta,
            interval,
            time_index_column,
            field_column,
            input,
            unfix: None,
        }
    }

    pub const fn name() -> &'static str {
        "InstantManipulate"
    }

    /// Wraps the given physical input into the corresponding
    /// [`InstantManipulateExec`] carrying the same parameters.
    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(InstantManipulateExec {
            start: self.start,
            end: self.end,
            lookback_delta: self.lookback_delta,
            interval: self.interval,
            time_index_column: self.time_index_column.clone(),
            field_column: self.field_column.clone(),
            input: exec_input,
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

    /// Encodes this node as protobuf bytes. Column references are serialized
    /// as schema indices; a missing field column is encoded as `u64::MAX`.
    pub fn serialize(&self) -> Vec<u8> {
        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index_column);

        let field_index_idx = self
            .field_column
            .as_ref()
            .map(|name| serialize_column_index(self.input.schema(), name))
            .unwrap_or(u64::MAX);

        pb::InstantManipulate {
            start: self.start,
            end: self.end,
            interval: self.interval,
            lookback_delta: self.lookback_delta,
            time_index_idx,
            field_index_idx,
            ..Default::default()
        }
        .encode_to_vec()
    }

    /// Decodes a node from protobuf bytes. The result carries the unresolved
    /// column indices in `unfix` and an empty placeholder input; both are
    /// replaced when `with_exprs_and_inputs` is called with the real input.
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_instant_manipulate =
            pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });

        let unfix = UnfixIndices {
            time_index_idx: pb_instant_manipulate.time_index_idx,
            field_index_idx: pb_instant_manipulate.field_index_idx,
        };

        Ok(Self {
            start: pb_instant_manipulate.start,
            end: pb_instant_manipulate.end,
            lookback_delta: pb_instant_manipulate.lookback_delta,
            interval: pb_instant_manipulate.interval,
            // Names are unknown until resolved against a real input schema.
            time_index_column: String::new(),
            field_column: None,
            input: placeholder_plan,
            unfix: Some(unfix),
        })
    }
}
273
/// Physical (execution) counterpart of [`InstantManipulate`].
#[derive(Debug)]
pub struct InstantManipulateExec {
    /// Inclusive range start, in milliseconds.
    start: Millisecond,
    /// Inclusive range end, in milliseconds.
    end: Millisecond,
    /// Lookback window, in milliseconds.
    lookback_delta: Millisecond,
    /// Step width, in milliseconds.
    interval: Millisecond,
    /// Name of the timestamp column.
    time_index_column: String,
    /// Optional value column used for NaN filtering.
    field_column: Option<String>,

    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}
286
287impl ExecutionPlan for InstantManipulateExec {
288 fn as_any(&self) -> &dyn Any {
289 self
290 }
291
292 fn schema(&self) -> SchemaRef {
293 self.input.schema()
294 }
295
296 fn properties(&self) -> &PlanProperties {
297 self.input.properties()
298 }
299
300 fn required_input_distribution(&self) -> Vec<Distribution> {
301 self.input.required_input_distribution()
302 }
303
304 fn maintains_input_order(&self) -> Vec<bool> {
306 vec![false; self.children().len()]
307 }
308
309 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
310 vec![&self.input]
311 }
312
313 fn with_new_children(
314 self: Arc<Self>,
315 children: Vec<Arc<dyn ExecutionPlan>>,
316 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
317 assert!(!children.is_empty());
318 Ok(Arc::new(Self {
319 start: self.start,
320 end: self.end,
321 lookback_delta: self.lookback_delta,
322 interval: self.interval,
323 time_index_column: self.time_index_column.clone(),
324 field_column: self.field_column.clone(),
325 input: children[0].clone(),
326 metric: self.metric.clone(),
327 }))
328 }
329
330 fn execute(
331 &self,
332 partition: usize,
333 context: Arc<TaskContext>,
334 ) -> DataFusionResult<SendableRecordBatchStream> {
335 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
336 let metrics_builder = MetricBuilder::new(&self.metric);
337 let num_series = Count::new();
338 metrics_builder
339 .with_partition(partition)
340 .build(MetricValue::Count {
341 name: METRIC_NUM_SERIES.into(),
342 count: num_series.clone(),
343 });
344
345 let input = self.input.execute(partition, context)?;
346 let schema = input.schema();
347 let time_index = schema
348 .column_with_name(&self.time_index_column)
349 .expect("time index column not found")
350 .0;
351 let field_index = self
352 .field_column
353 .as_ref()
354 .and_then(|name| schema.column_with_name(name))
355 .map(|x| x.0);
356 Ok(Box::pin(InstantManipulateStream {
357 start: self.start,
358 end: self.end,
359 lookback_delta: self.lookback_delta,
360 interval: self.interval,
361 time_index,
362 field_index,
363 schema,
364 input,
365 metric: baseline_metric,
366 num_series,
367 }))
368 }
369
370 fn metrics(&self) -> Option<MetricsSet> {
371 Some(self.metric.clone_inner())
372 }
373
374 fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
375 let input_stats = self.input.partition_statistics(partition)?;
376
377 let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
378 let estimated_total_bytes = input_stats
379 .total_byte_size
380 .get_value()
381 .zip(input_stats.num_rows.get_value())
382 .map(|(size, rows)| {
383 Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
384 })
385 .unwrap_or(Precision::Absent);
386
387 Ok(Statistics {
388 num_rows: Precision::Inexact(estimated_row_num.floor() as _),
389 total_byte_size: estimated_total_bytes,
390 column_statistics: Statistics::unknown_column(&self.schema()),
392 })
393 }
394
395 fn name(&self) -> &str {
396 "InstantManipulateExec"
397 }
398}
399
400impl DisplayAs for InstantManipulateExec {
401 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
402 match t {
403 DisplayFormatType::Default
404 | DisplayFormatType::Verbose
405 | DisplayFormatType::TreeRender => {
406 write!(
407 f,
408 "PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
409 self.start,
410 self.end,
411 self.lookback_delta,
412 self.interval,
413 self.time_index_column
414 )
415 }
416 }
417 }
418}
419
/// Stream adapter that applies the instant-alignment transform to every
/// non-empty batch produced by `input`.
pub struct InstantManipulateStream {
    start: Millisecond,
    end: Millisecond,
    lookback_delta: Millisecond,
    interval: Millisecond,
    // Column index of the time index in `schema`.
    time_index: usize,
    // Column index of the optional field (value) column, if present.
    field_index: Option<usize>,

    schema: SchemaRef,
    input: SendableRecordBatchStream,
    // Baseline execution metrics (elapsed compute, output rows, ...).
    metric: BaselineMetrics,
    // Counts non-empty batches seen; reported to PROMQL_SERIES_COUNT at EOF.
    num_series: Count,
}
435
436impl RecordBatchStream for InstantManipulateStream {
437 fn schema(&self) -> SchemaRef {
438 self.schema.clone()
439 }
440}
441
442impl Stream for InstantManipulateStream {
443 type Item = DataFusionResult<RecordBatch>;
444
445 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
446 let poll = match ready!(self.input.poll_next_unpin(cx)) {
447 Some(Ok(batch)) => {
448 if batch.num_rows() == 0 {
449 return Poll::Pending;
450 }
451 let timer = std::time::Instant::now();
452 self.num_series.add(1);
453 let result = Ok(batch).and_then(|batch| self.manipulate(batch));
454 self.metric.elapsed_compute().add_elapsed(timer);
455 Poll::Ready(Some(result))
456 }
457 None => {
458 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
459 Poll::Ready(None)
460 }
461 Some(Err(e)) => Poll::Ready(Some(Err(e))),
462 };
463 self.metric.record_poll(poll)
464 }
465}
466
impl InstantManipulateStream {
    /// Aligns one input batch to the query's time grid.
    ///
    /// For every expected timestamp `start + k * interval` inside the data's
    /// usable window, picks the newest input row whose timestamp lies in
    /// `(expected - lookback_delta, expected]`; rows whose field value is NaN
    /// are treated as absent (stale). The chosen rows are emitted with their
    /// timestamp rewritten to the aligned expected timestamp.
    ///
    /// NOTE(review): the cursor only moves forward, so this assumes the time
    /// index column is sorted ascending within the batch — confirm the
    /// upstream operator guarantees per-series ordering.
    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
        let ts_column = input
            .column(self.time_index)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
                )
            })?;

        // Nothing to align in an empty batch.
        if ts_column.is_empty() {
            return Ok(input);
        }

        // The field column is only used for NaN filtering; a failed downcast
        // simply disables that filtering.
        let field_column = self
            .field_index
            .and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());

        let first_ts = ts_column.value(0);
        let last_ts = ts_column.value(ts_column.len() - 1);
        // The last input point can still satisfy expected timestamps up to
        // `lookback_delta - 1` ms after it.
        let last_useful = if self.lookback_delta > 0 {
            last_ts + self.lookback_delta - 1
        } else {
            last_ts
        };

        // Clamp the evaluation range to what the data can actually cover...
        let max_start = first_ts.max(self.start);
        let min_end = last_useful.min(self.end);

        // ...then snap both ends onto the grid anchored at `self.start`.
        let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
        let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;

        // Row indices to take from the input; kept parallel to `aligned_ts`.
        let mut take_indices = vec![];

        let mut cursor = 0;

        let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
        let mut aligned_ts = vec![];

        'next: for expected_ts in aligned_ts_iter {
            // Advance the cursor to the first row at or after `expected_ts`.
            while cursor < ts_column.len() {
                let curr = ts_column.value(cursor);
                match curr.cmp(&expected_ts) {
                    Ordering::Equal => {
                        // Exact hit: take it unless the value is NaN (stale).
                        if let Some(field_column) = &field_column
                            && field_column.value(cursor).is_nan()
                        {
                        } else {
                            take_indices.push(cursor as u64);
                            aligned_ts.push(expected_ts);
                        }
                        continue 'next;
                    }
                    Ordering::Greater => break,
                    Ordering::Less => {}
                }
                cursor += 1;
            }
            if cursor == ts_column.len() {
                // Ran past the last row; step back so lookback can still
                // consider it, or stop entirely if it is already too old for
                // this (and therefore every later) expected timestamp.
                cursor -= 1;
                if ts_column.value(cursor) + self.lookback_delta <= expected_ts {
                    break;
                }
            }

            let curr_ts = ts_column.value(cursor);
            // Current row is outside the lookback window: no match this step.
            if curr_ts + self.lookback_delta <= expected_ts {
                continue;
            }
            if curr_ts > expected_ts {
                // Cursor overshot; the candidate is the previous row, if it
                // exists and falls within the lookback window.
                if let Some(prev_cursor) = cursor.checked_sub(1) {
                    let prev_ts = ts_column.value(prev_cursor);
                    if prev_ts + self.lookback_delta > expected_ts {
                        // NaN marks the point as absent: skip this step.
                        if let Some(field_column) = &field_column
                            && field_column.value(prev_cursor).is_nan()
                        {
                            continue;
                        }
                        take_indices.push(prev_cursor as u64);
                        aligned_ts.push(expected_ts);
                    }
                }
            } else if let Some(field_column) = &field_column
                && field_column.value(cursor).is_nan()
            {
                // Current row matches but carries NaN: emit nothing.
            } else {
                take_indices.push(cursor as u64);
                aligned_ts.push(expected_ts);
            }
        }

        self.take_record_batch_optional(input, take_indices, aligned_ts)
    }

    /// Builds the output batch: takes `take_indices` rows from every column,
    /// then replaces the time index column with the aligned timestamps.
    fn take_record_batch_optional(
        &self,
        record_batch: RecordBatch,
        take_indices: Vec<u64>,
        aligned_ts: Vec<Millisecond>,
    ) -> DataFusionResult<RecordBatch> {
        // The two vectors are built in lockstep in `manipulate`.
        assert_eq!(take_indices.len(), aligned_ts.len());

        let indices_array = UInt64Array::from(take_indices);
        let mut arrays = record_batch
            .columns()
            .iter()
            .map(|array| compute::take(array, &indices_array, None))
            .collect::<ArrowResult<Vec<_>>>()?;
        arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));

        let result = RecordBatch::try_new(record_batch.schema(), arrays)
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
        Ok(result)
    }
}
609
#[cfg(test)]
mod test {
    use datafusion::common::ToDFSchema;
    use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
    use datafusion::prelude::SessionContext;

    use super::*;
    use crate::extension_plan::test_util::{
        TIME_INDEX_COLUMN, prepare_test_data, prepare_test_data_with_nan,
    };

    /// Runs an [`InstantManipulateExec`] over the canned test data and
    /// compares the pretty-printed result against `expected`.
    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        lookback_delta: Millisecond,
        interval: Millisecond,
        expected: String,
        contains_nan: bool,
    ) {
        let memory_exec = if contains_nan {
            Arc::new(prepare_test_data_with_nan())
        } else {
            Arc::new(prepare_test_data())
        };
        let normalize_exec = Arc::new(InstantManipulateExec {
            start,
            end,
            lookback_delta,
            interval,
            time_index_column: TIME_INDEX_COLUMN.to_string(),
            field_column: Some("value".to_string()),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        assert_eq!(result_literal, expected);
    }

    #[test]
    fn pruning_should_keep_time_and_field_columns_for_exec() {
        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
        let input = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: df_schema,
        });
        let plan = InstantManipulate::new(
            0,
            0,
            0,
            0,
            TIME_INDEX_COLUMN.to_string(),
            Some("value".to_string()),
            input,
        );

        // Request only column 2; pruning must still include the time index
        // and field columns this node reads.
        let output_columns = [2usize];
        let required = plan.necessary_children_exprs(&output_columns).unwrap();
        let required = &required[0];
        assert_eq!(required.as_slice(), &[0, 1, 2]);
    }

    #[tokio::test]
    async fn lookback_10s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 310_000, 10_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_10s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:40 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 10_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_30s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 30_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_30s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:10 | 1.0 | foo |\
            \n| 1970-01-01T00:00:20 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:00:40 | 1.0 | foo |\
            \n| 1970-01-01T00:00:50 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:10 | 1.0 | foo |\
            \n| 1970-01-01T00:01:20 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:40 | 1.0 | foo |\
            \n| 1970-01-01T00:01:50 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:10 | 1.0 | foo |\
            \n| 1970-01-01T00:02:20 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:10 | 1.0 | foo |\
            \n| 1970-01-01T00:03:20 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:20 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:40 | 1.0 | foo |\
            \n| 1970-01-01T00:04:50 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 30_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_60s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:10 | 1.0 | foo |\
            \n| 1970-01-01T00:00:20 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:00:40 | 1.0 | foo |\
            \n| 1970-01-01T00:00:50 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:10 | 1.0 | foo |\
            \n| 1970-01-01T00:01:20 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:40 | 1.0 | foo |\
            \n| 1970-01-01T00:01:50 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:10 | 1.0 | foo |\
            \n| 1970-01-01T00:02:20 | 1.0 | foo |\
            \n| 1970-01-01T00:02:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:40 | 1.0 | foo |\
            \n| 1970-01-01T00:02:50 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:10 | 1.0 | foo |\
            \n| 1970-01-01T00:03:20 | 1.0 | foo |\
            \n| 1970-01-01T00:03:30 | 1.0 | foo |\
            \n| 1970-01-01T00:03:40 | 1.0 | foo |\
            \n| 1970-01-01T00:03:50 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:20 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:40 | 1.0 | foo |\
            \n| 1970-01-01T00:04:50 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 60_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_60s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:30 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 60_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_0s_interval_1s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:01 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(230_000, 245_000, 0, 1_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_10s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 30_000, 10_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn large_range_lookback_30s_interval_60s() {
        // Range far wider than the data; output is clamped to the data span.
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_30s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:03:10 | 1.0 | foo |\
            \n| 1970-01-01T00:03:20 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:20 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:40 | 1.0 | foo |\
            \n| 1970-01-01T00:04:50 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_10s_interval_10s_with_nan() {
        // NaN samples are treated as absent and must not appear in output.
        let expected = String::from(
            "+---------------------+-------+\
            \n| timestamp | value |\
            \n+---------------------+-------+\
            \n| 1970-01-01T00:00:00 | 0.0 |\
            \n| 1970-01-01T00:01:00 | 6.0 |\
            \n| 1970-01-01T00:02:00 | 12.0 |\
            \n+---------------------+-------+",
        );
        do_normalize_test(0, 300_000, 10_000, 10_000, expected, true).await;
    }

    #[tokio::test]
    async fn lookback_10s_interval_10s_with_nan_unaligned() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| timestamp | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00.001 | 0.0 |\
            \n| 1970-01-01T00:01:00.001 | 6.0 |\
            \n| 1970-01-01T00:02:00.001 | 12.0 |\
            \n+-------------------------+-------+",
        );
        do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
    }

    #[tokio::test]
    async fn ultra_large_range() {
        // Extreme range values must not overflow or blow up the output size.
        let expected = String::from(
            "+-------------------------+-------+\
            \n| timestamp | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00.001 | 0.0 |\
            \n| 1970-01-01T00:01:00.001 | 6.0 |\
            \n| 1970-01-01T00:02:00.001 | 12.0 |\
            \n+-------------------------+-------+",
        );
        do_normalize_test(
            -900_000_000_000_000 + 1,
            900_000_000_000_000,
            10_000,
            10_000,
            expected,
            true,
        )
        .await;
    }
}