Skip to main content

promql/extension_plan/
instant_manipulate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::cmp::Ordering;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, Float64Array, TimestampMillisecondArray, UInt64Array};
22use datafusion::arrow::datatypes::{DataType, SchemaRef};
23use datafusion::arrow::record_batch::RecordBatch;
24use datafusion::common::stats::Precision;
25use datafusion::common::{DFSchema, DFSchemaRef, ScalarValue};
26use datafusion::error::{DataFusionError, Result as DataFusionResult};
27use datafusion::execution::context::TaskContext;
28use datafusion::logical_expr::{
29    EmptyRelation, Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore,
30};
31use datafusion::physical_plan::metrics::{
32    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
33};
34use datafusion::physical_plan::{
35    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
36    SendableRecordBatchStream, Statistics,
37};
38use datafusion_expr::col;
39use datatypes::arrow::compute;
40use futures::{Stream, StreamExt, ready};
41use greptime_proto::substrait_extension as pb;
42use prost::Message;
43use snafu::ResultExt;
44
45use crate::error::{DeserializeSnafu, Result};
46use crate::extension_plan::series_divide::SeriesDivide;
47use crate::extension_plan::{
48    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
49};
50use crate::metrics::PROMQL_SERIES_COUNT;
51
/// Upper bound on the number of aligned points `InstantManipulateStream::manipulate`
/// is willing to produce for a single input batch. Requests estimated to exceed this
/// limit fail with an execution error before any output buffers are allocated.
const MAX_INSTANT_MANIPULATE_OUTPUT_POINTS: usize = 1_000_000;
53
/// Manipulate the input record batch to make it suitable for Instant Operator.
///
/// This plan aligns the input time series to every timestamp between `start`
/// and `end` with step `interval`. When no sample sits exactly on an aligned
/// timestamp, the most recent sample inside the `lookback` range is used instead.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct InstantManipulate {
    /// First evaluation timestamp.
    start: Millisecond,
    /// Last evaluation timestamp.
    end: Millisecond,
    /// How far back from an evaluation timestamp a sample may be and still be used.
    lookback_delta: Millisecond,
    /// Step between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Name of the time index column in the input schema.
    time_index_column: String,
    /// Planner-provided tag-column hint for execution fast paths.
    tag_columns: Vec<String>,
    /// An optional column for validating staleness
    field_column: Option<String>,
    /// The child plan whose output is aligned.
    input: LogicalPlan,
    /// Column indices carried by a freshly deserialized node. They are turned back
    /// into column names (and this field reset to `None`) by `with_exprs_and_inputs`.
    unfix: Option<UnfixIndices>,
}
73
/// Column positions decoded from the serialized plan, kept until the real input
/// plan is attached and the positions can be resolved to column names.
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    /// Index of the time index column in the input schema.
    pub time_index_idx: u64,
    /// Index of the field column in the input schema; `u64::MAX` means that
    /// no field column was configured.
    pub field_index_idx: u64,
}
79
80impl UserDefinedLogicalNodeCore for InstantManipulate {
81    fn name(&self) -> &str {
82        Self::name()
83    }
84
85    fn inputs(&self) -> Vec<&LogicalPlan> {
86        vec![&self.input]
87    }
88
89    fn schema(&self) -> &DFSchemaRef {
90        self.input.schema()
91    }
92
93    fn expressions(&self) -> Vec<Expr> {
94        if self.unfix.is_some() {
95            return vec![];
96        }
97
98        let mut exprs = vec![col(&self.time_index_column)];
99        if let Some(field) = &self.field_column {
100            exprs.push(col(field));
101        }
102        exprs
103    }
104
105    fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
106        if self.unfix.is_some() {
107            return None;
108        }
109
110        let input_schema = self.input.schema();
111        if output_columns.is_empty() {
112            let indices = (0..input_schema.fields().len()).collect::<Vec<_>>();
113            return Some(vec![indices]);
114        }
115
116        let mut required = output_columns.to_vec();
117        required.push(input_schema.index_of_column_by_name(None, &self.time_index_column)?);
118        if let Some(field) = &self.field_column {
119            required.push(input_schema.index_of_column_by_name(None, field)?);
120        }
121
122        required.sort_unstable();
123        required.dedup();
124        Some(vec![required])
125    }
126
127    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
128        write!(
129            f,
130            "PromInstantManipulate: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
131            self.start, self.end, self.lookback_delta, self.interval, self.time_index_column
132        )
133    }
134
135    fn with_exprs_and_inputs(
136        &self,
137        _exprs: Vec<Expr>,
138        inputs: Vec<LogicalPlan>,
139    ) -> DataFusionResult<Self> {
140        if inputs.len() != 1 {
141            return Err(DataFusionError::Internal(
142                "InstantManipulate should have exact one input".to_string(),
143            ));
144        }
145
146        let input: LogicalPlan = inputs.into_iter().next().unwrap();
147        let input_schema = input.schema();
148
149        if let Some(unfix) = &self.unfix {
150            // transform indices to names
151            let time_index_column = resolve_column_name(
152                unfix.time_index_idx,
153                input_schema,
154                "InstantManipulate",
155                "time index",
156            )?;
157
158            let field_column = if unfix.field_index_idx == u64::MAX {
159                None
160            } else {
161                Some(resolve_column_name(
162                    unfix.field_index_idx,
163                    input_schema,
164                    "InstantManipulate",
165                    "field",
166                )?)
167            };
168
169            Ok(Self {
170                start: self.start,
171                end: self.end,
172                lookback_delta: self.lookback_delta,
173                interval: self.interval,
174                time_index_column,
175                tag_columns: Self::resolve_tag_columns(&input, &self.tag_columns),
176                field_column,
177                input,
178                unfix: None,
179            })
180        } else {
181            Ok(Self {
182                start: self.start,
183                end: self.end,
184                lookback_delta: self.lookback_delta,
185                interval: self.interval,
186                time_index_column: self.time_index_column.clone(),
187                tag_columns: Self::resolve_tag_columns(&input, &self.tag_columns),
188                field_column: self.field_column.clone(),
189                input,
190                unfix: None,
191            })
192        }
193    }
194}
195
196impl InstantManipulate {
197    #[allow(clippy::too_many_arguments)]
198    pub fn new(
199        start: Millisecond,
200        end: Millisecond,
201        lookback_delta: Millisecond,
202        interval: Millisecond,
203        time_index_column: String,
204        tag_columns: Vec<String>,
205        field_column: Option<String>,
206        input: LogicalPlan,
207    ) -> Self {
208        Self {
209            start,
210            end,
211            lookback_delta,
212            interval,
213            time_index_column,
214            tag_columns,
215            field_column,
216            input,
217            unfix: None,
218        }
219    }
220
221    pub const fn name() -> &'static str {
222        "InstantManipulate"
223    }
224
225    fn resolve_tag_columns(input: &LogicalPlan, tag_columns: &[String]) -> Vec<String> {
226        if !tag_columns.is_empty() {
227            return tag_columns.to_vec();
228        }
229
230        Self::find_series_divide_tags(input).unwrap_or_default()
231    }
232
233    fn find_series_divide_tags(plan: &LogicalPlan) -> Option<Vec<String>> {
234        if let LogicalPlan::Extension(Extension { node }) = plan
235            && let Some(series_divide) = node.as_any().downcast_ref::<SeriesDivide>()
236        {
237            return Some(series_divide.tags().to_vec());
238        }
239
240        plan.inputs()
241            .into_iter()
242            .find_map(Self::find_series_divide_tags)
243    }
244
245    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
246        let reuse_tsid_column = matches!(self.tag_columns.as_slice(), [tag] if tag == "__tsid");
247
248        Arc::new(InstantManipulateExec {
249            start: self.start,
250            end: self.end,
251            lookback_delta: self.lookback_delta,
252            interval: self.interval,
253            time_index_column: self.time_index_column.clone(),
254            field_column: self.field_column.clone(),
255            reuse_tsid_column,
256            input: exec_input,
257            metric: ExecutionPlanMetricsSet::new(),
258        })
259    }
260
261    pub fn serialize(&self) -> Vec<u8> {
262        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index_column);
263
264        let field_index_idx = self
265            .field_column
266            .as_ref()
267            .map(|name| serialize_column_index(self.input.schema(), name))
268            .unwrap_or(u64::MAX);
269
270        pb::InstantManipulate {
271            start: self.start,
272            end: self.end,
273            interval: self.interval,
274            lookback_delta: self.lookback_delta,
275            time_index_idx,
276            field_index_idx,
277            ..Default::default()
278        }
279        .encode_to_vec()
280    }
281
282    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
283        let pb_instant_manipulate =
284            pb::InstantManipulate::decode(bytes).context(DeserializeSnafu)?;
285        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
286            produce_one_row: false,
287            schema: Arc::new(DFSchema::empty()),
288        });
289
290        let unfix = UnfixIndices {
291            time_index_idx: pb_instant_manipulate.time_index_idx,
292            field_index_idx: pb_instant_manipulate.field_index_idx,
293        };
294
295        Ok(Self {
296            start: pb_instant_manipulate.start,
297            end: pb_instant_manipulate.end,
298            lookback_delta: pb_instant_manipulate.lookback_delta,
299            interval: pb_instant_manipulate.interval,
300            time_index_column: String::new(),
301            tag_columns: Vec::new(),
302            field_column: None,
303            input: placeholder_plan,
304            unfix: Some(unfix),
305        })
306    }
307}
308
/// Physical operator that aligns each input batch to the evaluation timestamps
/// described by `start`, `end` and `interval`.
#[derive(Debug)]
pub struct InstantManipulateExec {
    /// First evaluation timestamp.
    start: Millisecond,
    /// Last evaluation timestamp.
    end: Millisecond,
    /// How far back from an evaluation timestamp a sample may be and still be used.
    lookback_delta: Millisecond,
    /// Step between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Name of the time index column in the input schema.
    time_index_column: String,
    /// Optional column whose NaN values mark a sample as stale.
    field_column: Option<String>,
    /// Whether the `__tsid` column may be reused instead of gathered by index.
    /// Only takes effect when the input actually has a `UInt64` `__tsid` column.
    reuse_tsid_column: bool,

    /// The child plan providing the batches to align.
    input: Arc<dyn ExecutionPlan>,
    /// Metrics registered by this operator's streams.
    metric: ExecutionPlanMetricsSet,
}
322
323impl ExecutionPlan for InstantManipulateExec {
324    fn as_any(&self) -> &dyn Any {
325        self
326    }
327
328    fn schema(&self) -> SchemaRef {
329        self.input.schema()
330    }
331
332    fn properties(&self) -> &Arc<PlanProperties> {
333        self.input.properties()
334    }
335
336    fn required_input_distribution(&self) -> Vec<Distribution> {
337        self.input.required_input_distribution()
338    }
339
340    // Prevent reordering of input
341    fn maintains_input_order(&self) -> Vec<bool> {
342        vec![false; self.children().len()]
343    }
344
345    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
346        vec![&self.input]
347    }
348
349    fn with_new_children(
350        self: Arc<Self>,
351        children: Vec<Arc<dyn ExecutionPlan>>,
352    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
353        assert!(!children.is_empty());
354        Ok(Arc::new(Self {
355            start: self.start,
356            end: self.end,
357            lookback_delta: self.lookback_delta,
358            interval: self.interval,
359            time_index_column: self.time_index_column.clone(),
360            field_column: self.field_column.clone(),
361            reuse_tsid_column: self.reuse_tsid_column,
362            input: children[0].clone(),
363            metric: self.metric.clone(),
364        }))
365    }
366
367    fn execute(
368        &self,
369        partition: usize,
370        context: Arc<TaskContext>,
371    ) -> DataFusionResult<SendableRecordBatchStream> {
372        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
373        let num_series = Count::new();
374        MetricBuilder::new(&self.metric)
375            .with_partition(partition)
376            .build(MetricValue::Count {
377                name: METRIC_NUM_SERIES.into(),
378                count: num_series.clone(),
379            });
380
381        let input = self.input.execute(partition, context)?;
382        let schema = input.schema();
383        let time_index = schema
384            .column_with_name(&self.time_index_column)
385            .expect("time index column not found")
386            .0;
387        let field_index = self
388            .field_column
389            .as_ref()
390            .and_then(|name| schema.column_with_name(name))
391            .map(|x| x.0);
392        let tsid_index = schema
393            .column_with_name("__tsid")
394            .filter(|(_, field)| field.data_type() == &DataType::UInt64)
395            .map(|(index, _)| index);
396        Ok(Box::pin(InstantManipulateStream {
397            start: self.start,
398            end: self.end,
399            lookback_delta: self.lookback_delta,
400            interval: self.interval,
401            time_index,
402            field_index,
403            tsid_index,
404            reuse_tsid_column: self.reuse_tsid_column && tsid_index.is_some(),
405            schema,
406            input,
407            metric: baseline_metric,
408            num_series,
409        }))
410    }
411
412    fn metrics(&self) -> Option<MetricsSet> {
413        Some(self.metric.clone_inner())
414    }
415
416    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
417        let input_stats = self.input.partition_statistics(partition)?;
418
419        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
420        let estimated_total_bytes = input_stats
421            .total_byte_size
422            .get_value()
423            .zip(input_stats.num_rows.get_value())
424            .map(|(size, rows)| {
425                Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
426            })
427            .unwrap_or(Precision::Absent);
428
429        Ok(Statistics {
430            num_rows: Precision::Inexact(estimated_row_num.floor() as _),
431            total_byte_size: estimated_total_bytes,
432            // TODO(ruihang): support this column statistics
433            column_statistics: Statistics::unknown_column(&self.schema()),
434        })
435    }
436
437    fn name(&self) -> &str {
438        "InstantManipulateExec"
439    }
440}
441
442impl DisplayAs for InstantManipulateExec {
443    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
444        match t {
445            DisplayFormatType::Default
446            | DisplayFormatType::Verbose
447            | DisplayFormatType::TreeRender => {
448                write!(
449                    f,
450                    "PromInstantManipulateExec: range=[{}..{}], lookback=[{}], interval=[{}], time index=[{}]",
451                    self.start,
452                    self.end,
453                    self.lookback_delta,
454                    self.interval,
455                    self.time_index_column
456                )
457            }
458        }
459    }
460}
461
/// Stream that aligns every non-empty batch of its input to the evaluation
/// timestamps and counts how many batches it has processed.
pub struct InstantManipulateStream {
    /// First evaluation timestamp.
    start: Millisecond,
    /// Last evaluation timestamp.
    end: Millisecond,
    /// How far back from an evaluation timestamp a sample may be and still be used.
    lookback_delta: Millisecond,
    /// Step between two consecutive evaluation timestamps.
    interval: Millisecond,
    // Column index of TIME INDEX column's position in schema
    time_index: usize,
    /// Column index of the field column used for the NaN staleness check, if any.
    field_index: Option<usize>,
    /// Column index of a `UInt64` `__tsid` column, if the schema has one.
    tsid_index: Option<usize>,
    /// Whether the `__tsid` column is reused instead of being gathered by index.
    reuse_tsid_column: bool,

    /// Schema of the batches yielded by this stream (same as the input's).
    schema: SchemaRef,
    /// The upstream batches to align.
    input: SendableRecordBatchStream,
    /// Baseline metrics (compute time, poll results) for this stream.
    metric: BaselineMetrics,
    /// Number of series processed.
    num_series: Count,
}
479
480impl RecordBatchStream for InstantManipulateStream {
481    fn schema(&self) -> SchemaRef {
482        self.schema.clone()
483    }
484}
485
486impl Stream for InstantManipulateStream {
487    type Item = DataFusionResult<RecordBatch>;
488
489    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
490        let poll = match ready!(self.input.poll_next_unpin(cx)) {
491            Some(Ok(batch)) => {
492                if batch.num_rows() == 0 {
493                    return Poll::Pending;
494                }
495                let timer = std::time::Instant::now();
496                self.num_series.add(1);
497                let result = Ok(batch).and_then(|batch| self.manipulate(batch));
498                self.metric.elapsed_compute().add_elapsed(timer);
499                Poll::Ready(Some(result))
500            }
501            None => {
502                PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
503                Poll::Ready(None)
504            }
505            Some(Err(e)) => Poll::Ready(Some(Err(e))),
506        };
507        self.metric.record_poll(poll)
508    }
509}
510
511impl InstantManipulateStream {
512    // Refer to Prometheus `vectorSelectorSingle` / lookback semantics.
513    //
514    // Prometheus `v3.9.1` uses a start-exclusive lookback window:
515    //   (eval_ts - lookback_delta, eval_ts]
516    // i.e. a sample at exactly `eval_ts - lookback_delta` is considered too old.
517    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
518        let ts_column = input
519            .column(self.time_index)
520            .as_any()
521            .downcast_ref::<TimestampMillisecondArray>()
522            .ok_or_else(|| {
523                DataFusionError::Execution(
524                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
525                )
526            })?;
527
528        // Early return for empty input
529        if ts_column.is_empty() {
530            return Ok(input);
531        }
532
533        // field column for staleness check
534        let field_column = self
535            .field_index
536            .and_then(|index| input.column(index).as_any().downcast_ref::<Float64Array>());
537
538        // Optimize iteration range based on actual data bounds
539        let first_ts = ts_column.value(0);
540        let last_ts = ts_column.value(ts_column.len() - 1);
541        // A sample at `t` is eligible for eval time `eval_ts` iff:
542        //   t > eval_ts - lookback_delta  <=>  eval_ts < t + lookback_delta.
543        // Therefore the last eval timestamp for which the last sample is still eligible is:
544        //   last_ts + lookback_delta - 1 (millisecond granularity).
545        let last_useful = if self.lookback_delta > 0 {
546            last_ts + self.lookback_delta - 1
547        } else {
548            last_ts
549        };
550
551        let max_start = first_ts.max(self.start);
552        let min_end = last_useful.min(self.end);
553
554        let aligned_start = self.start + (max_start - self.start) / self.interval * self.interval;
555        let aligned_end = self.end - (self.end - min_end) / self.interval * self.interval;
556
557        let estimated_points = if aligned_end >= aligned_start {
558            ((aligned_end - aligned_start) / self.interval).saturating_add(1) as usize
559        } else {
560            0
561        };
562        if estimated_points > MAX_INSTANT_MANIPULATE_OUTPUT_POINTS {
563            return Err(DataFusionError::Execution(format!(
564                "InstantManipulate output points exceed limit: {estimated_points} > {MAX_INSTANT_MANIPULATE_OUTPUT_POINTS}"
565            )));
566        }
567        let mut take_indices = Vec::with_capacity(estimated_points);
568
569        let mut cursor = 0;
570
571        let aligned_ts_iter = (aligned_start..=aligned_end).step_by(self.interval as usize);
572        let mut aligned_ts = Vec::with_capacity(estimated_points);
573
574        // calculate the offsets to take
575        'next: for expected_ts in aligned_ts_iter {
576            // first, search toward end to see if there is matched timestamp
577            while cursor < ts_column.len() {
578                let curr = ts_column.value(cursor);
579                match curr.cmp(&expected_ts) {
580                    Ordering::Equal => {
581                        if let Some(field_column) = &field_column
582                            && field_column.value(cursor).is_nan()
583                        {
584                            // ignore the NaN value
585                        } else {
586                            take_indices.push(cursor as u64);
587                            aligned_ts.push(expected_ts);
588                        }
589                        continue 'next;
590                    }
591                    Ordering::Greater => break,
592                    Ordering::Less => {}
593                }
594                cursor += 1;
595            }
596            if cursor == ts_column.len() {
597                cursor -= 1;
598                // short cut this loop
599                if ts_column.value(cursor) + self.lookback_delta <= expected_ts {
600                    break;
601                }
602            }
603
604            // then examine the value
605            let curr_ts = ts_column.value(cursor);
606            if curr_ts + self.lookback_delta <= expected_ts {
607                continue;
608            }
609            if curr_ts > expected_ts {
610                // exceeds current expected timestamp, examine the previous value
611                if let Some(prev_cursor) = cursor.checked_sub(1) {
612                    let prev_ts = ts_column.value(prev_cursor);
613                    if prev_ts + self.lookback_delta > expected_ts {
614                        // only use the point in the time range
615                        if let Some(field_column) = &field_column
616                            && field_column.value(prev_cursor).is_nan()
617                        {
618                            // if the newest value is NaN, it means the value is stale, so we should not use it
619                            continue;
620                        }
621                        // use this point
622                        take_indices.push(prev_cursor as u64);
623                        aligned_ts.push(expected_ts);
624                    }
625                }
626            } else if let Some(field_column) = &field_column
627                && field_column.value(cursor).is_nan()
628            {
629                // if the newest value is NaN, it means the value is stale, so we should not use it
630            } else {
631                // use this point
632                take_indices.push(cursor as u64);
633                aligned_ts.push(expected_ts);
634            }
635        }
636
637        // take record batch and replace the time index column
638        self.take_record_batch_optional(input, take_indices, aligned_ts)
639    }
640
641    /// Helper function to apply "take" on record batch.
642    fn take_record_batch_optional(
643        &self,
644        record_batch: RecordBatch,
645        take_indices: Vec<u64>,
646        aligned_ts: Vec<Millisecond>,
647    ) -> DataFusionResult<RecordBatch> {
648        assert_eq!(take_indices.len(), aligned_ts.len());
649
650        let output_len = aligned_ts.len();
651        let mut indices_array = None;
652        let mut arrays = Vec::with_capacity(record_batch.num_columns());
653        let aligned_ts = Arc::new(TimestampMillisecondArray::from(aligned_ts)) as Arc<dyn Array>;
654
655        for (index, array) in record_batch.columns().iter().enumerate() {
656            if index == self.time_index {
657                arrays.push(aligned_ts.clone());
658                continue;
659            }
660
661            if self.reuse_tsid_column && self.tsid_index == Some(index) {
662                arrays.push(reuse_constant_column(array, output_len)?);
663                continue;
664            }
665
666            let indices_array =
667                indices_array.get_or_insert_with(|| UInt64Array::from(take_indices.clone()));
668            arrays.push(compute::take(array, indices_array, None)?);
669        }
670
671        let result = RecordBatch::try_new(record_batch.schema(), arrays)
672            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
673        Ok(result)
674    }
675}
676
677fn reuse_constant_column(array: &Arc<dyn Array>, len: usize) -> DataFusionResult<Arc<dyn Array>> {
678    if len <= array.len() {
679        return Ok(array.slice(0, len));
680    }
681
682    if array.is_empty() {
683        return Ok(array.slice(0, 0));
684    }
685
686    ScalarValue::try_from_array(array.as_ref(), 0)?.to_array_of_size(len)
687}
688
689#[cfg(test)]
690mod test {
691    use datafusion::arrow::datatypes::{DataType, Field, Schema};
692    use datafusion::common::ToDFSchema;
693    use datafusion::datasource::memory::MemorySourceConfig;
694    use datafusion::datasource::source::DataSourceExec;
695    use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
696    use datafusion::prelude::SessionContext;
697
698    use super::*;
699    use crate::extension_plan::test_util::{
700        TIME_INDEX_COLUMN, prepare_test_data, prepare_test_data_with_nan,
701    };
702
703    async fn do_normalize_test(
704        start: Millisecond,
705        end: Millisecond,
706        lookback_delta: Millisecond,
707        interval: Millisecond,
708        expected: String,
709        contains_nan: bool,
710    ) {
711        let memory_exec = if contains_nan {
712            Arc::new(prepare_test_data_with_nan())
713        } else {
714            Arc::new(prepare_test_data())
715        };
716        let normalize_exec = Arc::new(InstantManipulateExec {
717            start,
718            end,
719            lookback_delta,
720            interval,
721            time_index_column: TIME_INDEX_COLUMN.to_string(),
722            field_column: Some("value".to_string()),
723            reuse_tsid_column: false,
724            input: memory_exec,
725            metric: ExecutionPlanMetricsSet::new(),
726        });
727        let session_context = SessionContext::default();
728        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
729            .await
730            .unwrap();
731        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
732            .unwrap()
733            .to_string();
734
735        assert_eq!(result_literal, expected);
736    }
737
738    #[test]
739    fn pruning_should_keep_time_and_field_columns_for_exec() {
740        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
741        let input = LogicalPlan::EmptyRelation(EmptyRelation {
742            produce_one_row: false,
743            schema: df_schema,
744        });
745        let plan = InstantManipulate::new(
746            0,
747            0,
748            0,
749            0,
750            TIME_INDEX_COLUMN.to_string(),
751            Vec::new(),
752            Some("value".to_string()),
753            input,
754        );
755
756        // Simulate a parent projection requesting only the `path` column.
757        let output_columns = [2usize];
758        let required = plan.necessary_children_exprs(&output_columns).unwrap();
759        let required = &required[0];
760        assert_eq!(required.as_slice(), &[0, 1, 2]);
761    }
762
763    #[test]
764    fn rebuild_should_recover_tag_columns_from_series_divide_input() {
765        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
766        let input = LogicalPlan::EmptyRelation(EmptyRelation {
767            produce_one_row: false,
768            schema: df_schema,
769        });
770        let series_divide = LogicalPlan::Extension(Extension {
771            node: Arc::new(SeriesDivide::new(
772                vec!["__tsid".to_string()],
773                TIME_INDEX_COLUMN.to_string(),
774                input,
775            )),
776        });
777        let bytes = InstantManipulate::new(
778            0,
779            0,
780            0,
781            0,
782            TIME_INDEX_COLUMN.to_string(),
783            vec!["__tsid".to_string()],
784            Some("value".to_string()),
785            series_divide.clone(),
786        )
787        .serialize();
788        let plan = InstantManipulate::deserialize(&bytes)
789            .unwrap()
790            .with_exprs_and_inputs(vec![], vec![series_divide])
791            .unwrap();
792
793        assert_eq!(plan.tag_columns, vec!["__tsid".to_string()]);
794    }
795
796    #[test]
797    fn rebuild_should_recover_tag_columns_from_series_normalize_input() {
798        let df_schema = prepare_test_data().schema().to_dfschema_ref().unwrap();
799        let input = LogicalPlan::EmptyRelation(EmptyRelation {
800            produce_one_row: false,
801            schema: df_schema,
802        });
803        let series_divide = LogicalPlan::Extension(Extension {
804            node: Arc::new(SeriesDivide::new(
805                vec!["__tsid".to_string()],
806                TIME_INDEX_COLUMN.to_string(),
807                input,
808            )),
809        });
810        let series_normalize = LogicalPlan::Extension(Extension {
811            node: Arc::new(crate::extension_plan::SeriesNormalize::new(
812                0,
813                TIME_INDEX_COLUMN,
814                false,
815                vec!["__tsid".to_string()],
816                series_divide,
817            )),
818        });
819        let bytes = InstantManipulate::new(
820            0,
821            0,
822            0,
823            0,
824            TIME_INDEX_COLUMN.to_string(),
825            vec!["__tsid".to_string()],
826            Some("value".to_string()),
827            series_normalize.clone(),
828        )
829        .serialize();
830        let plan = InstantManipulate::deserialize(&bytes)
831            .unwrap()
832            .with_exprs_and_inputs(vec![], vec![series_normalize])
833            .unwrap();
834
835        assert_eq!(plan.tag_columns, vec!["__tsid".to_string()]);
836    }
837
838    #[test]
839    fn to_execution_plan_enables_tsid_fast_path() {
840        let schema = Arc::new(Schema::new(vec![
841            Field::new(
842                TIME_INDEX_COLUMN,
843                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
844                false,
845            ),
846            Field::new("value", DataType::Float64, true),
847        ]));
848        let exec_input: Arc<dyn ExecutionPlan> = Arc::new(DataSourceExec::new(Arc::new(
849            MemorySourceConfig::try_new(&[], schema, None).unwrap(),
850        )));
851
852        let exec = InstantManipulate::new(
853            0,
854            0,
855            0,
856            0,
857            TIME_INDEX_COLUMN.to_string(),
858            vec!["__tsid".to_string()],
859            Some("value".to_string()),
860            LogicalPlan::EmptyRelation(EmptyRelation {
861                produce_one_row: false,
862                schema: Arc::new(datafusion::common::DFSchema::empty()),
863            }),
864        )
865        .to_execution_plan(exec_input);
866
867        assert!(format!("{exec:?}").contains("reuse_tsid_column: true"));
868    }
869
870    #[tokio::test]
871    async fn tsid_fast_path_reuses_tsid_column_when_output_grows() {
872        let schema = Arc::new(Schema::new(vec![
873            Field::new(
874                TIME_INDEX_COLUMN,
875                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
876                false,
877            ),
878            Field::new("value", DataType::Float64, true),
879            Field::new("host", DataType::Utf8, true),
880            Field::new("__tsid", DataType::UInt64, false),
881        ]));
882        let batch = RecordBatch::try_new(
883            schema.clone(),
884            vec![
885                Arc::new(TimestampMillisecondArray::from(vec![0, 1_000])),
886                Arc::new(Float64Array::from(vec![1.0, 2.0])),
887                Arc::new(datafusion::arrow::array::StringArray::from(vec![
888                    "foo", "foo",
889                ])),
890                Arc::new(UInt64Array::from(vec![42, 42])),
891            ],
892        )
893        .unwrap();
894        let input = Arc::new(DataSourceExec::new(Arc::new(
895            MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
896        )));
897        let normalize_exec = Arc::new(InstantManipulateExec {
898            start: 0,
899            end: 1_500,
900            lookback_delta: 1_000,
901            interval: 500,
902            time_index_column: TIME_INDEX_COLUMN.to_string(),
903            field_column: Some("value".to_string()),
904            reuse_tsid_column: true,
905            input,
906            metric: ExecutionPlanMetricsSet::new(),
907        });
908        let session_context = SessionContext::default();
909        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
910            .await
911            .unwrap();
912        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
913            .unwrap()
914            .to_string();
915
916        assert_eq!(
917            result_literal,
918            "+-------------------------+-------+------+--------+\
919            \n| timestamp               | value | host | __tsid |\
920            \n+-------------------------+-------+------+--------+\
921            \n| 1970-01-01T00:00:00     | 1.0   | foo  | 42     |\
922            \n| 1970-01-01T00:00:00.500 | 1.0   | foo  | 42     |\
923            \n| 1970-01-01T00:00:01     | 2.0   | foo  | 42     |\
924            \n| 1970-01-01T00:00:01.500 | 2.0   | foo  | 42     |\
925            \n+-------------------------+-------+------+--------+"
926        );
927    }
928
929    #[tokio::test]
930    async fn tsid_fast_path_still_takes_additional_field_columns() {
931        let schema = Arc::new(Schema::new(vec![
932            Field::new(
933                TIME_INDEX_COLUMN,
934                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
935                false,
936            ),
937            Field::new("value", DataType::Float64, true),
938            Field::new("value_2", DataType::Float64, true),
939            Field::new("host", DataType::Utf8, true),
940            Field::new("__tsid", DataType::UInt64, false),
941        ]));
942        let batch = RecordBatch::try_new(
943            schema.clone(),
944            vec![
945                Arc::new(TimestampMillisecondArray::from(vec![0, 1_000])),
946                Arc::new(Float64Array::from(vec![1.0, 2.0])),
947                Arc::new(Float64Array::from(vec![10.0, 20.0])),
948                Arc::new(datafusion::arrow::array::StringArray::from(vec![
949                    "foo", "foo",
950                ])),
951                Arc::new(UInt64Array::from(vec![42, 42])),
952            ],
953        )
954        .unwrap();
955        let input = Arc::new(DataSourceExec::new(Arc::new(
956            MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
957        )));
958        let normalize_exec = Arc::new(InstantManipulateExec {
959            start: 0,
960            end: 1_500,
961            lookback_delta: 1_000,
962            interval: 500,
963            time_index_column: TIME_INDEX_COLUMN.to_string(),
964            field_column: Some("value".to_string()),
965            reuse_tsid_column: true,
966            input,
967            metric: ExecutionPlanMetricsSet::new(),
968        });
969        let session_context = SessionContext::default();
970        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
971            .await
972            .unwrap();
973        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
974            .unwrap()
975            .to_string();
976
977        assert_eq!(
978            result_literal,
979            "+-------------------------+-------+---------+------+--------+\
980            \n| timestamp               | value | value_2 | host | __tsid |\
981            \n+-------------------------+-------+---------+------+--------+\
982            \n| 1970-01-01T00:00:00     | 1.0   | 10.0    | foo  | 42     |\
983            \n| 1970-01-01T00:00:00.500 | 1.0   | 10.0    | foo  | 42     |\
984            \n| 1970-01-01T00:00:01     | 2.0   | 20.0    | foo  | 42     |\
985            \n| 1970-01-01T00:00:01.500 | 2.0   | 20.0    | foo  | 42     |\
986            \n+-------------------------+-------+---------+------+--------+"
987        );
988    }
989
990    #[tokio::test]
991    async fn manipulate_should_reject_too_many_output_points() {
992        let schema = Arc::new(Schema::new(vec![
993            Field::new(
994                TIME_INDEX_COLUMN,
995                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
996                false,
997            ),
998            Field::new("value", DataType::Float64, true),
999        ]));
1000        let batch = RecordBatch::try_new(
1001            schema.clone(),
1002            vec![
1003                Arc::new(TimestampMillisecondArray::from(vec![0])),
1004                Arc::new(Float64Array::from(vec![1.0])),
1005            ],
1006        )
1007        .unwrap();
1008        let input = Arc::new(DataSourceExec::new(Arc::new(
1009            MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(),
1010        )));
1011        let too_many_points = MAX_INSTANT_MANIPULATE_OUTPUT_POINTS as Millisecond + 1;
1012        let normalize_exec = Arc::new(InstantManipulateExec {
1013            start: 0,
1014            end: too_many_points,
1015            lookback_delta: too_many_points + 1,
1016            interval: 1,
1017            time_index_column: TIME_INDEX_COLUMN.to_string(),
1018            field_column: Some("value".to_string()),
1019            reuse_tsid_column: false,
1020            input,
1021            metric: ExecutionPlanMetricsSet::new(),
1022        });
1023        let session_context = SessionContext::default();
1024        let err = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
1025            .await
1026            .unwrap_err();
1027
1028        assert!(
1029            err.to_string()
1030                .contains("InstantManipulate output points exceed limit")
1031        );
1032    }
1033
1034    #[tokio::test]
1035    async fn lookback_10s_interval_30s() {
1036        let expected = String::from(
1037            "+---------------------+-------+------+\
1038            \n| timestamp           | value | path |\
1039            \n+---------------------+-------+------+\
1040            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
1041            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
1042            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
1043            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
1044            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
1045            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
1046            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
1047            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
1048            \n+---------------------+-------+------+",
1049        );
1050        do_normalize_test(0, 310_000, 10_000, 30_000, expected, false).await;
1051    }
1052
1053    #[tokio::test]
1054    async fn lookback_10s_interval_10s() {
1055        let expected = String::from(
1056            "+---------------------+-------+------+\
1057            \n| timestamp           | value | path |\
1058            \n+---------------------+-------+------+\
1059            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
1060            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
1061            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
1062            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
1063            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
1064            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
1065            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
1066            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
1067            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
1068            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
1069            \n+---------------------+-------+------+",
1070        );
1071        do_normalize_test(0, 300_000, 10_000, 10_000, expected, false).await;
1072    }
1073
1074    #[tokio::test]
1075    async fn lookback_30s_interval_30s() {
1076        let expected = String::from(
1077            "+---------------------+-------+------+\
1078            \n| timestamp           | value | path |\
1079            \n+---------------------+-------+------+\
1080            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
1081            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
1082            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
1083            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
1084            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
1085            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
1086            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
1087            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
1088            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
1089            \n+---------------------+-------+------+",
1090        );
1091        do_normalize_test(0, 300_000, 30_000, 30_000, expected, false).await;
1092    }
1093
1094    #[tokio::test]
1095    async fn lookback_30s_interval_10s() {
1096        let expected = String::from(
1097            "+---------------------+-------+------+\
1098            \n| timestamp           | value | path |\
1099            \n+---------------------+-------+------+\
1100            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
1101            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
1102            \n| 1970-01-01T00:00:20 | 1.0   | foo  |\
1103            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
1104            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
1105            \n| 1970-01-01T00:00:50 | 1.0   | foo  |\
1106            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
1107            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
1108            \n| 1970-01-01T00:01:20 | 1.0   | foo  |\
1109            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
1110            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
1111            \n| 1970-01-01T00:01:50 | 1.0   | foo  |\
1112            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
1113            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
1114            \n| 1970-01-01T00:02:20 | 1.0   | foo  |\
1115            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
1116            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
1117            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
1118            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
1119            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
1120            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
1121            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
1122            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
1123            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
1124            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
1125            \n+---------------------+-------+------+",
1126        );
1127        do_normalize_test(0, 300_000, 30_000, 10_000, expected, false).await;
1128    }
1129
1130    #[tokio::test]
1131    async fn lookback_60s_interval_10s() {
1132        let expected = String::from(
1133            "+---------------------+-------+------+\
1134            \n| timestamp           | value | path |\
1135            \n+---------------------+-------+------+\
1136            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
1137            \n| 1970-01-01T00:00:10 | 1.0   | foo  |\
1138            \n| 1970-01-01T00:00:20 | 1.0   | foo  |\
1139            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
1140            \n| 1970-01-01T00:00:40 | 1.0   | foo  |\
1141            \n| 1970-01-01T00:00:50 | 1.0   | foo  |\
1142            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
1143            \n| 1970-01-01T00:01:10 | 1.0   | foo  |\
1144            \n| 1970-01-01T00:01:20 | 1.0   | foo  |\
1145            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
1146            \n| 1970-01-01T00:01:40 | 1.0   | foo  |\
1147            \n| 1970-01-01T00:01:50 | 1.0   | foo  |\
1148            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
1149            \n| 1970-01-01T00:02:10 | 1.0   | foo  |\
1150            \n| 1970-01-01T00:02:20 | 1.0   | foo  |\
1151            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
1152            \n| 1970-01-01T00:02:40 | 1.0   | foo  |\
1153            \n| 1970-01-01T00:02:50 | 1.0   | foo  |\
1154            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
1155            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
1156            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
1157            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
1158            \n| 1970-01-01T00:03:40 | 1.0   | foo  |\
1159            \n| 1970-01-01T00:03:50 | 1.0   | foo  |\
1160            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
1161            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
1162            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
1163            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
1164            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
1165            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
1166            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
1167            \n+---------------------+-------+------+",
1168        );
1169        do_normalize_test(0, 300_000, 60_000, 10_000, expected, false).await;
1170    }
1171
1172    #[tokio::test]
1173    async fn lookback_60s_interval_30s() {
1174        let expected = String::from(
1175            "+---------------------+-------+------+\
1176            \n| timestamp           | value | path |\
1177            \n+---------------------+-------+------+\
1178            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
1179            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
1180            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
1181            \n| 1970-01-01T00:01:30 | 1.0   | foo  |\
1182            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
1183            \n| 1970-01-01T00:02:30 | 1.0   | foo  |\
1184            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
1185            \n| 1970-01-01T00:03:30 | 1.0   | foo  |\
1186            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
1187            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
1188            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
1189            \n+---------------------+-------+------+",
1190        );
1191        do_normalize_test(0, 300_000, 60_000, 30_000, expected, false).await;
1192    }
1193
1194    #[tokio::test]
1195    async fn small_range_lookback_0s_interval_1s() {
1196        let expected = String::from(
1197            "+---------------------+-------+------+\
1198            \n| timestamp           | value | path |\
1199            \n+---------------------+-------+------+\
1200            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
1201            \n| 1970-01-01T00:04:01 | 1.0   | foo  |\
1202            \n+---------------------+-------+------+",
1203        );
1204        do_normalize_test(230_000, 245_000, 0, 1_000, expected, false).await;
1205    }
1206
1207    #[tokio::test]
1208    async fn small_range_lookback_10s_interval_10s() {
1209        let expected = String::from(
1210            "+---------------------+-------+------+\
1211            \n| timestamp           | value | path |\
1212            \n+---------------------+-------+------+\
1213            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
1214            \n| 1970-01-01T00:00:30 | 1.0   | foo  |\
1215            \n+---------------------+-------+------+",
1216        );
1217        do_normalize_test(0, 30_000, 10_000, 10_000, expected, false).await;
1218    }
1219
1220    #[tokio::test]
1221    async fn large_range_lookback_30s_interval_60s() {
1222        let expected = String::from(
1223            "+---------------------+-------+------+\
1224            \n| timestamp           | value | path |\
1225            \n+---------------------+-------+------+\
1226            \n| 1970-01-01T00:00:00 | 1.0   | foo  |\
1227            \n| 1970-01-01T00:01:00 | 1.0   | foo  |\
1228            \n| 1970-01-01T00:02:00 | 1.0   | foo  |\
1229            \n| 1970-01-01T00:03:00 | 1.0   | foo  |\
1230            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
1231            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
1232            \n+---------------------+-------+------+",
1233        );
1234        do_normalize_test(-900_000, 900_000, 30_000, 60_000, expected, false).await;
1235    }
1236
1237    #[tokio::test]
1238    async fn small_range_lookback_30s_interval_30s() {
1239        let expected = String::from(
1240            "+---------------------+-------+------+\
1241            \n| timestamp           | value | path |\
1242            \n+---------------------+-------+------+\
1243            \n| 1970-01-01T00:03:10 | 1.0   | foo  |\
1244            \n| 1970-01-01T00:03:20 | 1.0   | foo  |\
1245            \n| 1970-01-01T00:04:00 | 1.0   | foo  |\
1246            \n| 1970-01-01T00:04:10 | 1.0   | foo  |\
1247            \n| 1970-01-01T00:04:20 | 1.0   | foo  |\
1248            \n| 1970-01-01T00:04:30 | 1.0   | foo  |\
1249            \n| 1970-01-01T00:04:40 | 1.0   | foo  |\
1250            \n| 1970-01-01T00:04:50 | 1.0   | foo  |\
1251            \n| 1970-01-01T00:05:00 | 1.0   | foo  |\
1252            \n+---------------------+-------+------+",
1253        );
1254        do_normalize_test(190_000, 300_000, 30_000, 10_000, expected, false).await;
1255    }
1256
1257    #[tokio::test]
1258    async fn lookback_10s_interval_10s_with_nan() {
1259        let expected = String::from(
1260            "+---------------------+-------+\
1261            \n| timestamp           | value |\
1262            \n+---------------------+-------+\
1263            \n| 1970-01-01T00:00:00 | 0.0   |\
1264            \n| 1970-01-01T00:01:00 | 6.0   |\
1265            \n| 1970-01-01T00:02:00 | 12.0  |\
1266            \n+---------------------+-------+",
1267        );
1268        do_normalize_test(0, 300_000, 10_000, 10_000, expected, true).await;
1269    }
1270
1271    #[tokio::test]
1272    async fn lookback_10s_interval_10s_with_nan_unaligned() {
1273        let expected = String::from(
1274            "+-------------------------+-------+\
1275            \n| timestamp               | value |\
1276            \n+-------------------------+-------+\
1277            \n| 1970-01-01T00:00:00.001 | 0.0   |\
1278            \n| 1970-01-01T00:01:00.001 | 6.0   |\
1279            \n| 1970-01-01T00:02:00.001 | 12.0  |\
1280            \n+-------------------------+-------+",
1281        );
1282        do_normalize_test(1, 300_001, 10_000, 10_000, expected, true).await;
1283    }
1284
1285    #[tokio::test]
1286    async fn ultra_large_range() {
1287        let expected = String::from(
1288            "+-------------------------+-------+\
1289            \n| timestamp               | value |\
1290            \n+-------------------------+-------+\
1291            \n| 1970-01-01T00:00:00.001 | 0.0   |\
1292            \n| 1970-01-01T00:01:00.001 | 6.0   |\
1293            \n| 1970-01-01T00:02:00.001 | 12.0  |\
1294            \n+-------------------------+-------+",
1295        );
1296        do_normalize_test(
1297            -900_000_000_000_000 + 1,
1298            900_000_000_000_000,
1299            10_000,
1300            10_000,
1301            expected,
1302            true,
1303        )
1304        .await;
1305    }
1306}