promql/extension_plan/
range_manipulate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
22use datafusion::arrow::compute;
23use datafusion::arrow::datatypes::{Field, SchemaRef};
24use datafusion::arrow::error::ArrowError;
25use datafusion::arrow::record_batch::RecordBatch;
26use datafusion::common::stats::Precision;
27use datafusion::common::{DFSchema, DFSchemaRef};
28use datafusion::error::{DataFusionError, Result as DataFusionResult};
29use datafusion::execution::context::TaskContext;
30use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
31use datafusion::physical_expr::EquivalenceProperties;
32use datafusion::physical_plan::metrics::{
33    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
34};
35use datafusion::physical_plan::{
36    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
37    SendableRecordBatchStream, Statistics,
38};
39use datafusion::sql::TableReference;
40use futures::{ready, Stream, StreamExt};
41use greptime_proto::substrait_extension as pb;
42use prost::Message;
43use snafu::ResultExt;
44
45use crate::error::{DeserializeSnafu, Result};
46use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
47use crate::metrics::PROMQL_SERIES_COUNT;
48use crate::range_array::RangeArray;
49
/// Time series manipulator for range function.
///
/// This plan will "fold" time index and value columns into [RangeArray]s, and truncate
/// other columns to the same length with the "folded" [RangeArray] column.
///
/// To pass runtime information to the execution plan (or the range function), this plan
/// will add those extra columns:
/// - timestamp range with type [RangeArray], which is the folded timestamp column.
/// - end of current range with the same type as the timestamp column. (todo)
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
    /// Start of the requested range (printed as `req range=[start..end]` in explain output).
    start: Millisecond,
    /// End of the requested range; included when evaluation timestamps are generated.
    end: Millisecond,
    /// Step between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Look-back window: the range for evaluation timestamp `t` covers `[t - range, t]`.
    range: Millisecond,

    /// Name of the time index column in the input schema.
    time_index: String,
    /// Names of the value columns that are folded into [RangeArray]s.
    field_columns: Vec<String>,
    /// Child logical plan.
    input: LogicalPlan,
    /// Schema produced by this node, derived from the input schema by
    /// `calculate_output_schema`.
    output_schema: DFSchemaRef,
}
71
72impl RangeManipulate {
73    pub fn new(
74        start: Millisecond,
75        end: Millisecond,
76        interval: Millisecond,
77        range: Millisecond,
78        time_index: String,
79        field_columns: Vec<String>,
80        input: LogicalPlan,
81    ) -> DataFusionResult<Self> {
82        let output_schema =
83            Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
84        Ok(Self {
85            start,
86            end,
87            interval,
88            range,
89            time_index,
90            field_columns,
91            input,
92            output_schema,
93        })
94    }
95
    /// Returns the name of this logical plan node, `"RangeManipulate"`.
    pub const fn name() -> &'static str {
        "RangeManipulate"
    }
99
100    pub fn build_timestamp_range_name(time_index: &str) -> String {
101        format!("{time_index}_range")
102    }
103
104    pub fn internal_range_end_col_name() -> String {
105        "__internal_range_end".to_string()
106    }
107
108    fn range_timestamp_name(&self) -> String {
109        Self::build_timestamp_range_name(&self.time_index)
110    }
111
112    fn calculate_output_schema(
113        input_schema: &DFSchemaRef,
114        time_index: &str,
115        field_columns: &[String],
116    ) -> DataFusionResult<DFSchemaRef> {
117        let columns = input_schema.fields();
118        let mut new_columns = Vec::with_capacity(columns.len() + 1);
119        for i in 0..columns.len() {
120            let x = input_schema.qualified_field(i);
121            new_columns.push((x.0.cloned(), Arc::new(x.1.clone())));
122        }
123
124        // process time index column
125        // the raw timestamp field is preserved. And a new timestamp_range field is appended to the last.
126        let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index) else {
127            return Err(datafusion::common::field_not_found(
128                None::<TableReference>,
129                time_index,
130                input_schema.as_ref(),
131            ));
132        };
133        let ts_col_field = &columns[ts_col_index];
134        let timestamp_range_field = Field::new(
135            Self::build_timestamp_range_name(time_index),
136            RangeArray::convert_field(ts_col_field).data_type().clone(),
137            ts_col_field.is_nullable(),
138        );
139        new_columns.push((None, Arc::new(timestamp_range_field)));
140
141        // process value columns
142        for name in field_columns {
143            let Some(index) = input_schema.index_of_column_by_name(None, name) else {
144                return Err(datafusion::common::field_not_found(
145                    None::<TableReference>,
146                    name,
147                    input_schema.as_ref(),
148                ));
149            };
150            new_columns[index] = (None, Arc::new(RangeArray::convert_field(&columns[index])));
151        }
152
153        Ok(Arc::new(DFSchema::new_with_metadata(
154            new_columns,
155            HashMap::new(),
156        )?))
157    }
158
159    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
160        let output_schema: SchemaRef = SchemaRef::new(self.output_schema.as_ref().into());
161        let properties = exec_input.properties();
162        let properties = PlanProperties::new(
163            EquivalenceProperties::new(output_schema.clone()),
164            properties.partitioning.clone(),
165            properties.emission_type,
166            properties.boundedness,
167        );
168        Arc::new(RangeManipulateExec {
169            start: self.start,
170            end: self.end,
171            interval: self.interval,
172            range: self.range,
173            time_index_column: self.time_index.clone(),
174            time_range_column: self.range_timestamp_name(),
175            field_columns: self.field_columns.clone(),
176            input: exec_input,
177            output_schema,
178            metric: ExecutionPlanMetricsSet::new(),
179            properties,
180        })
181    }
182
183    pub fn serialize(&self) -> Vec<u8> {
184        pb::RangeManipulate {
185            start: self.start,
186            end: self.end,
187            interval: self.interval,
188            range: self.range,
189            time_index: self.time_index.clone(),
190            tag_columns: self.field_columns.clone(),
191        }
192        .encode_to_vec()
193    }
194
195    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
196        let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
197        let empty_schema = Arc::new(DFSchema::empty());
198        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
199            produce_one_row: false,
200            schema: empty_schema.clone(),
201        });
202
203        // Unlike `Self::new()`, this method doesn't check the input schema as it will fail
204        // because the input schema is empty.
205        // But this is Ok since datafusion guarantees to call `with_exprs_and_inputs` for the
206        // deserialized plan.
207        Ok(Self {
208            start: pb_range_manipulate.start,
209            end: pb_range_manipulate.end,
210            interval: pb_range_manipulate.interval,
211            range: pb_range_manipulate.range,
212            time_index: pb_range_manipulate.time_index,
213            field_columns: pb_range_manipulate.tag_columns,
214            input: placeholder_plan,
215            output_schema: empty_schema,
216        })
217    }
218}
219
220impl PartialOrd for RangeManipulate {
221    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
222        // Compare fields in order excluding output_schema
223        match self.start.partial_cmp(&other.start) {
224            Some(core::cmp::Ordering::Equal) => {}
225            ord => return ord,
226        }
227        match self.end.partial_cmp(&other.end) {
228            Some(core::cmp::Ordering::Equal) => {}
229            ord => return ord,
230        }
231        match self.interval.partial_cmp(&other.interval) {
232            Some(core::cmp::Ordering::Equal) => {}
233            ord => return ord,
234        }
235        match self.range.partial_cmp(&other.range) {
236            Some(core::cmp::Ordering::Equal) => {}
237            ord => return ord,
238        }
239        match self.time_index.partial_cmp(&other.time_index) {
240            Some(core::cmp::Ordering::Equal) => {}
241            ord => return ord,
242        }
243        match self.field_columns.partial_cmp(&other.field_columns) {
244            Some(core::cmp::Ordering::Equal) => {}
245            ord => return ord,
246        }
247        self.input.partial_cmp(&other.input)
248    }
249}
250
251impl UserDefinedLogicalNodeCore for RangeManipulate {
252    fn name(&self) -> &str {
253        Self::name()
254    }
255
256    fn inputs(&self) -> Vec<&LogicalPlan> {
257        vec![&self.input]
258    }
259
260    fn schema(&self) -> &DFSchemaRef {
261        &self.output_schema
262    }
263
264    fn expressions(&self) -> Vec<Expr> {
265        vec![]
266    }
267
268    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
269        write!(
270            f,
271            "PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
272            self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
273        )
274    }
275
276    fn with_exprs_and_inputs(
277        &self,
278        _exprs: Vec<Expr>,
279        mut inputs: Vec<LogicalPlan>,
280    ) -> DataFusionResult<Self> {
281        if inputs.len() != 1 {
282            return Err(DataFusionError::Internal(
283                "RangeManipulate should have at exact one input".to_string(),
284            ));
285        }
286
287        let input: LogicalPlan = inputs.pop().unwrap();
288        let input_schema = input.schema();
289        let output_schema =
290            Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
291
292        Ok(Self {
293            start: self.start,
294            end: self.end,
295            interval: self.interval,
296            range: self.range,
297            time_index: self.time_index.clone(),
298            field_columns: self.field_columns.clone(),
299            input,
300            output_schema,
301        })
302    }
303}
304
/// Physical plan node of [RangeManipulate].
#[derive(Debug)]
pub struct RangeManipulateExec {
    /// Start of the requested range.
    start: Millisecond,
    /// End of the requested range.
    end: Millisecond,
    /// Step between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Look-back window of every evaluation timestamp.
    range: Millisecond,
    /// Name of the time index column in the input.
    time_index_column: String,
    /// Name of the appended timestamp range column.
    time_range_column: String,
    /// Names of the value columns that are folded into [RangeArray]s.
    field_columns: Vec<String>,

    /// Child execution plan.
    input: Arc<dyn ExecutionPlan>,
    /// Schema of the batches produced by this node.
    output_schema: SchemaRef,
    /// Metrics registered by the streams created in `execute`.
    metric: ExecutionPlanMetricsSet,
    /// Cached plan properties; partitioning, emission type and boundedness come from the input.
    properties: PlanProperties,
}
320
321impl ExecutionPlan for RangeManipulateExec {
322    fn as_any(&self) -> &dyn Any {
323        self
324    }
325
326    fn schema(&self) -> SchemaRef {
327        self.output_schema.clone()
328    }
329
330    fn properties(&self) -> &PlanProperties {
331        &self.properties
332    }
333
334    fn maintains_input_order(&self) -> Vec<bool> {
335        vec![true; self.children().len()]
336    }
337
338    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
339        vec![&self.input]
340    }
341
342    fn required_input_distribution(&self) -> Vec<Distribution> {
343        let input_requirement = self.input.required_input_distribution();
344        if input_requirement.is_empty() {
345            // if the input is EmptyMetric, its required_input_distribution() is empty so we can't
346            // use its input distribution.
347            vec![Distribution::UnspecifiedDistribution]
348        } else {
349            input_requirement
350        }
351    }
352
353    fn with_new_children(
354        self: Arc<Self>,
355        children: Vec<Arc<dyn ExecutionPlan>>,
356    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
357        assert!(!children.is_empty());
358        let exec_input = children[0].clone();
359        let properties = exec_input.properties();
360        let properties = PlanProperties::new(
361            EquivalenceProperties::new(self.output_schema.clone()),
362            properties.partitioning.clone(),
363            properties.emission_type,
364            properties.boundedness,
365        );
366        Ok(Arc::new(Self {
367            start: self.start,
368            end: self.end,
369            interval: self.interval,
370            range: self.range,
371            time_index_column: self.time_index_column.clone(),
372            time_range_column: self.time_range_column.clone(),
373            field_columns: self.field_columns.clone(),
374            output_schema: self.output_schema.clone(),
375            input: children[0].clone(),
376            metric: self.metric.clone(),
377            properties,
378        }))
379    }
380
381    fn execute(
382        &self,
383        partition: usize,
384        context: Arc<TaskContext>,
385    ) -> DataFusionResult<SendableRecordBatchStream> {
386        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
387        let metrics_builder = MetricBuilder::new(&self.metric);
388        let num_series = Count::new();
389        metrics_builder
390            .with_partition(partition)
391            .build(MetricValue::Count {
392                name: METRIC_NUM_SERIES.into(),
393                count: num_series.clone(),
394            });
395
396        let input = self.input.execute(partition, context)?;
397        let schema = input.schema();
398        let time_index = schema
399            .column_with_name(&self.time_index_column)
400            .unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
401            .0;
402        let field_columns = self
403            .field_columns
404            .iter()
405            .map(|value_col| {
406                schema
407                    .column_with_name(value_col)
408                    .unwrap_or_else(|| panic!("value column {value_col} not found",))
409                    .0
410            })
411            .collect();
412        let aligned_ts_array =
413            RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval);
414        Ok(Box::pin(RangeManipulateStream {
415            start: self.start,
416            end: self.end,
417            interval: self.interval,
418            range: self.range,
419            time_index,
420            field_columns,
421            aligned_ts_array,
422            output_schema: self.output_schema.clone(),
423            input,
424            metric: baseline_metric,
425            num_series,
426        }))
427    }
428
429    fn metrics(&self) -> Option<MetricsSet> {
430        Some(self.metric.clone_inner())
431    }
432
433    fn statistics(&self) -> DataFusionResult<Statistics> {
434        let input_stats = self.input.statistics()?;
435
436        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
437        let estimated_total_bytes = input_stats
438            .total_byte_size
439            .get_value()
440            .zip(input_stats.num_rows.get_value())
441            .map(|(size, rows)| {
442                Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
443            })
444            .unwrap_or_default();
445
446        Ok(Statistics {
447            num_rows: Precision::Inexact(estimated_row_num as _),
448            total_byte_size: estimated_total_bytes,
449            // TODO(ruihang): support this column statistics
450            column_statistics: Statistics::unknown_column(&self.schema()),
451        })
452    }
453
454    fn name(&self) -> &str {
455        "RangeManipulateExec"
456    }
457}
458
459impl DisplayAs for RangeManipulateExec {
460    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
461        match t {
462            DisplayFormatType::Default | DisplayFormatType::Verbose => {
463                write!(
464                    f,
465                    "PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
466                   self.start, self.end, self.interval, self.range, self.time_index_column
467                )
468            }
469        }
470    }
471}
472
/// Stream that applies the range manipulation to every batch of the input stream.
pub struct RangeManipulateStream {
    /// Start of the requested range.
    start: Millisecond,
    /// End of the requested range.
    end: Millisecond,
    /// Step between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Look-back window of every evaluation timestamp.
    range: Millisecond,
    /// Index of the time index column in the input batches.
    time_index: usize,
    /// Indices of the value columns in the input batches.
    field_columns: Vec<usize>,
    /// Aligned timestamps of the whole requested range, built once in `execute`.
    aligned_ts_array: ArrayRef,

    /// Schema of the produced batches.
    output_schema: SchemaRef,
    /// Input stream.
    input: SendableRecordBatchStream,
    /// Baseline metrics (compute time, poll results) of this stream.
    metric: BaselineMetrics,
    /// Number of series processed.
    num_series: Count,
}
488
489impl RecordBatchStream for RangeManipulateStream {
490    fn schema(&self) -> SchemaRef {
491        self.output_schema.clone()
492    }
493}
494
495impl Stream for RangeManipulateStream {
496    type Item = DataFusionResult<RecordBatch>;
497
498    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
499        let poll = loop {
500            match ready!(self.input.poll_next_unpin(cx)) {
501                Some(Ok(batch)) => {
502                    let timer = std::time::Instant::now();
503                    let result = self.manipulate(batch);
504                    if let Ok(None) = result {
505                        self.metric.elapsed_compute().add_elapsed(timer);
506                        continue;
507                    } else {
508                        self.num_series.add(1);
509                        self.metric.elapsed_compute().add_elapsed(timer);
510                        break Poll::Ready(result.transpose());
511                    }
512                }
513                None => {
514                    PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
515                    break Poll::Ready(None);
516                }
517                Some(Err(e)) => break Poll::Ready(Some(Err(e))),
518            }
519        };
520        self.metric.record_poll(poll)
521    }
522}
523
524impl RangeManipulateStream {
525    // Prometheus: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1113-L1198
526    // But they are not exactly the same, because we don't eager-evaluate on the data in this plan.
527    // And the generated timestamp is not aligned to the step. It's expected to do later.
528    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
529        let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
530        // calculate the range
531        let (ranges, (start, end)) = self.calculate_range(&input)?;
532        // ignore this if all ranges are empty
533        if ranges.iter().all(|(_, len)| *len == 0) {
534            return Ok(None);
535        }
536
537        // transform columns
538        let mut new_columns = input.columns().to_vec();
539        for index in self.field_columns.iter() {
540            let _ = other_columns.remove(index);
541            let column = input.column(*index);
542            let new_column = Arc::new(
543                RangeArray::from_ranges(column.clone(), ranges.clone())
544                    .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
545                    .into_dict(),
546            );
547            new_columns[*index] = new_column;
548        }
549
550        // push timestamp range column
551        let ts_range_column =
552            RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone())
553                .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
554                .into_dict();
555        new_columns.push(Arc::new(ts_range_column));
556
557        // truncate other columns
558        let take_indices = Int64Array::from(vec![0; ranges.len()]);
559        for index in other_columns.into_iter() {
560            new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
561        }
562        // replace timestamp with the aligned one
563        let new_time_index = if ranges.len() != self.aligned_ts_array.len() {
564            Self::build_aligned_ts_array(start, end, self.interval)
565        } else {
566            self.aligned_ts_array.clone()
567        };
568        new_columns[self.time_index] = new_time_index;
569
570        RecordBatch::try_new(self.output_schema.clone(), new_columns)
571            .map(Some)
572            .map_err(|e| DataFusionError::ArrowError(e, None))
573    }
574
575    fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef {
576        Arc::new(TimestampMillisecondArray::from_iter_values(
577            (start..=end).step_by(interval as _),
578        ))
579    }
580
581    /// Return values:
582    /// - A vector of tuples where each tuple contains the start index and length of the range.
583    /// - A tuple of the actual start/end timestamp used to calculate the range.
584    #[allow(clippy::type_complexity)]
585    fn calculate_range(
586        &self,
587        input: &RecordBatch,
588    ) -> DataFusionResult<(Vec<(u32, u32)>, (i64, i64))> {
589        let ts_column = input
590            .column(self.time_index)
591            .as_any()
592            .downcast_ref::<TimestampMillisecondArray>()
593            .ok_or_else(|| {
594                DataFusionError::Execution(
595                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
596                )
597            })?;
598
599        let len = ts_column.len();
600        if len == 0 {
601            return Ok((vec![], (self.start, self.end)));
602        }
603
604        // shorten the range to calculate
605        let first_ts = ts_column.value(0);
606        let first_ts_aligned = (first_ts / self.interval) * self.interval;
607        let last_ts = ts_column.value(ts_column.len() - 1);
608        let last_ts_aligned = ((last_ts + self.range) / self.interval) * self.interval;
609        let start = self.start.max(first_ts_aligned);
610        let end = self.end.min(last_ts_aligned);
611        if start > end {
612            return Ok((vec![], (start, end)));
613        }
614        let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize);
615
616        // calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered.
617        let mut range_start_index = 0usize;
618        let mut last_range_start = 0;
619        let mut start_delta = 0;
620        for curr_ts in (start..=end).step_by(self.interval as _) {
621            // determine range start
622            let start_ts = curr_ts - self.range;
623
624            // advance cursor based on last range
625            let mut range_start = ts_column.len();
626            let mut range_end = 0;
627            let mut cursor = range_start_index + start_delta;
628            // search back to keep the result correct
629            while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 {
630                cursor -= 1;
631            }
632
633            while cursor < ts_column.len() {
634                let ts = ts_column.value(cursor);
635                if range_start > cursor && ts >= start_ts {
636                    range_start = cursor;
637                    range_start_index = range_start;
638                }
639                if ts <= curr_ts {
640                    range_end = range_end.max(cursor);
641                } else {
642                    range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
643                    break;
644                }
645                cursor += 1;
646            }
647            if range_start > range_end {
648                ranges.push((0, 0));
649                start_delta = 0;
650            } else {
651                ranges.push((range_start as _, (range_end + 1 - range_start) as _));
652                start_delta = range_start - last_range_start;
653                last_range_start = range_start;
654            }
655        }
656
657        Ok((ranges, (start, end)))
658    }
659}
660
661#[cfg(test)]
662mod test {
663    use datafusion::arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray};
664    use datafusion::arrow::datatypes::{
665        ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType,
666    };
667    use datafusion::common::ToDFSchema;
668    use datafusion::physical_expr::Partitioning;
669    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
670    use datafusion::physical_plan::memory::MemoryExec;
671    use datafusion::prelude::SessionContext;
672    use datatypes::arrow::array::TimestampMillisecondArray;
673
674    use super::*;
675
    /// Name of the time index column in the test data.
    const TIME_INDEX_COLUMN: &str = "timestamp";
677
678    fn prepare_test_data() -> MemoryExec {
679        let schema = Arc::new(Schema::new(vec![
680            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
681            Field::new("value_1", DataType::Float64, true),
682            Field::new("value_2", DataType::Float64, true),
683            Field::new("path", DataType::Utf8, true),
684        ]));
685        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
686            0, 30_000, 60_000, 90_000, 120_000, // every 30s
687            180_000, 240_000, // every 60s
688            241_000, 271_000, 291_000, // others
689        ])) as _;
690        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
691        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
692        let data = RecordBatch::try_new(
693            schema.clone(),
694            vec![
695                timestamp_column,
696                field_column.clone(),
697                field_column,
698                path_column,
699            ],
700        )
701        .unwrap();
702
703        MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
704    }
705
706    async fn do_normalize_test(
707        start: Millisecond,
708        end: Millisecond,
709        interval: Millisecond,
710        range: Millisecond,
711        expected: String,
712    ) {
713        let memory_exec = Arc::new(prepare_test_data());
714        let time_index = TIME_INDEX_COLUMN.to_string();
715        let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
716        let manipulate_output_schema = SchemaRef::new(
717            RangeManipulate::calculate_output_schema(
718                &memory_exec.schema().to_dfschema_ref().unwrap(),
719                &time_index,
720                &field_columns,
721            )
722            .unwrap()
723            .as_ref()
724            .into(),
725        );
726        let properties = PlanProperties::new(
727            EquivalenceProperties::new(manipulate_output_schema.clone()),
728            Partitioning::UnknownPartitioning(1),
729            EmissionType::Incremental,
730            Boundedness::Bounded,
731        );
732        let normalize_exec = Arc::new(RangeManipulateExec {
733            start,
734            end,
735            interval,
736            range,
737            field_columns,
738            output_schema: manipulate_output_schema,
739            time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
740            time_index_column: time_index,
741            input: memory_exec,
742            metric: ExecutionPlanMetricsSet::new(),
743            properties,
744        });
745        let session_context = SessionContext::default();
746        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
747            .await
748            .unwrap();
749        // DirectoryArray from RangeArray cannot be print as normal arrays.
750        let result_literal: String = result
751            .into_iter()
752            .filter_map(|batch| {
753                batch
754                    .columns()
755                    .iter()
756                    .map(|array| {
757                        if matches!(array.data_type(), &DataType::Dictionary(..)) {
758                            let dict_array = array
759                                .as_any()
760                                .downcast_ref::<DictionaryArray<Int64Type>>()
761                                .unwrap()
762                                .clone();
763                            format!("{:?}", RangeArray::try_new(dict_array).unwrap())
764                        } else {
765                            format!("{array:?}")
766                        }
767                    })
768                    .reduce(|lhs, rhs| lhs + "\n" + &rhs)
769            })
770            .reduce(|lhs, rhs| lhs + "\n\n" + &rhs)
771            .unwrap();
772
773        assert_eq!(result_literal, expected);
774    }
775
    /// Evaluates every 30s with a 90s look-back window over `[0, 310s]`, then again with a
    /// request range that starts long before the first sample. Both runs must give the
    /// same output.
    #[tokio::test]
    async fn interval_30s_range_90s() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  \
                1970-01-01T00:00:00,\n  \
                1970-01-01T00:00:30,\n  \
                1970-01-01T00:01:00,\n  \
                1970-01-01T00:01:30,\n  \
                1970-01-01T00:02:00,\n  \
                1970-01-01T00:02:30,\n  \
                1970-01-01T00:03:00,\n  \
                1970-01-01T00:03:30,\n  \
                1970-01-01T00:04:00,\n  \
                1970-01-01T00:04:30,\n  \
                1970-01-01T00:05:00,\n\
            ]\nRangeArray { \
                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nRangeArray { \
                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
            RangeArray { \
                base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }",
);
        do_normalize_test(0, 310_000, 30_000, 90_000, expected.clone()).await;

        // A request range that starts 300s before the first sample is shortened to the
        // data and must produce exactly the same result.
        do_normalize_test(-300000, 310_000, 30_000, 90_000, expected).await;
    }
808
    /// Evaluates every 3s with a 1s look-back window, starting at 1ms. Only the first
    /// window contains a sample; the other three are empty (`0..0`) but still emitted.
    #[tokio::test]
    async fn small_empty_range() {
        let expected = String::from(
        "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  \
            1970-01-01T00:00:00.001,\n  \
            1970-01-01T00:00:03.001,\n  \
            1970-01-01T00:00:06.001,\n  \
            1970-01-01T00:00:09.001,\n\
        ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
        }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
        }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
        RangeArray { \
            base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
        }");
        do_normalize_test(1, 10_001, 3_000, 1_000, expected).await;
    }
830}