promql/extension_plan/
range_manipulate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
22use datafusion::arrow::compute;
23use datafusion::arrow::datatypes::{Field, SchemaRef};
24use datafusion::arrow::error::ArrowError;
25use datafusion::arrow::record_batch::RecordBatch;
26use datafusion::common::stats::Precision;
27use datafusion::common::{DFSchema, DFSchemaRef};
28use datafusion::error::{DataFusionError, Result as DataFusionResult};
29use datafusion::execution::context::TaskContext;
30use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
31use datafusion::physical_expr::EquivalenceProperties;
32use datafusion::physical_plan::metrics::{
33    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
34};
35use datafusion::physical_plan::{
36    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
37    SendableRecordBatchStream, Statistics,
38};
39use datafusion::sql::TableReference;
40use futures::{ready, Stream, StreamExt};
41use greptime_proto::substrait_extension as pb;
42use prost::Message;
43use snafu::ResultExt;
44
45use crate::error::{DeserializeSnafu, Result};
46use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
47use crate::metrics::PROMQL_SERIES_COUNT;
48use crate::range_array::RangeArray;
49
/// Time series manipulator for range function.
///
/// This plan will "fold" time index and value columns into [RangeArray]s, and truncate
/// other columns to the same length with the "folded" [RangeArray] column.
///
/// To pass runtime information to the execution plan (or the range function), this plan
/// will add those extra columns:
/// - timestamp range with type [RangeArray], which is the folded timestamp column.
/// - end of current range with the same type as the timestamp column. (todo)
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
    /// Start of the requested query range (printed as `req range` in EXPLAIN).
    start: Millisecond,
    /// End of the requested query range (inclusive when evaluation steps are generated).
    end: Millisecond,
    /// Distance between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the look-back window of every evaluation timestamp (printed as `eval range`).
    range: Millisecond,

    /// Name of the time index column in the input schema.
    time_index: String,
    /// Names of the value columns that are folded into [RangeArray]s.
    field_columns: Vec<String>,
    /// The child logical plan.
    input: LogicalPlan,
    /// Schema produced by this node, derived from the input schema by
    /// `calculate_output_schema`. It is the only field ignored by the `PartialOrd` impl.
    output_schema: DFSchemaRef,
}
71
72impl RangeManipulate {
73    pub fn new(
74        start: Millisecond,
75        end: Millisecond,
76        interval: Millisecond,
77        range: Millisecond,
78        time_index: String,
79        field_columns: Vec<String>,
80        input: LogicalPlan,
81    ) -> DataFusionResult<Self> {
82        let output_schema =
83            Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
84        Ok(Self {
85            start,
86            end,
87            interval,
88            range,
89            time_index,
90            field_columns,
91            input,
92            output_schema,
93        })
94    }
95
96    pub const fn name() -> &'static str {
97        "RangeManipulate"
98    }
99
100    pub fn build_timestamp_range_name(time_index: &str) -> String {
101        format!("{time_index}_range")
102    }
103
104    pub fn internal_range_end_col_name() -> String {
105        "__internal_range_end".to_string()
106    }
107
108    fn range_timestamp_name(&self) -> String {
109        Self::build_timestamp_range_name(&self.time_index)
110    }
111
112    fn calculate_output_schema(
113        input_schema: &DFSchemaRef,
114        time_index: &str,
115        field_columns: &[String],
116    ) -> DataFusionResult<DFSchemaRef> {
117        let columns = input_schema.fields();
118        let mut new_columns = Vec::with_capacity(columns.len() + 1);
119        for i in 0..columns.len() {
120            let x = input_schema.qualified_field(i);
121            new_columns.push((x.0.cloned(), Arc::new(x.1.clone())));
122        }
123
124        // process time index column
125        // the raw timestamp field is preserved. And a new timestamp_range field is appended to the last.
126        let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index) else {
127            return Err(datafusion::common::field_not_found(
128                None::<TableReference>,
129                time_index,
130                input_schema.as_ref(),
131            ));
132        };
133        let ts_col_field = &columns[ts_col_index];
134        let timestamp_range_field = Field::new(
135            Self::build_timestamp_range_name(time_index),
136            RangeArray::convert_field(ts_col_field).data_type().clone(),
137            ts_col_field.is_nullable(),
138        );
139        new_columns.push((None, Arc::new(timestamp_range_field)));
140
141        // process value columns
142        for name in field_columns {
143            let Some(index) = input_schema.index_of_column_by_name(None, name) else {
144                return Err(datafusion::common::field_not_found(
145                    None::<TableReference>,
146                    name,
147                    input_schema.as_ref(),
148                ));
149            };
150            new_columns[index] = (None, Arc::new(RangeArray::convert_field(&columns[index])));
151        }
152
153        Ok(Arc::new(DFSchema::new_with_metadata(
154            new_columns,
155            HashMap::new(),
156        )?))
157    }
158
159    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
160        let output_schema: SchemaRef = SchemaRef::new(self.output_schema.as_ref().into());
161        let properties = exec_input.properties();
162        let properties = PlanProperties::new(
163            EquivalenceProperties::new(output_schema.clone()),
164            properties.partitioning.clone(),
165            properties.emission_type,
166            properties.boundedness,
167        );
168        Arc::new(RangeManipulateExec {
169            start: self.start,
170            end: self.end,
171            interval: self.interval,
172            range: self.range,
173            time_index_column: self.time_index.clone(),
174            time_range_column: self.range_timestamp_name(),
175            field_columns: self.field_columns.clone(),
176            input: exec_input,
177            output_schema,
178            metric: ExecutionPlanMetricsSet::new(),
179            properties,
180        })
181    }
182
183    pub fn serialize(&self) -> Vec<u8> {
184        pb::RangeManipulate {
185            start: self.start,
186            end: self.end,
187            interval: self.interval,
188            range: self.range,
189            time_index: self.time_index.clone(),
190            tag_columns: self.field_columns.clone(),
191        }
192        .encode_to_vec()
193    }
194
195    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
196        let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
197        let empty_schema = Arc::new(DFSchema::empty());
198        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
199            produce_one_row: false,
200            schema: empty_schema.clone(),
201        });
202
203        // Unlike `Self::new()`, this method doesn't check the input schema as it will fail
204        // because the input schema is empty.
205        // But this is Ok since datafusion guarantees to call `with_exprs_and_inputs` for the
206        // deserialized plan.
207        Ok(Self {
208            start: pb_range_manipulate.start,
209            end: pb_range_manipulate.end,
210            interval: pb_range_manipulate.interval,
211            range: pb_range_manipulate.range,
212            time_index: pb_range_manipulate.time_index,
213            field_columns: pb_range_manipulate.tag_columns,
214            input: placeholder_plan,
215            output_schema: empty_schema,
216        })
217    }
218}
219
220impl PartialOrd for RangeManipulate {
221    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
222        // Compare fields in order excluding output_schema
223        match self.start.partial_cmp(&other.start) {
224            Some(core::cmp::Ordering::Equal) => {}
225            ord => return ord,
226        }
227        match self.end.partial_cmp(&other.end) {
228            Some(core::cmp::Ordering::Equal) => {}
229            ord => return ord,
230        }
231        match self.interval.partial_cmp(&other.interval) {
232            Some(core::cmp::Ordering::Equal) => {}
233            ord => return ord,
234        }
235        match self.range.partial_cmp(&other.range) {
236            Some(core::cmp::Ordering::Equal) => {}
237            ord => return ord,
238        }
239        match self.time_index.partial_cmp(&other.time_index) {
240            Some(core::cmp::Ordering::Equal) => {}
241            ord => return ord,
242        }
243        match self.field_columns.partial_cmp(&other.field_columns) {
244            Some(core::cmp::Ordering::Equal) => {}
245            ord => return ord,
246        }
247        self.input.partial_cmp(&other.input)
248    }
249}
250
251impl UserDefinedLogicalNodeCore for RangeManipulate {
252    fn name(&self) -> &str {
253        Self::name()
254    }
255
256    fn inputs(&self) -> Vec<&LogicalPlan> {
257        vec![&self.input]
258    }
259
260    fn schema(&self) -> &DFSchemaRef {
261        &self.output_schema
262    }
263
264    fn expressions(&self) -> Vec<Expr> {
265        vec![]
266    }
267
268    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
269        write!(
270            f,
271            "PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
272            self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
273        )
274    }
275
276    fn with_exprs_and_inputs(
277        &self,
278        _exprs: Vec<Expr>,
279        mut inputs: Vec<LogicalPlan>,
280    ) -> DataFusionResult<Self> {
281        if inputs.len() != 1 {
282            return Err(DataFusionError::Internal(
283                "RangeManipulate should have at exact one input".to_string(),
284            ));
285        }
286
287        let input: LogicalPlan = inputs.pop().unwrap();
288        let input_schema = input.schema();
289        let output_schema =
290            Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
291
292        Ok(Self {
293            start: self.start,
294            end: self.end,
295            interval: self.interval,
296            range: self.range,
297            time_index: self.time_index.clone(),
298            field_columns: self.field_columns.clone(),
299            input,
300            output_schema,
301        })
302    }
303}
304
/// Physical plan of [`RangeManipulate`].
///
/// Each partition of the input is wrapped in a [`RangeManipulateStream`] which folds the
/// time index and value columns batch by batch.
#[derive(Debug)]
pub struct RangeManipulateExec {
    /// Start of the requested query range.
    start: Millisecond,
    /// End of the requested query range.
    end: Millisecond,
    /// Distance between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the look-back window of every evaluation timestamp.
    range: Millisecond,
    /// Name of the time index column in the input.
    time_index_column: String,
    /// Name of the appended timestamp range column.
    time_range_column: String,
    /// Names of the value columns to fold.
    field_columns: Vec<String>,

    /// The child physical plan.
    input: Arc<dyn ExecutionPlan>,
    /// Schema of the record batches produced by this plan.
    output_schema: SchemaRef,
    /// Metric set shared by all partitions of this plan.
    metric: ExecutionPlanMetricsSet,
    /// Cached plan properties; partitioning, emission type and boundedness are copied from
    /// the input plan.
    properties: PlanProperties,
}
320
321impl ExecutionPlan for RangeManipulateExec {
322    fn as_any(&self) -> &dyn Any {
323        self
324    }
325
326    fn schema(&self) -> SchemaRef {
327        self.output_schema.clone()
328    }
329
330    fn properties(&self) -> &PlanProperties {
331        &self.properties
332    }
333
334    fn maintains_input_order(&self) -> Vec<bool> {
335        vec![true; self.children().len()]
336    }
337
338    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
339        vec![&self.input]
340    }
341
342    fn required_input_distribution(&self) -> Vec<Distribution> {
343        self.input.required_input_distribution()
344    }
345
346    fn with_new_children(
347        self: Arc<Self>,
348        children: Vec<Arc<dyn ExecutionPlan>>,
349    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
350        assert!(!children.is_empty());
351        let exec_input = children[0].clone();
352        let properties = exec_input.properties();
353        let properties = PlanProperties::new(
354            EquivalenceProperties::new(self.output_schema.clone()),
355            properties.partitioning.clone(),
356            properties.emission_type,
357            properties.boundedness,
358        );
359        Ok(Arc::new(Self {
360            start: self.start,
361            end: self.end,
362            interval: self.interval,
363            range: self.range,
364            time_index_column: self.time_index_column.clone(),
365            time_range_column: self.time_range_column.clone(),
366            field_columns: self.field_columns.clone(),
367            output_schema: self.output_schema.clone(),
368            input: children[0].clone(),
369            metric: self.metric.clone(),
370            properties,
371        }))
372    }
373
374    fn execute(
375        &self,
376        partition: usize,
377        context: Arc<TaskContext>,
378    ) -> DataFusionResult<SendableRecordBatchStream> {
379        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
380        let metrics_builder = MetricBuilder::new(&self.metric);
381        let num_series = Count::new();
382        metrics_builder
383            .with_partition(partition)
384            .build(MetricValue::Count {
385                name: METRIC_NUM_SERIES.into(),
386                count: num_series.clone(),
387            });
388
389        let input = self.input.execute(partition, context)?;
390        let schema = input.schema();
391        let time_index = schema
392            .column_with_name(&self.time_index_column)
393            .unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
394            .0;
395        let field_columns = self
396            .field_columns
397            .iter()
398            .map(|value_col| {
399                schema
400                    .column_with_name(value_col)
401                    .unwrap_or_else(|| panic!("value column {value_col} not found",))
402                    .0
403            })
404            .collect();
405        let aligned_ts_array =
406            RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval);
407        Ok(Box::pin(RangeManipulateStream {
408            start: self.start,
409            end: self.end,
410            interval: self.interval,
411            range: self.range,
412            time_index,
413            field_columns,
414            aligned_ts_array,
415            output_schema: self.output_schema.clone(),
416            input,
417            metric: baseline_metric,
418            num_series,
419        }))
420    }
421
422    fn metrics(&self) -> Option<MetricsSet> {
423        Some(self.metric.clone_inner())
424    }
425
426    fn statistics(&self) -> DataFusionResult<Statistics> {
427        let input_stats = self.input.statistics()?;
428
429        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
430        let estimated_total_bytes = input_stats
431            .total_byte_size
432            .get_value()
433            .zip(input_stats.num_rows.get_value())
434            .map(|(size, rows)| {
435                Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
436            })
437            .unwrap_or_default();
438
439        Ok(Statistics {
440            num_rows: Precision::Inexact(estimated_row_num as _),
441            total_byte_size: estimated_total_bytes,
442            // TODO(ruihang): support this column statistics
443            column_statistics: Statistics::unknown_column(&self.schema()),
444        })
445    }
446
447    fn name(&self) -> &str {
448        "RangeManipulateExec"
449    }
450}
451
452impl DisplayAs for RangeManipulateExec {
453    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
454        match t {
455            DisplayFormatType::Default | DisplayFormatType::Verbose => {
456                write!(
457                    f,
458                    "PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
459                   self.start, self.end, self.interval, self.range, self.time_index_column
460                )
461            }
462        }
463    }
464}
465
/// Stream that applies the range manipulation to every record batch of its input.
pub struct RangeManipulateStream {
    /// Start of the requested query range.
    start: Millisecond,
    /// End of the requested query range.
    end: Millisecond,
    /// Distance between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the look-back window of every evaluation timestamp.
    range: Millisecond,
    /// Position of the time index column in the input batches.
    time_index: usize,
    /// Positions of the value columns in the input batches.
    field_columns: Vec<usize>,
    /// Evaluation timestamps of the whole requested range, built once from `start`, `end`
    /// and `interval` so that it does not have to be rebuilt for every batch when possible.
    aligned_ts_array: ArrayRef,

    /// Schema of the produced batches.
    output_schema: SchemaRef,
    /// The input stream.
    input: SendableRecordBatchStream,
    /// Baseline metrics (elapsed compute time, poll results) of this partition.
    metric: BaselineMetrics,
    /// Number of series processed.
    num_series: Count,
}
481
482impl RecordBatchStream for RangeManipulateStream {
483    fn schema(&self) -> SchemaRef {
484        self.output_schema.clone()
485    }
486}
487
488impl Stream for RangeManipulateStream {
489    type Item = DataFusionResult<RecordBatch>;
490
491    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
492        let poll = loop {
493            match ready!(self.input.poll_next_unpin(cx)) {
494                Some(Ok(batch)) => {
495                    let timer = std::time::Instant::now();
496                    let result = self.manipulate(batch);
497                    if let Ok(None) = result {
498                        self.metric.elapsed_compute().add_elapsed(timer);
499                        continue;
500                    } else {
501                        self.num_series.add(1);
502                        self.metric.elapsed_compute().add_elapsed(timer);
503                        break Poll::Ready(result.transpose());
504                    }
505                }
506                None => {
507                    PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
508                    break Poll::Ready(None);
509                }
510                Some(Err(e)) => break Poll::Ready(Some(Err(e))),
511            }
512        };
513        self.metric.record_poll(poll)
514    }
515}
516
517impl RangeManipulateStream {
518    // Prometheus: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1113-L1198
519    // But they are not exactly the same, because we don't eager-evaluate on the data in this plan.
520    // And the generated timestamp is not aligned to the step. It's expected to do later.
521    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
522        let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
523        // calculate the range
524        let (ranges, (start, end)) = self.calculate_range(&input)?;
525        // ignore this if all ranges are empty
526        if ranges.iter().all(|(_, len)| *len == 0) {
527            return Ok(None);
528        }
529
530        // transform columns
531        let mut new_columns = input.columns().to_vec();
532        for index in self.field_columns.iter() {
533            let _ = other_columns.remove(index);
534            let column = input.column(*index);
535            let new_column = Arc::new(
536                RangeArray::from_ranges(column.clone(), ranges.clone())
537                    .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
538                    .into_dict(),
539            );
540            new_columns[*index] = new_column;
541        }
542
543        // push timestamp range column
544        let ts_range_column =
545            RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone())
546                .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
547                .into_dict();
548        new_columns.push(Arc::new(ts_range_column));
549
550        // truncate other columns
551        let take_indices = Int64Array::from(vec![0; ranges.len()]);
552        for index in other_columns.into_iter() {
553            new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
554        }
555        // replace timestamp with the aligned one
556        let new_time_index = if ranges.len() != self.aligned_ts_array.len() {
557            Self::build_aligned_ts_array(start, end, self.interval)
558        } else {
559            self.aligned_ts_array.clone()
560        };
561        new_columns[self.time_index] = new_time_index;
562
563        RecordBatch::try_new(self.output_schema.clone(), new_columns)
564            .map(Some)
565            .map_err(|e| DataFusionError::ArrowError(e, None))
566    }
567
568    fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef {
569        Arc::new(TimestampMillisecondArray::from_iter_values(
570            (start..=end).step_by(interval as _),
571        ))
572    }
573
574    /// Return values:
575    /// - A vector of tuples where each tuple contains the start index and length of the range.
576    /// - A tuple of the actual start/end timestamp used to calculate the range.
577    #[allow(clippy::type_complexity)]
578    fn calculate_range(
579        &self,
580        input: &RecordBatch,
581    ) -> DataFusionResult<(Vec<(u32, u32)>, (i64, i64))> {
582        let ts_column = input
583            .column(self.time_index)
584            .as_any()
585            .downcast_ref::<TimestampMillisecondArray>()
586            .ok_or_else(|| {
587                DataFusionError::Execution(
588                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
589                )
590            })?;
591
592        let len = ts_column.len();
593        if len == 0 {
594            return Ok((vec![], (self.start, self.end)));
595        }
596
597        // shorten the range to calculate
598        let first_ts = ts_column.value(0);
599        let first_ts_aligned = (first_ts / self.interval) * self.interval;
600        let last_ts = ts_column.value(ts_column.len() - 1);
601        let last_ts_aligned = ((last_ts + self.range) / self.interval) * self.interval;
602        let start = self.start.max(first_ts_aligned);
603        let end = self.end.min(last_ts_aligned);
604        if start > end {
605            return Ok((vec![], (start, end)));
606        }
607        let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize);
608
609        // calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered.
610        let mut range_start_index = 0usize;
611        let mut last_range_start = 0;
612        let mut start_delta = 0;
613        for curr_ts in (start..=end).step_by(self.interval as _) {
614            // determine range start
615            let start_ts = curr_ts - self.range;
616
617            // advance cursor based on last range
618            let mut range_start = ts_column.len();
619            let mut range_end = 0;
620            let mut cursor = range_start_index + start_delta;
621            // search back to keep the result correct
622            while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 {
623                cursor -= 1;
624            }
625
626            while cursor < ts_column.len() {
627                let ts = ts_column.value(cursor);
628                if range_start > cursor && ts >= start_ts {
629                    range_start = cursor;
630                    range_start_index = range_start;
631                }
632                if ts <= curr_ts {
633                    range_end = range_end.max(cursor);
634                } else {
635                    range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
636                    break;
637                }
638                cursor += 1;
639            }
640            if range_start > range_end {
641                ranges.push((0, 0));
642                start_delta = 0;
643            } else {
644                ranges.push((range_start as _, (range_end + 1 - range_start) as _));
645                start_delta = range_start - last_range_start;
646                last_range_start = range_start;
647            }
648        }
649
650        Ok((ranges, (start, end)))
651    }
652}
653
#[cfg(test)]
mod test {
    use datafusion::arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray};
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType,
    };
    use datafusion::common::ToDFSchema;
    use datafusion::physical_expr::Partitioning;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;

    use super::*;

    const TIME_INDEX_COLUMN: &str = "timestamp";

    /// Builds an in-memory input with a single batch of ten rows: a millisecond timestamp
    /// column, two identical float value columns (`value_1`, `value_2`) filled with `1.0`
    /// and a string column `path` filled with `"foo"`.
    fn prepare_test_data() -> MemoryExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value_1", DataType::Float64, true),
            Field::new("value_2", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            0, 30_000, 60_000, 90_000, 120_000, // every 30s
            180_000, 240_000, // every 60s
            241_000, 271_000, 291_000, // others
        ])) as _;
        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![
                timestamp_column,
                field_column.clone(),
                field_column,
                path_column,
            ],
        )
        .unwrap();

        MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
    }

    /// Runs a `RangeManipulateExec` with the given parameters over the test data and
    /// compares the debug rendering of all output columns with `expected`.
    ///
    /// Columns of one batch are joined with a newline, batches with an empty line.
    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        range: Millisecond,
        expected: String,
    ) {
        let memory_exec = Arc::new(prepare_test_data());
        let time_index = TIME_INDEX_COLUMN.to_string();
        let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
        let manipulate_output_schema = SchemaRef::new(
            RangeManipulate::calculate_output_schema(
                &memory_exec.schema().to_dfschema_ref().unwrap(),
                &time_index,
                &field_columns,
            )
            .unwrap()
            .as_ref()
            .into(),
        );
        let properties = PlanProperties::new(
            EquivalenceProperties::new(manipulate_output_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let normalize_exec = Arc::new(RangeManipulateExec {
            start,
            end,
            interval,
            range,
            field_columns,
            output_schema: manipulate_output_schema,
            time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
            time_index_column: time_index,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        // A DictionaryArray built from a RangeArray cannot be printed like a normal array,
        // so it is converted back to a RangeArray for rendering.
        let result_literal: String = result
            .into_iter()
            .filter_map(|batch| {
                batch
                    .columns()
                    .iter()
                    .map(|array| {
                        if matches!(array.data_type(), &DataType::Dictionary(..)) {
                            let dict_array = array
                                .as_any()
                                .downcast_ref::<DictionaryArray<Int64Type>>()
                                .unwrap()
                                .clone();
                            format!("{:?}", RangeArray::try_new(dict_array).unwrap())
                        } else {
                            format!("{array:?}")
                        }
                    })
                    .reduce(|lhs, rhs| lhs + "\n" + &rhs)
            })
            .reduce(|lhs, rhs| lhs + "\n\n" + &rhs)
            .unwrap();

        assert_eq!(result_literal, expected);
    }

    /// Windows of 90s evaluated every 30s; a query start before the first data point
    /// must produce the same output as a start at the first data point.
    #[tokio::test]
    async fn interval_30s_range_90s() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  \
                1970-01-01T00:00:00,\n  \
                1970-01-01T00:00:30,\n  \
                1970-01-01T00:01:00,\n  \
                1970-01-01T00:01:30,\n  \
                1970-01-01T00:02:00,\n  \
                1970-01-01T00:02:30,\n  \
                1970-01-01T00:03:00,\n  \
                1970-01-01T00:03:30,\n  \
                1970-01-01T00:04:00,\n  \
                1970-01-01T00:04:30,\n  \
                1970-01-01T00:05:00,\n\
            ]\nRangeArray { \
                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nRangeArray { \
                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
            RangeArray { \
                base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }",
);
        do_normalize_test(0, 310_000, 30_000, 90_000, expected.clone()).await;

        // dump large range
        do_normalize_test(-300000, 310_000, 30_000, 90_000, expected).await;
    }

    /// Windows of 1s evaluated every 3s starting at 1ms: only the first window contains a
    /// row, the remaining windows are empty.
    #[tokio::test]
    async fn small_empty_range() {
        let expected = String::from(
        "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  \
            1970-01-01T00:00:00.001,\n  \
            1970-01-01T00:00:03.001,\n  \
            1970-01-01T00:00:06.001,\n  \
            1970-01-01T00:00:09.001,\n\
        ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
        }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
        }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
        RangeArray { \
            base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
        }");
        do_normalize_test(1, 10_001, 3_000, 1_000, expected).await;
    }
}
823}