promql/extension_plan/
instant_manipulate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::cmp::Ordering;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
22use datafusion::arrow::datatypes::SchemaRef;
23use datafusion::arrow::record_batch::RecordBatch;
24use datafusion::common::stats::Precision;
25use datafusion::common::{ColumnStatistics, DFSchema, DFSchemaRef};
26use datafusion::error::{DataFusionError, Result as DataFusionResult};
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
29use datafusion::physical_plan::metrics::{
30    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
31};
32use datafusion::physical_plan::{
33    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
34    SendableRecordBatchStream, Statistics,
35};
36use datatypes::arrow::compute;
37use datatypes::arrow::error::Result as ArrowResult;
38use futures::{Stream, StreamExt, ready};
39use greptime_proto::substrait_extension as pb;
40use prost::Message;
41use snafu::ResultExt;
42
43use crate::error::{DeserializeSnafu, Result};
44use crate::extension_plan::{METRIC_NUM_SERIES, Millisecond};
45use crate::metrics::PROMQL_SERIES_COUNT;
46
/// Manipulate the input record batch to make it suitable for Instant Operator.
///
/// This plan aligns each input time series onto the evaluation grid: for every
/// timestamp from `start` to `end` (inclusive) with step `interval`, it uses the
/// sample at exactly that timestamp or, if data is missing there, the latest earlier
/// sample within the `lookback_delta` range. Timestamps without such a sample produce
/// no row. The physical implementation is [`InstantManipulateExec`], built by
/// [`InstantManipulate::to_execution_plan`].
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct InstantManipulate {
    /// First evaluation timestamp, in milliseconds.
    start: Millisecond,
    /// Last evaluation timestamp (inclusive), in milliseconds.
    end: Millisecond,
    /// How far back (in milliseconds) a sample may be used for a missing timestamp.
    lookback_delta: Millisecond,
    /// Step between evaluation timestamps, in milliseconds.
    interval: Millisecond,
    /// Name of the time index column.
    time_index_column: String,
    /// An optional column for validating staleness: a NaN value in it marks the
    /// sample as stale, so no row is produced from it.
    field_column: Option<String>,
    /// The child logical plan.
    input: LogicalPlan,
}
63
64impl UserDefinedLogicalNodeCore for InstantManipulate {
65    fn name(&self) -> &str {
66        Self::name()
67    }
68
69    fn inputs(&self) -> Vec<&LogicalPlan> {
70        vec![&self.input]
71    }
72
73    fn schema(&self) -> &DFSchemaRef {
74        self.input.schema()
75    }
76
77    fn expressions(&self) -> Vec<Expr> {
78        vec![]
79    }
80
81    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
82        write!(
83            f,
84            "PromInstantManipulate: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
85            self.start, self.end, self.lookback_delta, self.interval, self.time_index_column
86        )
87    }
88
89    fn with_exprs_and_inputs(
90        &self,
91        _exprs: Vec<Expr>,
92        inputs: Vec<LogicalPlan>,
93    ) -> DataFusionResult<Self> {
94        if inputs.len() != 1 {
95            return Err(DataFusionError::Internal(
96                "InstantManipulate should have exact one input".to_string(),
97            ));
98        }
99
100        Ok(Self {
101            start: self.start,
102            end: self.end,
103            lookback_delta: self.lookback_delta,
104            interval: self.interval,
105            time_index_column: self.time_index_column.clone(),
106            field_column: self.field_column.clone(),
107            input: inputs.into_iter().next().unwrap(),
108        })
109    }
110}
111
112impl InstantManipulate {
113    pub fn new(
114        start: Millisecond,
115        end: Millisecond,
116        lookback_delta: Millisecond,
117        interval: Millisecond,
118        time_index_column: String,
119        field_column: Option<String>,
120        input: LogicalPlan,
121    ) -> Self {
122        Self {
123            start,
124            end,
125            lookback_delta,
126            interval,
127            time_index_column,
128            field_column,
129            input,
130        }
131    }
132
133    pub const fn name() -> &'static str {
134        "InstantManipulate"
135    }
136
137    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
138        Arc::new(InstantManipulateExec {
139            start: self.start,
140            end: self.end,
141            lookback_delta: self.lookback_delta,
142            interval: self.interval,
143            time_index_column: self.time_index_column.clone(),
144            field_column: self.field_column.clone(),
145            input: exec_input,
146            metric: ExecutionPlanMetricsSet::new(),
147        })
148    }
149
150    pub fn serialize(&self) -> Vec<u8> {
151        pb::InstantManipulate {
152            start: self.start,
153            end: self.end,
154            interval: self.interval,
155            lookback_delta: self.lookback_delta,
156            time_index: self.time_index_column.clone(),
157            field_index: self.field_column.clone().unwrap_or_default(),
158        }
159        .encode_to_vec()
160    }
161
162    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
163        let pb_instant_manipulate =
164            pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
165        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
166            produce_one_row: false,
167            schema: Arc::new(DFSchema::empty()),
168        });
169        let field_column = if pb_instant_manipulate.field_index.is_empty() {
170            None
171        } else {
172            Some(pb_instant_manipulate.field_index)
173        };
174        Ok(Self {
175            start: pb_instant_manipulate.start,
176            end: pb_instant_manipulate.end,
177            lookback_delta: pb_instant_manipulate.lookback_delta,
178            interval: pb_instant_manipulate.interval,
179            time_index_column: pb_instant_manipulate.time_index,
180            field_column,
181            input: placeholder_plan,
182        })
183    }
184}
185
/// Physical execution plan for [`InstantManipulate`], created by
/// [`InstantManipulate::to_execution_plan`]. Each partition is processed by an
/// [`InstantManipulateStream`].
#[derive(Debug)]
pub struct InstantManipulateExec {
    /// First evaluation timestamp, in milliseconds.
    start: Millisecond,
    /// Last evaluation timestamp (inclusive), in milliseconds.
    end: Millisecond,
    /// How far back (in milliseconds) a sample may be used for a missing timestamp.
    lookback_delta: Millisecond,
    /// Step between evaluation timestamps, in milliseconds.
    interval: Millisecond,
    /// Name of the time index column; resolved to a column position in `execute`.
    time_index_column: String,
    /// Optional value column for the NaN staleness check; if it is not found in the
    /// input schema, the check is skipped.
    field_column: Option<String>,

    input: Arc<dyn ExecutionPlan>,
    /// Metrics for this node; cloned (not reset) by `with_new_children`.
    metric: ExecutionPlanMetricsSet,
}
198
199impl ExecutionPlan for InstantManipulateExec {
200    fn as_any(&self) -> &dyn Any {
201        self
202    }
203
204    fn schema(&self) -> SchemaRef {
205        self.input.schema()
206    }
207
208    fn properties(&self) -> &PlanProperties {
209        self.input.properties()
210    }
211
212    fn required_input_distribution(&self) -> Vec<Distribution> {
213        self.input.required_input_distribution()
214    }
215
216    // Prevent reordering of input
217    fn maintains_input_order(&self) -> Vec<bool> {
218        vec![false; self.children().len()]
219    }
220
221    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
222        vec![&self.input]
223    }
224
225    fn with_new_children(
226        self: Arc<Self>,
227        children: Vec<Arc<dyn ExecutionPlan>>,
228    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
229        assert!(!children.is_empty());
230        Ok(Arc::new(Self {
231            start: self.start,
232            end: self.end,
233            lookback_delta: self.lookback_delta,
234            interval: self.interval,
235            time_index_column: self.time_index_column.clone(),
236            field_column: self.field_column.clone(),
237            input: children[0].clone(),
238            metric: self.metric.clone(),
239        }))
240    }
241
242    fn execute(
243        &self,
244        partition: usize,
245        context: Arc<TaskContext>,
246    ) -> DataFusionResult<SendableRecordBatchStream> {
247        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
248        let metrics_builder = MetricBuilder::new(&self.metric);
249        let num_series = Count::new();
250        metrics_builder
251            .with_partition(partition)
252            .build(MetricValue::Count {
253                name: METRIC_NUM_SERIES.into(),
254                count: num_series.clone(),
255            });
256
257        let input = self.input.execute(partition, context)?;
258        let schema = input.schema();
259        let time_index = schema
260            .column_with_name(&self.time_index_column)
261            .expect("time index column not found")
262            .0;
263        let field_index = self
264            .field_column
265            .as_ref()
266            .and_then(|name| schema.column_with_name(name))
267            .map(|x| x.0);
268        Ok(Box::pin(InstantManipulateStream {
269            start: self.start,
270            end: self.end,
271            lookback_delta: self.lookback_delta,
272            interval: self.interval,
273            time_index,
274            field_index,
275            schema,
276            input,
277            metric: baseline_metric,
278            num_series,
279        }))
280    }
281
282    fn metrics(&self) -> Option<MetricsSet> {
283        Some(self.metric.clone_inner())
284    }
285
286    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
287        let input_stats = self.input.partition_statistics(partition)?;
288
289        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
290        let estimated_total_bytes = input_stats
291            .total_byte_size
292            .get_value()
293            .zip(input_stats.num_rows.get_value())
294            .map(|(size, rows)| {
295                Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
296            })
297            .unwrap_or(Precision::Absent);
298
299        Ok(Statistics {
300            num_rows: Precision::Inexact(estimated_row_num.floor() as _),
301            total_byte_size: estimated_total_bytes,
302            // TODO(ruihang): support this column statistics
303            column_statistics: vec![
304                ColumnStatistics::new_unknown();
305                self.schema().flattened_fields().len()
306            ],
307        })
308    }
309
310    fn name(&self) -> &str {
311        "InstantManipulateExec"
312    }
313}
314
315impl DisplayAs for InstantManipulateExec {
316    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
317        match t {
318            DisplayFormatType::Default
319            | DisplayFormatType::Verbose
320            | DisplayFormatType::TreeRender => {
321                write!(
322                    f,
323                    "PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
324                    self.start,
325                    self.end,
326                    self.lookback_delta,
327                    self.interval,
328                    self.time_index_column
329                )
330            }
331        }
332    }
333}
334
/// Stream produced by [`InstantManipulateExec::execute`]. It applies
/// [`InstantManipulateStream::manipulate`] to each non-empty input batch.
pub struct InstantManipulateStream {
    /// First evaluation timestamp, in milliseconds.
    start: Millisecond,
    /// Last evaluation timestamp (inclusive), in milliseconds.
    end: Millisecond,
    /// How far back (in milliseconds) a sample may be used for a missing timestamp.
    lookback_delta: Millisecond,
    /// Step between evaluation timestamps, in milliseconds.
    interval: Millisecond,
    // Position of the TIME INDEX column in `schema`
    time_index: usize,
    // Position of the optional field column used for the NaN staleness check
    field_index: Option<usize>,

    /// Output schema; identical to the input stream's schema.
    schema: SchemaRef,
    input: SendableRecordBatchStream,
    /// Baseline metrics (elapsed compute time and poll recording).
    metric: BaselineMetrics,
    /// Number of series processed (incremented once per non-empty input batch).
    num_series: Count,
}
350
351impl RecordBatchStream for InstantManipulateStream {
352    fn schema(&self) -> SchemaRef {
353        self.schema.clone()
354    }
355}
356
357impl Stream for InstantManipulateStream {
358    type Item = DataFusionResult<RecordBatch>;
359
360    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
361        let poll = match ready!(self.input.poll_next_unpin(cx)) {
362            Some(Ok(batch)) => {
363                if batch.num_rows() == 0 {
364                    return Poll::Pending;
365                }
366                let timer = std::time::Instant::now();
367                self.num_series.add(1);
368                let result = Ok(batch).and_then(|batch| self.manipulate(batch));
369                self.metric.elapsed_compute().add_elapsed(timer);
370                Poll::Ready(Some(result))
371            }
372            None => {
373                PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
374                Poll::Ready(None)
375            }
376            Some(Err(e)) => Poll::Ready(Some(Err(e))),
377        };
378        self.metric.record_poll(poll)
379    }
380}
381
382impl InstantManipulateStream {
383    // refer to Go version: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1571
384    // and the function `vectorSelectorSingle`
385    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
386        let mut take_indices = vec![];
387
388        let ts_column = input
389            .column(self.time_index)
390            .as_any()
391            .downcast_ref::<TimestampMillisecondArray>()
392            .ok_or_else(|| {
393                DataFusionError::Execution(
394                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
395                )
396            })?;
397
398        // Early return for empty input
399        if ts_column.is_empty() {
400            return Ok(input);
401        }
402
403        // field column for staleness check
404        let field_column = self
405            .field_index
406            .and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());
407
408        // Optimize iteration range based on actual data bounds
409        let first_ts = ts_column.value(0);
410        let last_ts = ts_column.value(ts_column.len() - 1);
411        let last_useful = last_ts + self.lookback_delta;
412
413        let max_start = first_ts.max(self.start);
414        let min_end = last_useful.min(self.end);
415
416        let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
417        let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
418
419        let mut cursor = 0;
420
421        let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
422        let mut aligned_ts = vec![];
423
424        // calculate the offsets to take
425        'next: for expected_ts in aligned_ts_iter {
426            // first, search toward end to see if there is matched timestamp
427            while cursor < ts_column.len() {
428                let curr = ts_column.value(cursor);
429                match curr.cmp(&expected_ts) {
430                    Ordering::Equal => {
431                        if let Some(field_column) = &field_column
432                            && field_column.value(cursor).is_nan()
433                        {
434                            // ignore the NaN value
435                        } else {
436                            take_indices.push(cursor as u64);
437                            aligned_ts.push(expected_ts);
438                        }
439                        continue 'next;
440                    }
441                    Ordering::Greater => break,
442                    Ordering::Less => {}
443                }
444                cursor += 1;
445            }
446            if cursor == ts_column.len() {
447                cursor -= 1;
448                // short cut this loop
449                if ts_column.value(cursor) + self.lookback_delta < expected_ts {
450                    break;
451                }
452            }
453
454            // then examine the value
455            let curr_ts = ts_column.value(cursor);
456            if curr_ts + self.lookback_delta < expected_ts {
457                continue;
458            }
459            if curr_ts > expected_ts {
460                // exceeds current expected timestamp, examine the previous value
461                if let Some(prev_cursor) = cursor.checked_sub(1) {
462                    let prev_ts = ts_column.value(prev_cursor);
463                    if prev_ts + self.lookback_delta >= expected_ts {
464                        // only use the point in the time range
465                        if let Some(field_column) = &field_column
466                            && field_column.value(prev_cursor).is_nan()
467                        {
468                            // if the newest value is NaN, it means the value is stale, so we should not use it
469                            continue;
470                        }
471                        // use this point
472                        take_indices.push(prev_cursor as u64);
473                        aligned_ts.push(expected_ts);
474                    }
475                }
476            } else if let Some(field_column) = &field_column
477                && field_column.value(cursor).is_nan()
478            {
479                // if the newest value is NaN, it means the value is stale, so we should not use it
480            } else {
481                // use this point
482                take_indices.push(cursor as u64);
483                aligned_ts.push(expected_ts);
484            }
485        }
486
487        // take record batch and replace the time index column
488        self.take_record_batch_optional(input, take_indices, aligned_ts)
489    }
490
491    /// Helper function to apply "take" on record batch.
492    fn take_record_batch_optional(
493        &self,
494        record_batch: RecordBatch,
495        take_indices: Vec<u64>,
496        aligned_ts: Vec<Millisecond>,
497    ) -> DataFusionResult<RecordBatch> {
498        assert_eq!(take_indices.len(), aligned_ts.len());
499
500        let indices_array = UInt64Array::from(take_indices);
501        let mut arrays = record_batch
502            .columns()
503            .iter()
504            .map(|array| compute::take(array, &indices_array, None))
505            .collect::<ArrowResult<Vec<_>>>()?;
506        arrays[self.time_index] = Arc::new(TimestampMillisecondArray::from(aligned_ts));
507
508        let result = RecordBatch::try_new(record_batch.schema(), arrays)
509            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
510        Ok(result)
511    }
512}
513
#[cfg(test)]
mod test {
    // Each test runs `InstantManipulateExec` over a shared in-memory fixture and
    // compares the pretty-printed output table byte-for-byte.
    use datafusion::prelude::SessionContext;

    use super::*;
    use crate::extension_plan::test_util::{
        TIME_INDEX_COLUMN, prepare_test_data, prepare_test_data_with_nan,
    };

    /// Runs `InstantManipulateExec` with the given grid parameters (all in
    /// milliseconds) and asserts that the pretty-printed result equals `expected`.
    ///
    /// `contains_nan` selects `prepare_test_data_with_nan()` instead of
    /// `prepare_test_data()`. The staleness check always uses the `value` column.
    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        lookback_delta: Millisecond,
        interval: Millisecond,
        expected: String,
        contains_nan: bool,
    ) {
        let memory_exec = if contains_nan {
            Arc::new(prepare_test_data_with_nan())
        } else {
            Arc::new(prepare_test_data())
        };
        let normalize_exec = Arc::new(InstantManipulateExec {
            start,
            end,
            lookback_delta,
            interval,
            time_index_column: TIME_INDEX_COLUMN.to_string(),
            field_column: Some("value".to_string()),
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn lookback_10s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 310_000, 10_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_10s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 10_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_30s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 30_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_30s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 30_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_60s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 60_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn lookback_60s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 300_000, 60_000, 30_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_0s_interval_1s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:01 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(230_000, 245_000, 0, 1_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_10s_interval_10s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(0, 30_000, 10_000, 10_000, expected, false).await;
    }

    #[tokio::test]
    async fn large_range_lookback_30s_interval_60s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected, false).await;
    }

    #[tokio::test]
    async fn small_range_lookback_30s_interval_30s() {
        let expected = String::from(
            "+---------------------+-------+------+\
            \n| timestamp           | value | path |\
            \n+---------------------+-------+------+\
            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
            \n+---------------------+-------+------+",
        );
        do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
    }

    // The following tests use the NaN fixture, exercising the staleness check on
    // the `value` column.
    #[tokio::test]
    async fn lookback_10s_interval_10s_with_nan() {
        let expected = String::from(
            "+---------------------+-------+\
            \n| timestamp           | value |\
            \n+---------------------+-------+\
            \n| 1970-01-01T00:00:00 | 0.0   |\
            \n| 1970-01-01T00:00:10 | 0.0   |\
            \n| 1970-01-01T00:01:00 | 6.0   |\
            \n| 1970-01-01T00:01:10 | 6.0   |\
            \n| 1970-01-01T00:02:00 | 12.0  |\
            \n| 1970-01-01T00:02:10 | 12.0  |\
            \n+---------------------+-------+",
        );
        do_normalize_test(0, 300_000, 10_000, 10_000, expected, true).await;
    }

    #[tokio::test]
    async fn lookback_10s_interval_10s_with_nan_unaligned() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| timestamp               | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00.001 | 0.0   |\
            \n| 1970-01-01T00:01:00.001 | 6.0   |\
            \n| 1970-01-01T00:02:00.001 | 12.0  |\
            \n+-------------------------+-------+",
        );
        do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
    }

    // A query range far wider than the data, exercising the clamping of the
    // iteration range to the data bounds.
    #[tokio::test]
    async fn ultra_large_range() {
        let expected = String::from(
            "+-------------------------+-------+\
            \n| timestamp               | value |\
            \n+-------------------------+-------+\
            \n| 1970-01-01T00:00:00.001 | 0.0   |\
            \n| 1970-01-01T00:01:00.001 | 6.0   |\
            \n| 1970-01-01T00:02:00.001 | 12.0  |\
            \n+-------------------------+-------+",
        );
        do_normalize_test(
            -900_000_000_000_000 + 1,
            900_000_000_000_000,
            10_000,
            10_000,
            expected,
            true,
        )
        .await;
    }
}
844}