promql/extension_plan/
instant_manipulate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::cmp::Ordering;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
22use datafusion::arrow::datatypes::SchemaRef;
23use datafusion::arrow::record_batch::RecordBatch;
24use datafusion::common::stats::Precision;
25use datafusion::common::{ColumnStatistics, DFSchema, DFSchemaRef};
26use datafusion::error::{DataFusionError, Result as DataFusionResult};
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
29use datafusion::physical_plan::metrics::{
30    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
31};
32use datafusion::physical_plan::{
33    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
34    SendableRecordBatchStream, Statistics,
35};
36use datafusion_expr::col;
37use datatypes::arrow::compute;
38use datatypes::arrow::error::Result as ArrowResult;
39use futures::{Stream, StreamExt, ready};
40use greptime_proto::substrait_extension as pb;
41use prost::Message;
42use snafu::ResultExt;
43
44use crate::error::{DeserializeSnafu, Result};
45use crate::extension_plan::{
46    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
47};
48use crate::metrics::PROMQL_SERIES_COUNT;
49
/// Manipulate the input record batch to make it suitable for Instant Operator.
///
/// This plan will try to align the input time series, for every timestamp between
/// `start` and `end` with step `interval`. Find in the `lookback` range if data
/// is missing at the given timestamp.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct InstantManipulate {
    /// First timestamp of the aligned output range.
    start: Millisecond,
    /// Upper bound of the aligned output range.
    end: Millisecond,
    /// How far behind an aligned timestamp a sample may lie and still be used for it.
    lookback_delta: Millisecond,
    /// Step between two consecutive aligned timestamps.
    interval: Millisecond,
    /// Name of the time index column in the input schema.
    time_index_column: String,
    /// An optional column for validating staleness
    field_column: Option<String>,
    /// The child logical plan.
    input: LogicalPlan,
    /// Column indices produced by `deserialize` that still have to be resolved to
    /// column names; `None` once the names are known.
    unfix: Option<UnfixIndices>,
}
67
/// Column positions carried by a deserialized [`InstantManipulate`] whose column
/// names have not been resolved against an input schema yet.
///
/// `with_exprs_and_inputs` turns these indices back into column names once the
/// actual input plan is attached.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    /// Index of the time index column in the input schema.
    pub time_index_idx: u64,
    /// Index of the field column in the input schema; `u64::MAX` means "no field column".
    pub field_index_idx: u64,
}
73
74impl UserDefinedLogicalNodeCore for InstantManipulate {
75    fn name(&self) -> &str {
76        Self::name()
77    }
78
79    fn inputs(&self) -> Vec<&LogicalPlan> {
80        vec![&self.input]
81    }
82
83    fn schema(&self) -> &DFSchemaRef {
84        self.input.schema()
85    }
86
87    fn expressions(&self) -> Vec<Expr> {
88        if self.unfix.is_some() {
89            return vec![];
90        }
91
92        let mut exprs = vec![col(&self.time_index_column)];
93        if let Some(field) = &self.field_column {
94            exprs.push(col(field));
95        }
96        exprs
97    }
98
99    fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
100        if self.unfix.is_some() {
101            return None;
102        }
103
104        let input_schema = self.input.schema();
105        if output_columns.is_empty() {
106            let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
107            return Some(vec![indices]);
108        }
109
110        let mut required = output_columns.to_vec();
111        required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?);
112        if let Some(field) = &self.field_column {
113            required.push(input_schema.index_of_column_by_name(None, field)?);
114        }
115
116        required.sort_unstable();
117        required.dedup();
118        Some(vec![required])
119    }
120
121    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
122        write!(
123            f,
124            "PromInstantManipulate: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
125            self.start, self.end, self.lookback_delta, self.interval, self.time_index_column
126        )
127    }
128
129    fn with_exprs_and_inputs(
130        &self,
131        _exprs: Vec<Expr>,
132        inputs: Vec<LogicalPlan>,
133    ) -> DataFusionResult<Self> {
134        if inputs.len() != 1 {
135            return Err(DataFusionError::Internal(
136                "InstantManipulate should have exact one input".to_string(),
137            ));
138        }
139
140        let input: LogicalPlan = inputs.into_iter().next().unwrap();
141        let input_schema = input.schema();
142
143        if let Some(unfix) = &self.unfix {
144            // transform indices to names
145            let time_index_column = resolve_column_name(
146                unfix.time_index_idx,
147                input_schema,
148                "InstantManipulate",
149                "time index",
150            )?;
151
152            let field_column = if unfix.field_index_idx == u64::MAX {
153                None
154            } else {
155                Some(resolve_column_name(
156                    unfix.field_index_idx,
157                    input_schema,
158                    "InstantManipulate",
159                    "field",
160                )?)
161            };
162
163            Ok(Self {
164                start: self.start,
165                end: self.end,
166                lookback_delta: self.lookback_delta,
167                interval: self.interval,
168                time_index_column,
169                field_column,
170                input,
171                unfix: None,
172            })
173        } else {
174            Ok(Self {
175                start: self.start,
176                end: self.end,
177                lookback_delta: self.lookback_delta,
178                interval: self.interval,
179                time_index_column: self.time_index_column.clone(),
180                field_column: self.field_column.clone(),
181                input,
182                unfix: None,
183            })
184        }
185    }
186}
187
188impl InstantManipulate {
189    pub fn new(
190        start: Millisecond,
191        end: Millisecond,
192        lookback_delta: Millisecond,
193        interval: Millisecond,
194        time_index_column: String,
195        field_column: Option<String>,
196        input: LogicalPlan,
197    ) -> Self {
198        Self {
199            start,
200            end,
201            lookback_delta,
202            interval,
203            time_index_column,
204            field_column,
205            input,
206            unfix: None,
207        }
208    }
209
210    pub const fn name() -> &'static str {
211        "InstantManipulate"
212    }
213
214    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
215        Arc::new(InstantManipulateExec {
216            start: self.start,
217            end: self.end,
218            lookback_delta: self.lookback_delta,
219            interval: self.interval,
220            time_index_column: self.time_index_column.clone(),
221            field_column: self.field_column.clone(),
222            input: exec_input,
223            metric: ExecutionPlanMetricsSet::new(),
224        })
225    }
226
227    pub fn serialize(&self) -> Vec<u8> {
228        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index_column);
229
230        let field_index_idx = self
231            .field_column
232            .as_ref()
233            .map(|name| serialize_column_index(self.input.schema(), name))
234            .unwrap_or(u64::MAX);
235
236        pb::InstantManipulate {
237            start: self.start,
238            end: self.end,
239            interval: self.interval,
240            lookback_delta: self.lookback_delta,
241            time_index_idx,
242            field_index_idx,
243            ..Default::default()
244        }
245        .encode_to_vec()
246    }
247
248    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
249        let pb_instant_manipulate =
250            pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
251        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
252            produce_one_row: false,
253            schema: Arc::new(DFSchema::empty()),
254        });
255
256        let unfix = UnfixIndices {
257            time_index_idx: pb_instant_manipulate.time_index_idx,
258            field_index_idx: pb_instant_manipulate.field_index_idx,
259        };
260
261        Ok(Self {
262            start: pb_instant_manipulate.start,
263            end: pb_instant_manipulate.end,
264            lookback_delta: pb_instant_manipulate.lookback_delta,
265            interval: pb_instant_manipulate.interval,
266            time_index_column: String::new(),
267            field_column: None,
268            input: placeholder_plan,
269            unfix: Some(unfix),
270        })
271    }
272}
273
/// Physical plan node created by `InstantManipulate::to_execution_plan`.
///
/// It carries the same alignment parameters as the logical node and wraps the
/// input stream in an [`InstantManipulateStream`] on `execute`.
#[derive(Debug)]
pub struct InstantManipulateExec {
    /// First timestamp of the aligned output range.
    start: Millisecond,
    /// Upper bound of the aligned output range.
    end: Millisecond,
    /// How far behind an aligned timestamp a sample may lie and still be used for it.
    lookback_delta: Millisecond,
    /// Step between two consecutive aligned timestamps.
    interval: Millisecond,
    /// Name of the time index column, looked up in the input schema on `execute`.
    time_index_column: String,
    /// Optional field column whose NaN values are checked during alignment.
    field_column: Option<String>,

    /// Child physical plan.
    input: Arc<dyn ExecutionPlan>,
    /// Metrics set that the streams created by `execute` report into.
    metric: ExecutionPlanMetricsSet,
}
286
287impl ExecutionPlan for InstantManipulateExec {
288    fn as_any(&self) -> &dyn Any {
289        self
290    }
291
292    fn schema(&self) -> SchemaRef {
293        self.input.schema()
294    }
295
296    fn properties(&self) -> &PlanProperties {
297        self.input.properties()
298    }
299
300    fn required_input_distribution(&self) -> Vec<Distribution> {
301        self.input.required_input_distribution()
302    }
303
304    // Prevent reordering of input
305    fn maintains_input_order(&self) -> Vec<bool> {
306        vec![false; self.children().len()]
307    }
308
309    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
310        vec![&self.input]
311    }
312
313    fn with_new_children(
314        self: Arc<Self>,
315        children: Vec<Arc<dyn ExecutionPlan>>,
316    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
317        assert!(!children.is_empty());
318        Ok(Arc::new(Self {
319            start: self.start,
320            end: self.end,
321            lookback_delta: self.lookback_delta,
322            interval: self.interval,
323            time_index_column: self.time_index_column.clone(),
324            field_column: self.field_column.clone(),
325            input: children[0].clone(),
326            metric: self.metric.clone(),
327        }))
328    }
329
330    fn execute(
331        &self,
332        partition: usize,
333        context: Arc<TaskContext>,
334    ) -> DataFusionResult<SendableRecordBatchStream> {
335        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
336        let metrics_builder = MetricBuilder::new(&self.metric);
337        let num_series = Count::new();
338        metrics_builder
339            .with_partition(partition)
340            .build(MetricValue::Count {
341                name: METRIC_NUM_SERIES.into(),
342                count: num_series.clone(),
343            });
344
345        let input = self.input.execute(partition, context)?;
346        let schema = input.schema();
347        let time_index = schema
348            .column_with_name(&self.time_index_column)
349            .expect("time index column not found")
350            .0;
351        let field_index = self
352            .field_column
353            .as_ref()
354            .and_then(|name| schema.column_with_name(name))
355            .map(|x| x.0);
356        Ok(Box::pin(InstantManipulateStream {
357            start: self.start,
358            end: self.end,
359            lookback_delta: self.lookback_delta,
360            interval: self.interval,
361            time_index,
362            field_index,
363            schema,
364            input,
365            metric: baseline_metric,
366            num_series,
367        }))
368    }
369
370    fn metrics(&self) -> Option<MetricsSet> {
371        Some(self.metric.clone_inner())
372    }
373
374    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
375        let input_stats = self.input.partition_statistics(partition)?;
376
377        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
378        let estimated_total_bytes = input_stats
379            .total_byte_size
380            .get_value()
381            .zip(input_stats.num_rows.get_value())
382            .map(|(size, rows)| {
383                Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
384            })
385            .unwrap_or(Precision::Absent);
386
387        Ok(Statistics {
388            num_rows: Precision::Inexact(estimated_row_num.floor() as _),
389            total_byte_size: estimated_total_bytes,
390            // TODO(ruihang): support this column statistics
391            column_statistics: vec![
392                ColumnStatistics::new_unknown();
393                self.schema().flattened_fields().len()
394            ],
395        })
396    }
397
398    fn name(&self) -> &str {
399        "InstantManipulateExec"
400    }
401}
402
403impl DisplayAs for InstantManipulateExec {
404    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
405        match t {
406            DisplayFormatType::Default
407            | DisplayFormatType::Verbose
408            | DisplayFormatType::TreeRender => {
409                write!(
410                    f,
411                    "PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
412                    self.start,
413                    self.end,
414                    self.lookback_delta,
415                    self.interval,
416                    self.time_index_column
417                )
418            }
419        }
420    }
421}
422
/// Stream returned by `InstantManipulateExec::execute`.
///
/// Each non-empty batch pulled from `input` is passed through `manipulate`,
/// which aligns its rows to the `start..=end` grid with step `interval`.
pub struct InstantManipulateStream {
    /// First timestamp of the aligned output range.
    start: Millisecond,
    /// Upper bound of the aligned output range.
    end: Millisecond,
    /// How far behind an aligned timestamp a sample may lie and still be used for it.
    lookback_delta: Millisecond,
    /// Step between two consecutive aligned timestamps.
    interval: Millisecond,
    // Column index of TIME INDEX column's position in schema
    time_index: usize,
    /// Column index of the optional field column used for the NaN check.
    field_index: Option<usize>,

    /// Schema reported by this stream (the input stream's schema).
    schema: SchemaRef,
    /// Upstream record batch stream.
    input: SendableRecordBatchStream,
    /// Baseline metrics (compute time, poll results) for this partition.
    metric: BaselineMetrics,
    /// Number of series processed.
    num_series: Count,
}
438
439impl RecordBatchStream for InstantManipulateStream {
440    fn schema(&self) -> SchemaRef {
441        self.schema.clone()
442    }
443}
444
445impl Stream for InstantManipulateStream {
446    type Item = DataFusionResult<RecordBatch>;
447
448    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
449        let poll = match ready!(self.input.poll_next_unpin(cx)) {
450            Some(Ok(batch)) => {
451                if batch.num_rows() == 0 {
452                    return Poll::Pending;
453                }
454                let timer = std::time::Instant::now();
455                self.num_series.add(1);
456                let result = Ok(batch).and_then(|batch| self.manipulate(batch));
457                self.metric.elapsed_compute().add_elapsed(timer);
458                Poll::Ready(Some(result))
459            }
460            None => {
461                PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
462                Poll::Ready(None)
463            }
464            Some(Err(e)) => Poll::Ready(Some(Err(e))),
465        };
466        self.metric.record_poll(poll)
467    }
468}
469
470impl InstantManipulateStream {
471    // refer to Go version: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1571
472    // and the function `vectorSelectorSingle`
473    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
474        let ts_column = input
475            .column(self.time_index)
476            .as_any()
477            .downcast_ref::<TimestampMillisecondArray>()
478            .ok_or_else(|| {
479                DataFusionError::Execution(
480                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
481                )
482            })?;
483
484        // Early return for empty input
485        if ts_column.is_empty() {
486            return Ok(input);
487        }
488
489        // field column for staleness check
490        let field_column = self
491            .field_index
492            .and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());
493
494        // Optimize iteration range based on actual data bounds
495        let first_ts = ts_column.value(0);
496        let last_ts = ts_column.value(ts_column.len() - 1);
497        let last_useful = last_ts + self.lookback_delta;
498
499        let max_start = first_ts.max(self.start);
500        let min_end = last_useful.min(self.end);
501
502        let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
503        let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
504
505        let mut take_indices = vec![];
506
507        let mut cursor = 0;
508
509        let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
510        let mut aligned_ts = vec![];
511
512        // calculate the offsets to take
513        'next: for expected_ts in aligned_ts_iter {
514            // first, search toward end to see if there is matched timestamp
515            while cursor < ts_column.len() {
516                let curr = ts_column.value(cursor);
517                match curr.cmp(&expected_ts) {
518                    Ordering::Equal => {
519                        if let Some(field_column) = &field_column
520                            && field_column.value(cursor).is_nan()
521                        {
522                            // ignore the NaN value
523                        } else {
524                            take_indices.push(cursor as u64);
525                            aligned_ts.push(expected_ts);
526                        }
527                        continue 'next;
528                    }
529                    Ordering::Greater => break,
530                    Ordering::Less => {}
531                }
532                cursor += 1;
533            }
534            if cursor == ts_column.len() {
535                cursor -= 1;
536                // short cut this loop
537                if ts_column.value(cursor) + self.lookback_delta < expected_ts {
538                    break;
539                }
540            }
541
542            // then examine the value
543            let curr_ts = ts_column.value(cursor);
544            if curr_ts + self.lookback_delta < expected_ts {
545                continue;
546            }
547            if curr_ts > expected_ts {
548                // exceeds current expected timestamp, examine the previous value
549                if let Some(prev_cursor) = cursor.checked_sub(1) {
550                    let prev_ts = ts_column.value(prev_cursor);
551                    if prev_ts + self.lookback_delta >= expected_ts {
552                        // only use the point in the time range
553                        if let Some(field_column) = &field_column
554                            && field_column.value(prev_cursor).is_nan()
555                        {
556                            // if the newest value is NaN, it means the value is stale, so we should not use it
557                            continue;
558                        }
559                        // use this point
560                        take_indices.push(prev_cursor as u64);
561                        aligned_ts.push(expected_ts);
562                    }
563                }
564            } else if let Some(field_column) = &field_column
565                && field_column.value(cursor).is_nan()
566            {
567                // if the newest value is NaN, it means the value is stale, so we should not use it
568            } else {
569                // use this point
570                take_indices.push(cursor as u64);
571                aligned_ts.push(expected_ts);
572            }
573        }
574
575        // take record batch and replace the time index column
576        self.take_record_batch_optional(input, take_indices, aligned_ts)
577    }
578
579    /// Helper function to apply "take" on record batch.
580    fn take_record_batch_optional(
581        &self,
582        record_batch: RecordBatch,
583        take_indices: Vec<u64>,
584        aligned_ts: Vec<Millisecond>,
585    ) -> DataFusionResult<RecordBatch> {
586        assert_eq!(take_indices.len(), aligned_ts.len());
587
588        let indices_array = UInt64Array::from(take_indices);
589        let mut arrays = record_batch
590            .columns()
591            .iter()
592            .map(|array| compute::take(array, &indices_array, None))
593            .collect::<ArrowResult<Vec<_>>>()?;
594        arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));
595
596        let result = RecordBatch::try_new(record_batch.schema(), arrays)
597            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
598        Ok(result)
599    }
600}
601
602#[cfg(test)]
603mod test {
604    use datafusion::common::ToDFSchema;
605    use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
606    use datafusion::prelude::SessionContext;
607
608    use super::*;
609    use crate::extension_plan::test_util::{
610        TIME_INDEX_COLUMN, prepare_test_data, prepare_test_data_with_nan,
611    };
612
613    async fn do_normalize_test(
614        start: Millisecond,
615        end: Millisecond,
616        lookback_delta: Millisecond,
617        interval: Millisecond,
618        expected: String,
619        contains_nan: bool,
620    ) {
621        let memory_exec = if contains_nan {
622            Arc::new(prepare_test_data_with_nan())
623        } else {
624            Arc::new(prepare_test_data())
625        };
626        let normalize_exec = Arc::new(InstantManipulateExec {
627            start,
628            end,
629            lookback_delta,
630            interval,
631            time_index_column: TIME_INDEX_COLUMN.to_string(),
632            field_column: Some("value".to_string()),
633            input: memory_exec,
634            metric: ExecutionPlanMetricsSet::new(),
635        });
636        let session_context = SessionContext::default();
637        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
638            .await
639            .unwrap();
640        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
641            .unwrap()
642            .to_string();
643
644        assert_eq!(result_literal, expected);
645    }
646
647    #[test]
648    fn pruning_should_keep_time_and_field_columns_for_exec() {
649        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
650        let input = LogicalPlan::EmptyRelation(EmptyRelation {
651            produce_one_row: false,
652            schema: df_schema,
653        });
654        let plan = InstantManipulate::new(
655            0,
656            0,
657            0,
658            0,
659            TIME_INDEX_COLUMN.to_string(),
660            Some("value".to_string()),
661            input,
662        );
663
664        // Simulate a parent projection requesting only the `path` column.
665        let output_columns = [2usize];
666        let required = plan.necessary_children_exprs(&output_columns).unwrap();
667        let required = &required[0];
668        assert_eq!(required.as_slice(), &[0, 1, 2]);
669    }
670
671    #[tokio::test]
672    async fn lookback_10s_interval_30s() {
673        let expected = String::from(
674            "+---------------------+-------+------+\
675            \n| timestamp           | value | path |\
676            \n+---------------------+-------+------+\
677            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
678            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
679            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
680            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
681            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
682            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
683            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
684            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
685            \n+---------------------+-------+------+",
686        );
687        do_normalize_test(0, 310_000, 10_000, 30_000, expected, false).await;
688    }
689
690    #[tokio::test]
691    async fn lookback_10s_interval_10s() {
692        let expected = String::from(
693            "+---------------------+-------+------+\
694            \n| timestamp           | value | path |\
695            \n+---------------------+-------+------+\
696            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
697            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
698            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
699            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
700            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
701            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
702            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
703            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
704            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
705            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
706            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
707            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
708            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
709            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
710            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
711            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
712            \n+---------------------+-------+------+",
713        );
714        do_normalize_test(0, 300_000, 10_000, 10_000, expected, false).await;
715    }
716
717    #[tokio::test]
718    async fn lookback_30s_interval_30s() {
719        let expected = String::from(
720            "+---------------------+-------+------+\
721            \n| timestamp           | value | path |\
722            \n+---------------------+-------+------+\
723            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
724            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
725            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
726            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
727            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
728            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
729            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
730            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
731            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
732            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
733            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
734            \n+---------------------+-------+------+",
735        );
736        do_normalize_test(0, 300_000, 30_000, 30_000, expected, false).await;
737    }
738
739    #[tokio::test]
740    async fn lookback_30s_interval_10s() {
741        let expected = String::from(
742            "+---------------------+-------+------+\
743            \n| timestamp           | value | path |\
744            \n+---------------------+-------+------+\
745            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
746            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
747            \n| 1970-01-01T00:00:20 | 1.0   | foo  |\
748            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
749            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
750            \n| 1970-01-01T00:00:50 | 1.0   | foo  |\
751            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
752            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
753            \n| 1970-01-01T00:01:20 | 1.0   | foo  |\
754            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
755            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
756            \n| 1970-01-01T00:01:50 | 1.0   | foo  |\
757            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
758            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
759            \n| 1970-01-01T00:02:20 | 1.0   | foo  |\
760            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
761            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
762            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
763            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
764            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
765            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
766            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
767            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
768            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
769            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
770            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
771            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
772            \n+---------------------+-------+------+",
773        );
774        do_normalize_test(0, 300_000, 30_000, 10_000, expected, false).await;
775    }
776
777    #[tokio::test]
778    async fn lookback_60s_interval_10s() {
779        let expected = String::from(
780            "+---------------------+-------+------+\
781            \n| timestamp           | value | path |\
782            \n+---------------------+-------+------+\
783            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
784            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
785            \n| 1970-01-01T00:00:20 | 1.0   | foo  |\
786            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
787            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
788            \n| 1970-01-01T00:00:50 | 1.0   | foo  |\
789            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
790            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
791            \n| 1970-01-01T00:01:20 | 1.0   | foo  |\
792            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
793            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
794            \n| 1970-01-01T00:01:50 | 1.0   | foo  |\
795            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
796            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
797            \n| 1970-01-01T00:02:20 | 1.0   | foo  |\
798            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
799            \n| 1970-01-01T00:02:40 | 1.0   | foo  |\
800            \n| 1970-01-01T00:02:50 | 1.0   | foo  |\
801            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
802            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
803            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
804            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
805            \n| 1970-01-01T00:03:40 | 1.0   | foo  |\
806            \n| 1970-01-01T00:03:50 | 1.0   | foo  |\
807            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
808            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
809            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
810            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
811            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
812            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
813            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
814            \n+---------------------+-------+------+",
815        );
816        do_normalize_test(0, 300_000, 60_000, 10_000, expected, false).await;
817    }
818
819    #[tokio::test]
820    async fn lookback_60s_interval_30s() {
821        let expected = String::from(
822            "+---------------------+-------+------+\
823            \n| timestamp           | value | path |\
824            \n+---------------------+-------+------+\
825            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
826            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
827            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
828            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
829            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
830            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
831            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
832            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
833            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
834            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
835            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
836            \n+---------------------+-------+------+",
837        );
838        do_normalize_test(0, 300_000, 60_000, 30_000, expected, false).await;
839    }
840
841    #[tokio::test]
842    async fn small_range_lookback_0s_interval_1s() {
843        let expected = String::from(
844            "+---------------------+-------+------+\
845            \n| timestamp           | value | path |\
846            \n+---------------------+-------+------+\
847            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
848            \n| 1970-01-01T00:04:01 | 1.0   | foo  |\
849            \n+---------------------+-------+------+",
850        );
851        do_normalize_test(230_000, 245_000, 0, 1_000, expected, false).await;
852    }
853
854    #[tokio::test]
855    async fn small_range_lookback_10s_interval_10s() {
856        let expected = String::from(
857            "+---------------------+-------+------+\
858            \n| timestamp           | value | path |\
859            \n+---------------------+-------+------+\
860            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
861            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
862            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
863            \n+---------------------+-------+------+",
864        );
865        do_normalize_test(0, 30_000, 10_000, 10_000, expected, false).await;
866    }
867
868    #[tokio::test]
869    async fn large_range_lookback_30s_interval_60s() {
870        let expected = String::from(
871            "+---------------------+-------+------+\
872            \n| timestamp           | value | path |\
873            \n+---------------------+-------+------+\
874            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
875            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
876            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
877            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
878            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
879            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
880            \n+---------------------+-------+------+",
881        );
882        do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected, false).await;
883    }
884
885    #[tokio::test]
886    async fn small_range_lookback_30s_interval_30s() {
887        let expected = String::from(
888            "+---------------------+-------+------+\
889            \n| timestamp           | value | path |\
890            \n+---------------------+-------+------+\
891            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
892            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
893            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
894            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
895            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
896            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
897            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
898            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
899            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
900            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
901            \n+---------------------+-------+------+",
902        );
903        do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
904    }
905
906    #[tokio::test]
907    async fn lookback_10s_interval_10s_with_nan() {
908        let expected = String::from(
909            "+---------------------+-------+\
910            \n| timestamp           | value |\
911            \n+---------------------+-------+\
912            \n| 1970-01-01T00:00:00 | 0.0   |\
913            \n| 1970-01-01T00:00:10 | 0.0   |\
914            \n| 1970-01-01T00:01:00 | 6.0   |\
915            \n| 1970-01-01T00:01:10 | 6.0   |\
916            \n| 1970-01-01T00:02:00 | 12.0  |\
917            \n| 1970-01-01T00:02:10 | 12.0  |\
918            \n+---------------------+-------+",
919        );
920        do_normalize_test(0, 300_000, 10_000, 10_000, expected, true).await;
921    }
922
923    #[tokio::test]
924    async fn lookback_10s_interval_10s_with_nan_unaligned() {
925        let expected = String::from(
926            "+-------------------------+-------+\
927            \n| timestamp               | value |\
928            \n+-------------------------+-------+\
929            \n| 1970-01-01T00:00:00.001 | 0.0   |\
930            \n| 1970-01-01T00:01:00.001 | 6.0   |\
931            \n| 1970-01-01T00:02:00.001 | 12.0  |\
932            \n+-------------------------+-------+",
933        );
934        do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
935    }
936
937    #[tokio::test]
938    async fn ultra_large_range() {
939        let expected = String::from(
940            "+-------------------------+-------+\
941            \n| timestamp               | value |\
942            \n+-------------------------+-------+\
943            \n| 1970-01-01T00:00:00.001 | 0.0   |\
944            \n| 1970-01-01T00:01:00.001 | 6.0   |\
945            \n| 1970-01-01T00:02:00.001 | 12.0  |\
946            \n+-------------------------+-------+",
947        );
948        do_normalize_test(
949            -900_000_000_000_000 + 1,
950            900_000_000_000_000,
951            10_000,
952            10_000,
953            expected,
954            true,
955        )
956        .await;
957    }
958}