1use std::any::Any;
16use std::cmp::Ordering;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
22use datafusion::arrow::datatypes::SchemaRef;
23use datafusion::arrow::record_batch::RecordBatch;
24use datafusion::common::stats::Precision;
25use datafusion::common::{ColumnStatistics, DFSchema, DFSchemaRef};
26use datafusion::error::{DataFusionError, Result as DataFusionResult};
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
29use datafusion::physical_plan::metrics::{
30 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
31};
32use datafusion::physical_plan::{
33 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
34 SendableRecordBatchStream, Statistics,
35};
36use datafusion_expr::col;
37use datatypes::arrow::compute;
38use datatypes::arrow::error::Result as ArrowResult;
39use futures::{Stream, StreamExt, ready};
40use greptime_proto::substrait_extension as pb;
41use prost::Message;
42use snafu::ResultExt;
43
44use crate::error::{DeserializeSnafu, Result};
45use crate::extension_plan::{
46 METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
47};
48use crate::metrics::PROMQL_SERIES_COUNT;
49
/// Logical plan node implementing PromQL instant-vector selection.
///
/// For every aligned step `start + k * interval` within `[start, end]`, the executor emits the
/// most recent sample whose timestamp `ts` satisfies `ts <= step && ts + lookback_delta >= step`,
/// with its time index value rewritten to `step`.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct InstantManipulate {
    start: Millisecond,
    end: Millisecond,
    lookback_delta: Millisecond,
    interval: Millisecond,
    time_index_column: String,
    // Optional value column; the executor never emits a sample whose value here is NaN.
    field_column: Option<String>,
    input: LogicalPlan,
    // Column indices decoded by `deserialize`. While this is `Some`, `time_index_column` and
    // `field_column` are placeholders; they are resolved against the real input schema in
    // `with_exprs_and_inputs`.
    unfix: Option<UnfixIndices>,
}
67
/// Column positions carried through serialization, before they are resolved to names.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    // Index of the time index column in the input schema.
    pub time_index_idx: u64,
    // Index of the field column, or `u64::MAX` when there is no field column.
    pub field_index_idx: u64,
}
73
74impl UserDefinedLogicalNodeCore for InstantManipulate {
75 fn name(&self) -> &str {
76 Self::name()
77 }
78
79 fn inputs(&self) -> Vec<&LogicalPlan> {
80 vec![&self.input]
81 }
82
83 fn schema(&self) -> &DFSchemaRef {
84 self.input.schema()
85 }
86
87 fn expressions(&self) -> Vec<Expr> {
88 if self.unfix.is_some() {
89 return vec![];
90 }
91
92 let mut exprs = vec![col(&self.time_index_column)];
93 if let Some(field) = &self.field_column {
94 exprs.push(col(field));
95 }
96 exprs
97 }
98
99 fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
100 if self.unfix.is_some() {
101 return None;
102 }
103
104 let input_schema = self.input.schema();
105 if output_columns.is_empty() {
106 let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
107 return Some(vec![indices]);
108 }
109
110 let mut required = output_columns.to_vec();
111 required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?);
112 if let Some(field) = &self.field_column {
113 required.push(input_schema.index_of_column_by_name(None, field)?);
114 }
115
116 required.sort_unstable();
117 required.dedup();
118 Some(vec![required])
119 }
120
121 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
122 write!(
123 f,
124 "PromInstantManipulate: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
125 self.start, self.end, self.lookback_delta, self.interval, self.time_index_column
126 )
127 }
128
129 fn with_exprs_and_inputs(
130 &self,
131 _exprs: Vec<Expr>,
132 inputs: Vec<LogicalPlan>,
133 ) -> DataFusionResult<Self> {
134 if inputs.len() != 1 {
135 return Err(DataFusionError::Internal(
136 "InstantManipulate should have exact one input".to_string(),
137 ));
138 }
139
140 let input: LogicalPlan = inputs.into_iter().next().unwrap();
141 let input_schema = input.schema();
142
143 if let Some(unfix) = &self.unfix {
144 let time_index_column = resolve_column_name(
146 unfix.time_index_idx,
147 input_schema,
148 "InstantManipulate",
149 "time index",
150 )?;
151
152 let field_column = if unfix.field_index_idx == u64::MAX {
153 None
154 } else {
155 Some(resolve_column_name(
156 unfix.field_index_idx,
157 input_schema,
158 "InstantManipulate",
159 "field",
160 )?)
161 };
162
163 Ok(Self {
164 start: self.start,
165 end: self.end,
166 lookback_delta: self.lookback_delta,
167 interval: self.interval,
168 time_index_column,
169 field_column,
170 input,
171 unfix: None,
172 })
173 } else {
174 Ok(Self {
175 start: self.start,
176 end: self.end,
177 lookback_delta: self.lookback_delta,
178 interval: self.interval,
179 time_index_column: self.time_index_column.clone(),
180 field_column: self.field_column.clone(),
181 input,
182 unfix: None,
183 })
184 }
185 }
186}
187
188impl InstantManipulate {
189 pub fn new(
190 start: Millisecond,
191 end: Millisecond,
192 lookback_delta: Millisecond,
193 interval: Millisecond,
194 time_index_column: String,
195 field_column: Option<String>,
196 input: LogicalPlan,
197 ) -> Self {
198 Self {
199 start,
200 end,
201 lookback_delta,
202 interval,
203 time_index_column,
204 field_column,
205 input,
206 unfix: None,
207 }
208 }
209
210 pub const fn name() -> &'static str {
211 "InstantManipulate"
212 }
213
214 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
215 Arc::new(InstantManipulateExec {
216 start: self.start,
217 end: self.end,
218 lookback_delta: self.lookback_delta,
219 interval: self.interval,
220 time_index_column: self.time_index_column.clone(),
221 field_column: self.field_column.clone(),
222 input: exec_input,
223 metric: ExecutionPlanMetricsSet::new(),
224 })
225 }
226
227 pub fn serialize(&self) -> Vec<u8> {
228 let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index_column);
229
230 let field_index_idx = self
231 .field_column
232 .as_ref()
233 .map(|name| serialize_column_index(self.input.schema(), name))
234 .unwrap_or(u64::MAX);
235
236 pb::InstantManipulate {
237 start: self.start,
238 end: self.end,
239 interval: self.interval,
240 lookback_delta: self.lookback_delta,
241 time_index_idx,
242 field_index_idx,
243 ..Default::default()
244 }
245 .encode_to_vec()
246 }
247
248 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
249 let pb_instant_manipulate =
250 pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
251 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
252 produce_one_row: false,
253 schema: Arc::new(DFSchema::empty()),
254 });
255
256 let unfix = UnfixIndices {
257 time_index_idx: pb_instant_manipulate.time_index_idx,
258 field_index_idx: pb_instant_manipulate.field_index_idx,
259 };
260
261 Ok(Self {
262 start: pb_instant_manipulate.start,
263 end: pb_instant_manipulate.end,
264 lookback_delta: pb_instant_manipulate.lookback_delta,
265 interval: pb_instant_manipulate.interval,
266 time_index_column: String::new(),
267 field_column: None,
268 input: placeholder_plan,
269 unfix: Some(unfix),
270 })
271 }
272}
273
/// Physical plan node for [`InstantManipulate`]; see that type for the selection rule.
#[derive(Debug)]
pub struct InstantManipulateExec {
    start: Millisecond,
    end: Millisecond,
    lookback_delta: Millisecond,
    interval: Millisecond,
    time_index_column: String,
    // Optional value column used to skip NaN samples.
    field_column: Option<String>,

    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}
286
287impl ExecutionPlan for InstantManipulateExec {
288 fn as_any(&self) -> &dyn Any {
289 self
290 }
291
292 fn schema(&self) -> SchemaRef {
293 self.input.schema()
294 }
295
296 fn properties(&self) -> &PlanProperties {
297 self.input.properties()
298 }
299
300 fn required_input_distribution(&self) -> Vec<Distribution> {
301 self.input.required_input_distribution()
302 }
303
304 fn maintains_input_order(&self) -> Vec<bool> {
306 vec![false; self.children().len()]
307 }
308
309 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
310 vec![&self.input]
311 }
312
313 fn with_new_children(
314 self: Arc<Self>,
315 children: Vec<Arc<dyn ExecutionPlan>>,
316 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
317 assert!(!children.is_empty());
318 Ok(Arc::new(Self {
319 start: self.start,
320 end: self.end,
321 lookback_delta: self.lookback_delta,
322 interval: self.interval,
323 time_index_column: self.time_index_column.clone(),
324 field_column: self.field_column.clone(),
325 input: children[0].clone(),
326 metric: self.metric.clone(),
327 }))
328 }
329
330 fn execute(
331 &self,
332 partition: usize,
333 context: Arc<TaskContext>,
334 ) -> DataFusionResult<SendableRecordBatchStream> {
335 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
336 let metrics_builder = MetricBuilder::new(&self.metric);
337 let num_series = Count::new();
338 metrics_builder
339 .with_partition(partition)
340 .build(MetricValue::Count {
341 name: METRIC_NUM_SERIES.into(),
342 count: num_series.clone(),
343 });
344
345 let input = self.input.execute(partition, context)?;
346 let schema = input.schema();
347 let time_index = schema
348 .column_with_name(&self.time_index_column)
349 .expect("time index column not found")
350 .0;
351 let field_index = self
352 .field_column
353 .as_ref()
354 .and_then(|name| schema.column_with_name(name))
355 .map(|x| x.0);
356 Ok(Box::pin(InstantManipulateStream {
357 start: self.start,
358 end: self.end,
359 lookback_delta: self.lookback_delta,
360 interval: self.interval,
361 time_index,
362 field_index,
363 schema,
364 input,
365 metric: baseline_metric,
366 num_series,
367 }))
368 }
369
370 fn metrics(&self) -> Option<MetricsSet> {
371 Some(self.metric.clone_inner())
372 }
373
374 fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
375 let input_stats = self.input.partition_statistics(partition)?;
376
377 let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
378 let estimated_total_bytes = input_stats
379 .total_byte_size
380 .get_value()
381 .zip(input_stats.num_rows.get_value())
382 .map(|(size, rows)| {
383 Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
384 })
385 .unwrap_or(Precision::Absent);
386
387 Ok(Statistics {
388 num_rows: Precision::Inexact(estimated_row_num.floor() as _),
389 total_byte_size: estimated_total_bytes,
390 column_statistics: vec![
392 ColumnStatistics::new_unknown();
393 self.schema().flattened_fields().len()
394 ],
395 })
396 }
397
398 fn name(&self) -> &str {
399 "InstantManipulateExec"
400 }
401}
402
403impl DisplayAs for InstantManipulateExec {
404 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
405 match t {
406 DisplayFormatType::Default
407 | DisplayFormatType::Verbose
408 | DisplayFormatType::TreeRender => {
409 write!(
410 f,
411 "PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
412 self.start,
413 self.end,
414 self.lookback_delta,
415 self.interval,
416 self.time_index_column
417 )
418 }
419 }
420 }
421}
422
/// Stream that applies instant-vector selection to each non-empty batch from `input`.
pub struct InstantManipulateStream {
    start: Millisecond,
    end: Millisecond,
    lookback_delta: Millisecond,
    interval: Millisecond,
    // Position of the time index column in `schema`.
    time_index: usize,
    // Position of the field column used for the NaN check; `None` when no field column was
    // configured or it was not found in the input schema.
    field_index: Option<usize>,

    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    // Number of non-empty input batches processed; reported to `PROMQL_SERIES_COUNT` once the
    // input is exhausted.
    num_series: Count,
}
438
439impl RecordBatchStream for InstantManipulateStream {
440 fn schema(&self) -> SchemaRef {
441 self.schema.clone()
442 }
443}
444
445impl Stream for InstantManipulateStream {
446 type Item = DataFusionResult<RecordBatch>;
447
448 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
449 let poll = match ready!(self.input.poll_next_unpin(cx)) {
450 Some(Ok(batch)) => {
451 if batch.num_rows() == 0 {
452 return Poll::Pending;
453 }
454 let timer = std::time::Instant::now();
455 self.num_series.add(1);
456 let result = Ok(batch).and_then(|batch| self.manipulate(batch));
457 self.metric.elapsed_compute().add_elapsed(timer);
458 Poll::Ready(Some(result))
459 }
460 None => {
461 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
462 Poll::Ready(None)
463 }
464 Some(Err(e)) => Poll::Ready(Some(Err(e))),
465 };
466 self.metric.record_poll(poll)
467 }
468}
469
470impl InstantManipulateStream {
471 pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
474 let ts_column = input
475 .column(self.time_index)
476 .as_any()
477 .downcast_ref::<TimestampMillisecondArray>()
478 .ok_or_else(|| {
479 DataFusionError::Execution(
480 "Time index Column downcast to TimestampMillisecondArray failed".into(),
481 )
482 })?;
483
484 if ts_column.is_empty() {
486 return Ok(input);
487 }
488
489 let field_column = self
491 .field_index
492 .and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());
493
494 let first_ts = ts_column.value(0);
496 let last_ts = ts_column.value(ts_column.len() - 1);
497 let last_useful = last_ts + self.lookback_delta;
498
499 let max_start = first_ts.max(self.start);
500 let min_end = last_useful.min(self.end);
501
502 let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
503 let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
504
505 let mut take_indices = vec![];
506
507 let mut cursor = 0;
508
509 let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
510 let mut aligned_ts = vec![];
511
512 'next: for expected_ts in aligned_ts_iter {
514 while cursor < ts_column.len() {
516 let curr = ts_column.value(cursor);
517 match curr.cmp(&expected_ts) {
518 Ordering::Equal => {
519 if let Some(field_column) = &field_column
520 && field_column.value(cursor).is_nan()
521 {
522 } else {
524 take_indices.push(cursor as u64);
525 aligned_ts.push(expected_ts);
526 }
527 continue 'next;
528 }
529 Ordering::Greater => break,
530 Ordering::Less => {}
531 }
532 cursor += 1;
533 }
534 if cursor == ts_column.len() {
535 cursor -= 1;
536 if ts_column.value(cursor) + self.lookback_delta < expected_ts {
538 break;
539 }
540 }
541
542 let curr_ts = ts_column.value(cursor);
544 if curr_ts + self.lookback_delta < expected_ts {
545 continue;
546 }
547 if curr_ts > expected_ts {
548 if let Some(prev_cursor) = cursor.checked_sub(1) {
550 let prev_ts = ts_column.value(prev_cursor);
551 if prev_ts + self.lookback_delta >= expected_ts {
552 if let Some(field_column) = &field_column
554 && field_column.value(prev_cursor).is_nan()
555 {
556 continue;
558 }
559 take_indices.push(prev_cursor as u64);
561 aligned_ts.push(expected_ts);
562 }
563 }
564 } else if let Some(field_column) = &field_column
565 && field_column.value(cursor).is_nan()
566 {
567 } else {
569 take_indices.push(cursor as u64);
571 aligned_ts.push(expected_ts);
572 }
573 }
574
575 self.take_record_batch_optional(input, take_indices, aligned_ts)
577 }
578
579 fn take_record_batch_optional(
581 &self,
582 record_batch: RecordBatch,
583 take_indices: Vec<u64>,
584 aligned_ts: Vec<Millisecond>,
585 ) -> DataFusionResult<RecordBatch> {
586 assert_eq!(take_indices.len(), aligned_ts.len());
587
588 let indices_array = UInt64Array::from(take_indices);
589 let mut arrays = record_batch
590 .columns()
591 .iter()
592 .map(|array| compute::take(array, &indices_array, None))
593 .collect::<ArrowResult<Vec<_>>>()?;
594 arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));
595
596 let result = RecordBatch::try_new(record_batch.schema(), arrays)
597 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
598 Ok(result)
599 }
600}
601
// Tests for `InstantManipulate` / `InstantManipulateExec`. Each case runs the executor over a
// shared fixture and compares the pretty-printed output against an expected table.
#[cfg(test)]
mod test {
    use datafusion::common::ToDFSchema;
    use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
    use datafusion::prelude::SessionContext;

    use super::*;
    use crate::extension_plan::test_util::{
        TIME_INDEX_COLUMN, prepare_test_data, prepare_test_data_with_nan,
    };

    /// Run `InstantManipulateExec` with the given range parameters over the shared test fixture
    /// (the NaN variant when `contains_nan` is true). The field column is `value`, and the
    /// pretty-printed result must equal `expected`.
    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        lookback_delta: Millisecond,
        interval: Millisecond,
        expected: String,
        contains_nan: bool,
    ) {
        let memory_exec = if contains_nan {
            Arc::new(prepare_test_data_with_nan())
        } else {
            Arc::new(prepare_test_data())
        };
        let normalize_exec = Arc::new(InstantManipulateExec {
            start,
            end,
            lookback_delta,
            interval,
            time_index_column: TIME_INDEX_COLUMN.to_string(),
            field_column: Some("value".to_string()),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        assert_eq!(result_literal, expected);
    }

    // Projecting only column 2 must still keep the time index and field columns required by
    // the executor, so the child must provide columns 0, 1 and 2.
    #[test]
    fn pruning_should_keep_time_and_field_columns_for_exec() {
        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
        let input = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: df_schema,
        });
        let plan = InstantManipulate::new(
            0,
            0,
            0,
            0,
            TIME_INDEX_COLUMN.to_string(),
            Some("value".to_string()),
            input,
        );

        let output_columns = [2usize];
        let required = plan.necessary_children_exprs(&output_columns).unwrap();
        let required = &required[0];
        assert_eq!(required.as_slice(), &[0, 1, 2]);
    }

    #[tokio::test]
    async fn lookback_10s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 310_000, 10_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_10s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:10 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:00:40 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:10 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:40 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:10 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:40 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 10_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_30s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:30 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 30_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_30s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:10 | 1.0 | foo |\
            \n| 1970-01-01T00:00:20 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:00:40 | 1.0 | foo |\
            \n| 1970-01-01T00:00:50 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:10 | 1.0 | foo |\
            \n| 1970-01-01T00:01:20 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:40 | 1.0 | foo |\
            \n| 1970-01-01T00:01:50 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:10 | 1.0 | foo |\
            \n| 1970-01-01T00:02:20 | 1.0 | foo |\
            \n| 1970-01-01T00:02:30 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:10 | 1.0 | foo |\
            \n| 1970-01-01T00:03:20 | 1.0 | foo |\
            \n| 1970-01-01T00:03:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:20 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:40 | 1.0 | foo |\
            \n| 1970-01-01T00:04:50 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 30_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_60s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:10 | 1.0 | foo |\
            \n| 1970-01-01T00:00:20 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:00:40 | 1.0 | foo |\
            \n| 1970-01-01T00:00:50 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:10 | 1.0 | foo |\
            \n| 1970-01-01T00:01:20 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:40 | 1.0 | foo |\
            \n| 1970-01-01T00:01:50 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:10 | 1.0 | foo |\
            \n| 1970-01-01T00:02:20 | 1.0 | foo |\
            \n| 1970-01-01T00:02:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:40 | 1.0 | foo |\
            \n| 1970-01-01T00:02:50 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:10 | 1.0 | foo |\
            \n| 1970-01-01T00:03:20 | 1.0 | foo |\
            \n| 1970-01-01T00:03:30 | 1.0 | foo |\
            \n| 1970-01-01T00:03:40 | 1.0 | foo |\
            \n| 1970-01-01T00:03:50 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:20 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:40 | 1.0 | foo |\
            \n| 1970-01-01T00:04:50 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 60_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_60s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:30 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:30 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 60_000, 30_000, expected, false).await;
    }

    // Zero lookback: only samples landing exactly on a step are emitted.
    #[tokio::test]
    async fn small_range_lookback_0s_interval_1s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:01 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(230_000, 245_000, 0, 1_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_10s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:00:10 | 1.0 | foo |\
            \n| 1970-01-01T00:00:30 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 30_000, 10_000, 10_000, expected, false).await;
    }

    // Query range extends well beyond the data on both sides (negative start).
    #[tokio::test]
    async fn large_range_lookback_30s_interval_60s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0 | foo |\
            \n| 1970-01-01T00:01:00 | 1.0 | foo |\
            \n| 1970-01-01T00:02:00 | 1.0 | foo |\
            \n| 1970-01-01T00:03:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_30s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:03:10 | 1.0 | foo |\
            \n| 1970-01-01T00:03:20 | 1.0 | foo |\
            \n| 1970-01-01T00:03:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:00 | 1.0 | foo |\
            \n| 1970-01-01T00:04:10 | 1.0 | foo |\
            \n| 1970-01-01T00:04:20 | 1.0 | foo |\
            \n| 1970-01-01T00:04:30 | 1.0 | foo |\
            \n| 1970-01-01T00:04:40 | 1.0 | foo |\
            \n| 1970-01-01T00:04:50 | 1.0 | foo |\
            \n| 1970-01-01T00:05:00 | 1.0 | foo |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
    }

    // NaN samples in the fixture must never appear in the output.
    #[tokio::test]
    async fn lookback_10s_interval_10s_with_nan() {
        let expected = String::from(
            "+---------------------+-------+\
            \n| timestamp | value |\
            \n+---------------------+-------+\
            \n| 1970-01-01T00:00:00 | 0.0 |\
            \n| 1970-01-01T00:00:10 | 0.0 |\
            \n| 1970-01-01T00:01:00 | 6.0 |\
            \n| 1970-01-01T00:01:10 | 6.0 |\
            \n| 1970-01-01T00:02:00 | 12.0 |\
            \n| 1970-01-01T00:02:10 | 12.0 |\
            \n+---------------------+-------+",
        );
        do_normalize_test(0, 300_000, 10_000, 10_000, expected, true).await;
    }

    // Steps offset by 1ms from the sample grid, with NaN samples present.
    #[tokio::test]
    async fn lookback_10s_interval_10s_with_nan_unaligned() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| timestamp | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00.001 | 0.0 |\
            \n| 1970-01-01T00:01:00.001 | 6.0 |\
            \n| 1970-01-01T00:02:00.001 | 12.0 |\
            \n+-------------------------+-------+",
        );
        do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
    }

    // Extremely wide query range: exercises the step-alignment arithmetic far from the data.
    #[tokio::test]
    async fn ultra_large_range() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| timestamp | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00.001 | 0.0 |\
            \n| 1970-01-01T00:01:00.001 | 6.0 |\
            \n| 1970-01-01T00:02:00.001 | 12.0 |\
            \n+-------------------------+-------+",
        );
        do_normalize_test(
            -900_000_000_000_000 + 1,
            900_000_000_000_000,
            10_000,
            10_000,
            expected,
            true,
        )
        .await;
    }
}