promql/extension_plan/
instant_manipulate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::cmp::Ordering;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
22use datafusion::arrow::datatypes::SchemaRef;
23use datafusion::arrow::record_batch::RecordBatch;
24use datafusion::common::stats::Precision;
25use datafusion::common::{ColumnStatistics, DFSchema, DFSchemaRef};
26use datafusion::error::{DataFusionError, Result as DataFusionResult};
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
29use datafusion::physical_plan::metrics::{
30    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
31};
32use datafusion::physical_plan::{
33    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
34    SendableRecordBatchStream, Statistics,
35};
36use datatypes::arrow::compute;
37use datatypes::arrow::error::Result as ArrowResult;
38use futures::{Stream, StreamExt, ready};
39use greptime_proto::substrait_extension as pb;
40use prost::Message;
41use snafu::ResultExt;
42
43use crate::error::{DeserializeSnafu, Result};
44use crate::extension_plan::{
45    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
46};
47use crate::metrics::PROMQL_SERIES_COUNT;
48
/// Manipulate the input record batch to make it suitable for Instant Operator.
///
/// This plan will try to align the input time series, for every timestamp between
/// `start` and `end` with step `interval`. Find in the `lookback` range if data
/// is missing at the given timestamp.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct InstantManipulate {
    /// First timestamp of the evaluation range.
    start: Millisecond,
    /// Last timestamp of the evaluation range.
    end: Millisecond,
    /// How far back from an aligned timestamp an older sample may still be used.
    lookback_delta: Millisecond,
    /// Step between two aligned timestamps.
    interval: Millisecond,
    /// Name of the time index column in the input schema.
    time_index_column: String,
    /// An optional column for validating staleness
    field_column: Option<String>,
    /// The child logical plan producing the series to align.
    input: LogicalPlan,
    /// Column indices decoded by `deserialize` that have not been resolved to
    /// column names yet; `None` once the names are known.
    unfix: Option<UnfixIndices>,
}
66
/// Column positions carried by a deserialized [`InstantManipulate`] until they are
/// resolved back to column names against the actual input schema.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    /// Index of the time index column in the input schema.
    pub time_index_idx: u64,
    /// Index of the field column in the input schema; `u64::MAX` encodes
    /// "no field column".
    pub field_index_idx: u64,
}
72
73impl UserDefinedLogicalNodeCore for InstantManipulate {
74    fn name(&self) -> &str {
75        Self::name()
76    }
77
78    fn inputs(&self) -> Vec<&LogicalPlan> {
79        vec![&self.input]
80    }
81
82    fn schema(&self) -> &DFSchemaRef {
83        self.input.schema()
84    }
85
86    fn expressions(&self) -> Vec<Expr> {
87        vec![]
88    }
89
90    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
91        write!(
92            f,
93            "PromInstantManipulate: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
94            self.start, self.end, self.lookback_delta, self.interval, self.time_index_column
95        )
96    }
97
98    fn with_exprs_and_inputs(
99        &self,
100        _exprs: Vec<Expr>,
101        inputs: Vec<LogicalPlan>,
102    ) -> DataFusionResult<Self> {
103        if inputs.len() != 1 {
104            return Err(DataFusionError::Internal(
105                "InstantManipulate should have exact one input".to_string(),
106            ));
107        }
108
109        let input: LogicalPlan = inputs.into_iter().next().unwrap();
110        let input_schema = input.schema();
111
112        if let Some(unfix) = &self.unfix {
113            // transform indices to names
114            let time_index_column = resolve_column_name(
115                unfix.time_index_idx,
116                input_schema,
117                "InstantManipulate",
118                "time index",
119            )?;
120
121            let field_column = if unfix.field_index_idx == u64::MAX {
122                None
123            } else {
124                Some(resolve_column_name(
125                    unfix.field_index_idx,
126                    input_schema,
127                    "InstantManipulate",
128                    "field",
129                )?)
130            };
131
132            Ok(Self {
133                start: self.start,
134                end: self.end,
135                lookback_delta: self.lookback_delta,
136                interval: self.interval,
137                time_index_column,
138                field_column,
139                input,
140                unfix: None,
141            })
142        } else {
143            Ok(Self {
144                start: self.start,
145                end: self.end,
146                lookback_delta: self.lookback_delta,
147                interval: self.interval,
148                time_index_column: self.time_index_column.clone(),
149                field_column: self.field_column.clone(),
150                input,
151                unfix: None,
152            })
153        }
154    }
155}
156
157impl InstantManipulate {
158    pub fn new(
159        start: Millisecond,
160        end: Millisecond,
161        lookback_delta: Millisecond,
162        interval: Millisecond,
163        time_index_column: String,
164        field_column: Option<String>,
165        input: LogicalPlan,
166    ) -> Self {
167        Self {
168            start,
169            end,
170            lookback_delta,
171            interval,
172            time_index_column,
173            field_column,
174            input,
175            unfix: None,
176        }
177    }
178
179    pub const fn name() -> &'static str {
180        "InstantManipulate"
181    }
182
183    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
184        Arc::new(InstantManipulateExec {
185            start: self.start,
186            end: self.end,
187            lookback_delta: self.lookback_delta,
188            interval: self.interval,
189            time_index_column: self.time_index_column.clone(),
190            field_column: self.field_column.clone(),
191            input: exec_input,
192            metric: ExecutionPlanMetricsSet::new(),
193        })
194    }
195
196    pub fn serialize(&self) -> Vec<u8> {
197        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index_column);
198
199        let field_index_idx = self
200            .field_column
201            .as_ref()
202            .map(|name| serialize_column_index(self.input.schema(), name))
203            .unwrap_or(u64::MAX);
204
205        pb::InstantManipulate {
206            start: self.start,
207            end: self.end,
208            interval: self.interval,
209            lookback_delta: self.lookback_delta,
210            time_index_idx,
211            field_index_idx,
212            ..Default::default()
213        }
214        .encode_to_vec()
215    }
216
217    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
218        let pb_instant_manipulate =
219            pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
220        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
221            produce_one_row: false,
222            schema: Arc::new(DFSchema::empty()),
223        });
224
225        let unfix = UnfixIndices {
226            time_index_idx: pb_instant_manipulate.time_index_idx,
227            field_index_idx: pb_instant_manipulate.field_index_idx,
228        };
229
230        Ok(Self {
231            start: pb_instant_manipulate.start,
232            end: pb_instant_manipulate.end,
233            lookback_delta: pb_instant_manipulate.lookback_delta,
234            interval: pb_instant_manipulate.interval,
235            time_index_column: String::new(),
236            field_column: None,
237            input: placeholder_plan,
238            unfix: Some(unfix),
239        })
240    }
241}
242
/// Physical operator created by `InstantManipulate::to_execution_plan`.
///
/// It carries the same range parameters and column names as the logical node,
/// plus the child execution plan and this operator's metrics.
#[derive(Debug)]
pub struct InstantManipulateExec {
    /// First timestamp of the evaluation range.
    start: Millisecond,
    /// Last timestamp of the evaluation range.
    end: Millisecond,
    /// How far back from an aligned timestamp an older sample may still be used.
    lookback_delta: Millisecond,
    /// Step between two aligned timestamps.
    interval: Millisecond,
    /// Name of the time index column, looked up in the input schema at execution.
    time_index_column: String,
    /// Optional name of the field column used for the NaN (staleness) check.
    field_column: Option<String>,

    /// Child plan producing the batches to align.
    input: Arc<dyn ExecutionPlan>,
    /// Metrics recorded by the streams this operator creates.
    metric: ExecutionPlanMetricsSet,
}
255
256impl ExecutionPlan for InstantManipulateExec {
257    fn as_any(&self) -> &dyn Any {
258        self
259    }
260
261    fn schema(&self) -> SchemaRef {
262        self.input.schema()
263    }
264
265    fn properties(&self) -> &PlanProperties {
266        self.input.properties()
267    }
268
269    fn required_input_distribution(&self) -> Vec<Distribution> {
270        self.input.required_input_distribution()
271    }
272
273    // Prevent reordering of input
274    fn maintains_input_order(&self) -> Vec<bool> {
275        vec![false; self.children().len()]
276    }
277
278    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
279        vec![&self.input]
280    }
281
282    fn with_new_children(
283        self: Arc<Self>,
284        children: Vec<Arc<dyn ExecutionPlan>>,
285    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
286        assert!(!children.is_empty());
287        Ok(Arc::new(Self {
288            start: self.start,
289            end: self.end,
290            lookback_delta: self.lookback_delta,
291            interval: self.interval,
292            time_index_column: self.time_index_column.clone(),
293            field_column: self.field_column.clone(),
294            input: children[0].clone(),
295            metric: self.metric.clone(),
296        }))
297    }
298
299    fn execute(
300        &self,
301        partition: usize,
302        context: Arc<TaskContext>,
303    ) -> DataFusionResult<SendableRecordBatchStream> {
304        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
305        let metrics_builder = MetricBuilder::new(&self.metric);
306        let num_series = Count::new();
307        metrics_builder
308            .with_partition(partition)
309            .build(MetricValue::Count {
310                name: METRIC_NUM_SERIES.into(),
311                count: num_series.clone(),
312            });
313
314        let input = self.input.execute(partition, context)?;
315        let schema = input.schema();
316        let time_index = schema
317            .column_with_name(&self.time_index_column)
318            .expect("time index column not found")
319            .0;
320        let field_index = self
321            .field_column
322            .as_ref()
323            .and_then(|name| schema.column_with_name(name))
324            .map(|x| x.0);
325        Ok(Box::pin(InstantManipulateStream {
326            start: self.start,
327            end: self.end,
328            lookback_delta: self.lookback_delta,
329            interval: self.interval,
330            time_index,
331            field_index,
332            schema,
333            input,
334            metric: baseline_metric,
335            num_series,
336        }))
337    }
338
339    fn metrics(&self) -> Option<MetricsSet> {
340        Some(self.metric.clone_inner())
341    }
342
343    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
344        let input_stats = self.input.partition_statistics(partition)?;
345
346        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
347        let estimated_total_bytes = input_stats
348            .total_byte_size
349            .get_value()
350            .zip(input_stats.num_rows.get_value())
351            .map(|(size, rows)| {
352                Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
353            })
354            .unwrap_or(Precision::Absent);
355
356        Ok(Statistics {
357            num_rows: Precision::Inexact(estimated_row_num.floor() as _),
358            total_byte_size: estimated_total_bytes,
359            // TODO(ruihang): support this column statistics
360            column_statistics: vec![
361                ColumnStatistics::new_unknown();
362                self.schema().flattened_fields().len()
363            ],
364        })
365    }
366
367    fn name(&self) -> &str {
368        "InstantManipulateExec"
369    }
370}
371
372impl DisplayAs for InstantManipulateExec {
373    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
374        match t {
375            DisplayFormatType::Default
376            | DisplayFormatType::Verbose
377            | DisplayFormatType::TreeRender => {
378                write!(
379                    f,
380                    "PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
381                    self.start,
382                    self.end,
383                    self.lookback_delta,
384                    self.interval,
385                    self.time_index_column
386                )
387            }
388        }
389    }
390}
391
/// Stream that aligns every non-empty batch of its input to the evaluation
/// timestamps (see `manipulate`).
pub struct InstantManipulateStream {
    /// First timestamp of the evaluation range.
    start: Millisecond,
    /// Last timestamp of the evaluation range.
    end: Millisecond,
    /// How far back from an aligned timestamp an older sample may still be used.
    lookback_delta: Millisecond,
    /// Step between two aligned timestamps.
    interval: Millisecond,
    // Column index of TIME INDEX column's position in schema
    time_index: usize,
    /// Column index of the field checked for NaN (staleness), if any.
    field_index: Option<usize>,

    /// Schema reported by this stream (taken from the input stream).
    schema: SchemaRef,
    /// The input stream being aligned.
    input: SendableRecordBatchStream,
    /// Baseline metrics (elapsed compute time and poll results).
    metric: BaselineMetrics,
    /// Number of series processed.
    num_series: Count,
}
407
408impl RecordBatchStream for InstantManipulateStream {
409    fn schema(&self) -> SchemaRef {
410        self.schema.clone()
411    }
412}
413
414impl Stream for InstantManipulateStream {
415    type Item = DataFusionResult<RecordBatch>;
416
417    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
418        let poll = match ready!(self.input.poll_next_unpin(cx)) {
419            Some(Ok(batch)) => {
420                if batch.num_rows() == 0 {
421                    return Poll::Pending;
422                }
423                let timer = std::time::Instant::now();
424                self.num_series.add(1);
425                let result = Ok(batch).and_then(|batch| self.manipulate(batch));
426                self.metric.elapsed_compute().add_elapsed(timer);
427                Poll::Ready(Some(result))
428            }
429            None => {
430                PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
431                Poll::Ready(None)
432            }
433            Some(Err(e)) => Poll::Ready(Some(Err(e))),
434        };
435        self.metric.record_poll(poll)
436    }
437}
438
439impl InstantManipulateStream {
440    // refer to Go version: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1571
441    // and the function `vectorSelectorSingle`
442    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
443        let mut take_indices = vec![];
444
445        let ts_column = input
446            .column(self.time_index)
447            .as_any()
448            .downcast_ref::<TimestampMillisecondArray>()
449            .ok_or_else(|| {
450                DataFusionError::Execution(
451                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
452                )
453            })?;
454
455        // Early return for empty input
456        if ts_column.is_empty() {
457            return Ok(input);
458        }
459
460        // field column for staleness check
461        let field_column = self
462            .field_index
463            .and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());
464
465        // Optimize iteration range based on actual data bounds
466        let first_ts = ts_column.value(0);
467        let last_ts = ts_column.value(ts_column.len() - 1);
468        let last_useful = last_ts + self.lookback_delta;
469
470        let max_start = first_ts.max(self.start);
471        let min_end = last_useful.min(self.end);
472
473        let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
474        let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
475
476        let mut cursor = 0;
477
478        let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
479        let mut aligned_ts = vec![];
480
481        // calculate the offsets to take
482        'next: for expected_ts in aligned_ts_iter {
483            // first, search toward end to see if there is matched timestamp
484            while cursor < ts_column.len() {
485                let curr = ts_column.value(cursor);
486                match curr.cmp(&expected_ts) {
487                    Ordering::Equal => {
488                        if let Some(field_column) = &field_column
489                            && field_column.value(cursor).is_nan()
490                        {
491                            // ignore the NaN value
492                        } else {
493                            take_indices.push(cursor as u64);
494                            aligned_ts.push(expected_ts);
495                        }
496                        continue 'next;
497                    }
498                    Ordering::Greater => break,
499                    Ordering::Less => {}
500                }
501                cursor += 1;
502            }
503            if cursor == ts_column.len() {
504                cursor -= 1;
505                // short cut this loop
506                if ts_column.value(cursor) + self.lookback_delta < expected_ts {
507                    break;
508                }
509            }
510
511            // then examine the value
512            let curr_ts = ts_column.value(cursor);
513            if curr_ts + self.lookback_delta < expected_ts {
514                continue;
515            }
516            if curr_ts > expected_ts {
517                // exceeds current expected timestamp, examine the previous value
518                if let Some(prev_cursor) = cursor.checked_sub(1) {
519                    let prev_ts = ts_column.value(prev_cursor);
520                    if prev_ts + self.lookback_delta >= expected_ts {
521                        // only use the point in the time range
522                        if let Some(field_column) = &field_column
523                            && field_column.value(prev_cursor).is_nan()
524                        {
525                            // if the newest value is NaN, it means the value is stale, so we should not use it
526                            continue;
527                        }
528                        // use this point
529                        take_indices.push(prev_cursor as u64);
530                        aligned_ts.push(expected_ts);
531                    }
532                }
533            } else if let Some(field_column) = &field_column
534                && field_column.value(cursor).is_nan()
535            {
536                // if the newest value is NaN, it means the value is stale, so we should not use it
537            } else {
538                // use this point
539                take_indices.push(cursor as u64);
540                aligned_ts.push(expected_ts);
541            }
542        }
543
544        // take record batch and replace the time index column
545        self.take_record_batch_optional(input, take_indices, aligned_ts)
546    }
547
548    /// Helper function to apply "take" on record batch.
549    fn take_record_batch_optional(
550        &self,
551        record_batch: RecordBatch,
552        take_indices: Vec<u64>,
553        aligned_ts: Vec<Millisecond>,
554    ) -> DataFusionResult<RecordBatch> {
555        assert_eq!(take_indices.len(), aligned_ts.len());
556
557        let indices_array = UInt64Array::from(take_indices);
558        let mut arrays = record_batch
559            .columns()
560            .iter()
561            .map(|array| compute::take(array, &indices_array, None))
562            .collect::<ArrowResult<Vec<_>>>()?;
563        arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));
564
565        let result = RecordBatch::try_new(record_batch.schema(), arrays)
566            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
567        Ok(result)
568    }
569}
570
571#[cfg(test)]
572mod test {
573    use datafusion::prelude::SessionContext;
574
575    use super::*;
576    use crate::extension_plan::test_util::{
577        TIME_INDEX_COLUMN, prepare_test_data, prepare_test_data_with_nan,
578    };
579
580    async fn do_normalize_test(
581        start: Millisecond,
582        end: Millisecond,
583        lookback_delta: Millisecond,
584        interval: Millisecond,
585        expected: String,
586        contains_nan: bool,
587    ) {
588        let memory_exec = if contains_nan {
589            Arc::new(prepare_test_data_with_nan())
590        } else {
591            Arc::new(prepare_test_data())
592        };
593        let normalize_exec = Arc::new(InstantManipulateExec {
594            start,
595            end,
596            lookback_delta,
597            interval,
598            time_index_column: TIME_INDEX_COLUMN.to_string(),
599            field_column: Some("value".to_string()),
600            input: memory_exec,
601            metric: ExecutionPlanMetricsSet::new(),
602        });
603        let session_context = SessionContext::default();
604        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
605            .await
606            .unwrap();
607        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
608            .unwrap()
609            .to_string();
610
611        assert_eq!(result_literal, expected);
612    }
613
614    #[tokio::test]
615    async fn lookback_10s_interval_30s() {
616        let expected = String::from(
617            "+---------------------+-------+------+\
618            \n| timestamp           | value | path |\
619            \n+---------------------+-------+------+\
620            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
621            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
622            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
623            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
624            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
625            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
626            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
627            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
628            \n+---------------------+-------+------+",
629        );
630        do_normalize_test(0, 310_000, 10_000, 30_000, expected, false).await;
631    }
632
633    #[tokio::test]
634    async fn lookback_10s_interval_10s() {
635        let expected = String::from(
636            "+---------------------+-------+------+\
637            \n| timestamp           | value | path |\
638            \n+---------------------+-------+------+\
639            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
640            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
641            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
642            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
643            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
644            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
645            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
646            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
647            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
648            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
649            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
650            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
651            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
652            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
653            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
654            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
655            \n+---------------------+-------+------+",
656        );
657        do_normalize_test(0, 300_000, 10_000, 10_000, expected, false).await;
658    }
659
660    #[tokio::test]
661    async fn lookback_30s_interval_30s() {
662        let expected = String::from(
663            "+---------------------+-------+------+\
664            \n| timestamp           | value | path |\
665            \n+---------------------+-------+------+\
666            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
667            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
668            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
669            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
670            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
671            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
672            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
673            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
674            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
675            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
676            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
677            \n+---------------------+-------+------+",
678        );
679        do_normalize_test(0, 300_000, 30_000, 30_000, expected, false).await;
680    }
681
682    #[tokio::test]
683    async fn lookback_30s_interval_10s() {
684        let expected = String::from(
685            "+---------------------+-------+------+\
686            \n| timestamp           | value | path |\
687            \n+---------------------+-------+------+\
688            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
689            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
690            \n| 1970-01-01T00:00:20 | 1.0   | foo  |\
691            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
692            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
693            \n| 1970-01-01T00:00:50 | 1.0   | foo  |\
694            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
695            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
696            \n| 1970-01-01T00:01:20 | 1.0   | foo  |\
697            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
698            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
699            \n| 1970-01-01T00:01:50 | 1.0   | foo  |\
700            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
701            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
702            \n| 1970-01-01T00:02:20 | 1.0   | foo  |\
703            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
704            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
705            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
706            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
707            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
708            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
709            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
710            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
711            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
712            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
713            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
714            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
715            \n+---------------------+-------+------+",
716        );
717        do_normalize_test(0, 300_000, 30_000, 10_000, expected, false).await;
718    }
719
720    #[tokio::test]
721    async fn lookback_60s_interval_10s() {
722        let expected = String::from(
723            "+---------------------+-------+------+\
724            \n| timestamp           | value | path |\
725            \n+---------------------+-------+------+\
726            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
727            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
728            \n| 1970-01-01T00:00:20 | 1.0   | foo  |\
729            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
730            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
731            \n| 1970-01-01T00:00:50 | 1.0   | foo  |\
732            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
733            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
734            \n| 1970-01-01T00:01:20 | 1.0   | foo  |\
735            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
736            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
737            \n| 1970-01-01T00:01:50 | 1.0   | foo  |\
738            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
739            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
740            \n| 1970-01-01T00:02:20 | 1.0   | foo  |\
741            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
742            \n| 1970-01-01T00:02:40 | 1.0   | foo  |\
743            \n| 1970-01-01T00:02:50 | 1.0   | foo  |\
744            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
745            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
746            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
747            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
748            \n| 1970-01-01T00:03:40 | 1.0   | foo  |\
749            \n| 1970-01-01T00:03:50 | 1.0   | foo  |\
750            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
751            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
752            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
753            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
754            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
755            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
756            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
757            \n+---------------------+-------+------+",
758        );
759        do_normalize_test(0, 300_000, 60_000, 10_000, expected, false).await;
760    }
761
762    #[tokio::test]
763    async fn lookback_60s_interval_30s() {
764        let expected = String::from(
765            "+---------------------+-------+------+\
766            \n| timestamp           | value | path |\
767            \n+---------------------+-------+------+\
768            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
769            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
770            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
771            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
772            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
773            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
774            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
775            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
776            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
777            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
778            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
779            \n+---------------------+-------+------+",
780        );
781        do_normalize_test(0, 300_000, 60_000, 30_000, expected, false).await;
782    }
783
784    #[tokio::test]
785    async fn small_range_lookback_0s_interval_1s() {
786        let expected = String::from(
787            "+---------------------+-------+------+\
788            \n| timestamp           | value | path |\
789            \n+---------------------+-------+------+\
790            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
791            \n| 1970-01-01T00:04:01 | 1.0   | foo  |\
792            \n+---------------------+-------+------+",
793        );
794        do_normalize_test(230_000, 245_000, 0, 1_000, expected, false).await;
795    }
796
797    #[tokio::test]
798    async fn small_range_lookback_10s_interval_10s() {
799        let expected = String::from(
800            "+---------------------+-------+------+\
801            \n| timestamp           | value | path |\
802            \n+---------------------+-------+------+\
803            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
804            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
805            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
806            \n+---------------------+-------+------+",
807        );
808        do_normalize_test(0, 30_000, 10_000, 10_000, expected, false).await;
809    }
810
811    #[tokio::test]
812    async fn large_range_lookback_30s_interval_60s() {
813        let expected = String::from(
814            "+---------------------+-------+------+\
815            \n| timestamp           | value | path |\
816            \n+---------------------+-------+------+\
817            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
818            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
819            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
820            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
821            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
822            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
823            \n+---------------------+-------+------+",
824        );
825        do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected, false).await;
826    }
827
828    #[tokio::test]
829    async fn small_range_lookback_30s_interval_30s() {
830        let expected = String::from(
831            "+---------------------+-------+------+\
832            \n| timestamp           | value | path |\
833            \n+---------------------+-------+------+\
834            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
835            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
836            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
837            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
838            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
839            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
840            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
841            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
842            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
843            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
844            \n+---------------------+-------+------+",
845        );
846        do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
847    }
848
849    #[tokio::test]
850    async fn lookback_10s_interval_10s_with_nan() {
851        let expected = String::from(
852            "+---------------------+-------+\
853            \n| timestamp           | value |\
854            \n+---------------------+-------+\
855            \n| 1970-01-01T00:00:00 | 0.0   |\
856            \n| 1970-01-01T00:00:10 | 0.0   |\
857            \n| 1970-01-01T00:01:00 | 6.0   |\
858            \n| 1970-01-01T00:01:10 | 6.0   |\
859            \n| 1970-01-01T00:02:00 | 12.0  |\
860            \n| 1970-01-01T00:02:10 | 12.0  |\
861            \n+---------------------+-------+",
862        );
863        do_normalize_test(0, 300_000, 10_000, 10_000, expected, true).await;
864    }
865
866    #[tokio::test]
867    async fn lookback_10s_interval_10s_with_nan_unaligned() {
868        let expected = String::from(
869            "+-------------------------+-------+\
870            \n| timestamp               | value |\
871            \n+-------------------------+-------+\
872            \n| 1970-01-01T00:00:00.001 | 0.0   |\
873            \n| 1970-01-01T00:01:00.001 | 6.0   |\
874            \n| 1970-01-01T00:02:00.001 | 12.0  |\
875            \n+-------------------------+-------+",
876        );
877        do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
878    }
879
880    #[tokio::test]
881    async fn ultra_large_range() {
882        let expected = String::from(
883            "+-------------------------+-------+\
884            \n| timestamp               | value |\
885            \n+-------------------------+-------+\
886            \n| 1970-01-01T00:00:00.001 | 0.0   |\
887            \n| 1970-01-01T00:01:00.001 | 6.0   |\
888            \n| 1970-01-01T00:02:00.001 | 12.0  |\
889            \n+-------------------------+-------+",
890        );
891        do_normalize_test(
892            -900_000_000_000_000 + 1,
893            900_000_000_000_000,
894            10_000,
895            10_000,
896            expected,
897            true,
898        )
899        .await;
900    }
901}