promql/extension_plan/
range_manipulate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
22use datafusion::arrow::compute;
23use datafusion::arrow::datatypes::{Field, SchemaRef};
24use datafusion::arrow::error::ArrowError;
25use datafusion::arrow::record_batch::RecordBatch;
26use datafusion::common::stats::Precision;
27use datafusion::common::{DFSchema, DFSchemaRef};
28use datafusion::error::{DataFusionError, Result as DataFusionResult};
29use datafusion::execution::context::TaskContext;
30use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
31use datafusion::physical_expr::EquivalenceProperties;
32use datafusion::physical_plan::metrics::{
33    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
34};
35use datafusion::physical_plan::{
36    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
37    SendableRecordBatchStream, Statistics,
38};
39use datafusion::sql::TableReference;
40use futures::{Stream, StreamExt, ready};
41use greptime_proto::substrait_extension as pb;
42use prost::Message;
43use snafu::ResultExt;
44
45use crate::error::{DeserializeSnafu, Result};
46use crate::extension_plan::{METRIC_NUM_SERIES, Millisecond};
47use crate::metrics::PROMQL_SERIES_COUNT;
48use crate::range_array::RangeArray;
49
50/// Time series manipulator for range function.
51///
52/// This plan will "fold" time index and value columns into [RangeArray]s, and truncate
53/// other columns to the same length with the "folded" [RangeArray] column.
54///
55/// To pass runtime information to the execution plan (or the range function), This plan
56/// will add those extra columns:
57/// - timestamp range with type [RangeArray], which is the folded timestamp column.
58/// - end of current range with the same type as the timestamp column. (todo)
59#[derive(Debug, PartialEq, Eq, Hash)]
60pub struct RangeManipulate {
61    start: Millisecond,
62    end: Millisecond,
63    interval: Millisecond,
64    range: Millisecond,
65
66    time_index: String,
67    field_columns: Vec<String>,
68    input: LogicalPlan,
69    output_schema: DFSchemaRef,
70}
71
72impl RangeManipulate {
73    pub fn new(
74        start: Millisecond,
75        end: Millisecond,
76        interval: Millisecond,
77        range: Millisecond,
78        time_index: String,
79        field_columns: Vec<String>,
80        input: LogicalPlan,
81    ) -> DataFusionResult<Self> {
82        let output_schema =
83            Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
84        Ok(Self {
85            start,
86            end,
87            interval,
88            range,
89            time_index,
90            field_columns,
91            input,
92            output_schema,
93        })
94    }
95
96    pub const fn name() -> &'static str {
97        "RangeManipulate"
98    }
99
100    pub fn build_timestamp_range_name(time_index: &str) -> String {
101        format!("{time_index}_range")
102    }
103
104    pub fn internal_range_end_col_name() -> String {
105        "__internal_range_end".to_string()
106    }
107
108    fn range_timestamp_name(&self) -> String {
109        Self::build_timestamp_range_name(&self.time_index)
110    }
111
112    fn calculate_output_schema(
113        input_schema: &DFSchemaRef,
114        time_index: &str,
115        field_columns: &[String],
116    ) -> DataFusionResult<DFSchemaRef> {
117        let columns = input_schema.fields();
118        let mut new_columns = Vec::with_capacity(columns.len() + 1);
119        for i in 0..columns.len() {
120            let x = input_schema.qualified_field(i);
121            new_columns.push((x.0.cloned(), Arc::new(x.1.clone())));
122        }
123
124        // process time index column
125        // the raw timestamp field is preserved. And a new timestamp_range field is appended to the last.
126        let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index) else {
127            return Err(datafusion::common::field_not_found(
128                None::<TableReference>,
129                time_index,
130                input_schema.as_ref(),
131            ));
132        };
133        let ts_col_field = &columns[ts_col_index];
134        let timestamp_range_field = Field::new(
135            Self::build_timestamp_range_name(time_index),
136            RangeArray::convert_field(ts_col_field).data_type().clone(),
137            ts_col_field.is_nullable(),
138        );
139        new_columns.push((None, Arc::new(timestamp_range_field)));
140
141        // process value columns
142        for name in field_columns {
143            let Some(index) = input_schema.index_of_column_by_name(None, name) else {
144                return Err(datafusion::common::field_not_found(
145                    None::<TableReference>,
146                    name,
147                    input_schema.as_ref(),
148                ));
149            };
150            new_columns[index] = (None, Arc::new(RangeArray::convert_field(&columns[index])));
151        }
152
153        Ok(Arc::new(DFSchema::new_with_metadata(
154            new_columns,
155            HashMap::new(),
156        )?))
157    }
158
159    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
160        let output_schema: SchemaRef = SchemaRef::new(self.output_schema.as_ref().into());
161        let properties = exec_input.properties();
162        let properties = PlanProperties::new(
163            EquivalenceProperties::new(output_schema.clone()),
164            properties.partitioning.clone(),
165            properties.emission_type,
166            properties.boundedness,
167        );
168        Arc::new(RangeManipulateExec {
169            start: self.start,
170            end: self.end,
171            interval: self.interval,
172            range: self.range,
173            time_index_column: self.time_index.clone(),
174            time_range_column: self.range_timestamp_name(),
175            field_columns: self.field_columns.clone(),
176            input: exec_input,
177            output_schema,
178            metric: ExecutionPlanMetricsSet::new(),
179            properties,
180        })
181    }
182
183    pub fn serialize(&self) -> Vec<u8> {
184        pb::RangeManipulate {
185            start: self.start,
186            end: self.end,
187            interval: self.interval,
188            range: self.range,
189            time_index: self.time_index.clone(),
190            tag_columns: self.field_columns.clone(),
191        }
192        .encode_to_vec()
193    }
194
195    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
196        let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
197        let empty_schema = Arc::new(DFSchema::empty());
198        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
199            produce_one_row: false,
200            schema: empty_schema.clone(),
201        });
202
203        // Unlike `Self::new()`, this method doesn't check the input schema as it will fail
204        // because the input schema is empty.
205        // But this is Ok since datafusion guarantees to call `with_exprs_and_inputs` for the
206        // deserialized plan.
207        Ok(Self {
208            start: pb_range_manipulate.start,
209            end: pb_range_manipulate.end,
210            interval: pb_range_manipulate.interval,
211            range: pb_range_manipulate.range,
212            time_index: pb_range_manipulate.time_index,
213            field_columns: pb_range_manipulate.tag_columns,
214            input: placeholder_plan,
215            output_schema: empty_schema,
216        })
217    }
218}
219
220impl PartialOrd for RangeManipulate {
221    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
222        // Compare fields in order excluding output_schema
223        match self.start.partial_cmp(&other.start) {
224            Some(core::cmp::Ordering::Equal) => {}
225            ord => return ord,
226        }
227        match self.end.partial_cmp(&other.end) {
228            Some(core::cmp::Ordering::Equal) => {}
229            ord => return ord,
230        }
231        match self.interval.partial_cmp(&other.interval) {
232            Some(core::cmp::Ordering::Equal) => {}
233            ord => return ord,
234        }
235        match self.range.partial_cmp(&other.range) {
236            Some(core::cmp::Ordering::Equal) => {}
237            ord => return ord,
238        }
239        match self.time_index.partial_cmp(&other.time_index) {
240            Some(core::cmp::Ordering::Equal) => {}
241            ord => return ord,
242        }
243        match self.field_columns.partial_cmp(&other.field_columns) {
244            Some(core::cmp::Ordering::Equal) => {}
245            ord => return ord,
246        }
247        self.input.partial_cmp(&other.input)
248    }
249}
250
251impl UserDefinedLogicalNodeCore for RangeManipulate {
252    fn name(&self) -> &str {
253        Self::name()
254    }
255
256    fn inputs(&self) -> Vec<&LogicalPlan> {
257        vec![&self.input]
258    }
259
260    fn schema(&self) -> &DFSchemaRef {
261        &self.output_schema
262    }
263
264    fn expressions(&self) -> Vec<Expr> {
265        vec![]
266    }
267
268    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
269        write!(
270            f,
271            "PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
272            self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
273        )
274    }
275
276    fn with_exprs_and_inputs(
277        &self,
278        _exprs: Vec<Expr>,
279        mut inputs: Vec<LogicalPlan>,
280    ) -> DataFusionResult<Self> {
281        if inputs.len() != 1 {
282            return Err(DataFusionError::Internal(
283                "RangeManipulate should have at exact one input".to_string(),
284            ));
285        }
286
287        let input: LogicalPlan = inputs.pop().unwrap();
288        let input_schema = input.schema();
289        let output_schema =
290            Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
291
292        Ok(Self {
293            start: self.start,
294            end: self.end,
295            interval: self.interval,
296            range: self.range,
297            time_index: self.time_index.clone(),
298            field_columns: self.field_columns.clone(),
299            input,
300            output_schema,
301        })
302    }
303}
304
305#[derive(Debug)]
306pub struct RangeManipulateExec {
307    start: Millisecond,
308    end: Millisecond,
309    interval: Millisecond,
310    range: Millisecond,
311    time_index_column: String,
312    time_range_column: String,
313    field_columns: Vec<String>,
314
315    input: Arc<dyn ExecutionPlan>,
316    output_schema: SchemaRef,
317    metric: ExecutionPlanMetricsSet,
318    properties: PlanProperties,
319}
320
321impl ExecutionPlan for RangeManipulateExec {
322    fn as_any(&self) -> &dyn Any {
323        self
324    }
325
326    fn schema(&self) -> SchemaRef {
327        self.output_schema.clone()
328    }
329
330    fn properties(&self) -> &PlanProperties {
331        &self.properties
332    }
333
334    fn maintains_input_order(&self) -> Vec<bool> {
335        vec![true; self.children().len()]
336    }
337
338    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
339        vec![&self.input]
340    }
341
342    fn required_input_distribution(&self) -> Vec<Distribution> {
343        let input_requirement = self.input.required_input_distribution();
344        if input_requirement.is_empty() {
345            // if the input is EmptyMetric, its required_input_distribution() is empty so we can't
346            // use its input distribution.
347            vec![Distribution::UnspecifiedDistribution]
348        } else {
349            input_requirement
350        }
351    }
352
353    fn with_new_children(
354        self: Arc<Self>,
355        children: Vec<Arc<dyn ExecutionPlan>>,
356    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
357        assert!(!children.is_empty());
358        let exec_input = children[0].clone();
359        let properties = exec_input.properties();
360        let properties = PlanProperties::new(
361            EquivalenceProperties::new(self.output_schema.clone()),
362            properties.partitioning.clone(),
363            properties.emission_type,
364            properties.boundedness,
365        );
366        Ok(Arc::new(Self {
367            start: self.start,
368            end: self.end,
369            interval: self.interval,
370            range: self.range,
371            time_index_column: self.time_index_column.clone(),
372            time_range_column: self.time_range_column.clone(),
373            field_columns: self.field_columns.clone(),
374            output_schema: self.output_schema.clone(),
375            input: children[0].clone(),
376            metric: self.metric.clone(),
377            properties,
378        }))
379    }
380
381    fn execute(
382        &self,
383        partition: usize,
384        context: Arc<TaskContext>,
385    ) -> DataFusionResult<SendableRecordBatchStream> {
386        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
387        let metrics_builder = MetricBuilder::new(&self.metric);
388        let num_series = Count::new();
389        metrics_builder
390            .with_partition(partition)
391            .build(MetricValue::Count {
392                name: METRIC_NUM_SERIES.into(),
393                count: num_series.clone(),
394            });
395
396        let input = self.input.execute(partition, context)?;
397        let schema = input.schema();
398        let time_index = schema
399            .column_with_name(&self.time_index_column)
400            .unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
401            .0;
402        let field_columns = self
403            .field_columns
404            .iter()
405            .map(|value_col| {
406                schema
407                    .column_with_name(value_col)
408                    .unwrap_or_else(|| panic!("value column {value_col} not found",))
409                    .0
410            })
411            .collect();
412        let aligned_ts_array =
413            RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval);
414        Ok(Box::pin(RangeManipulateStream {
415            start: self.start,
416            end: self.end,
417            interval: self.interval,
418            range: self.range,
419            time_index,
420            field_columns,
421            aligned_ts_array,
422            output_schema: self.output_schema.clone(),
423            input,
424            metric: baseline_metric,
425            num_series,
426        }))
427    }
428
429    fn metrics(&self) -> Option<MetricsSet> {
430        Some(self.metric.clone_inner())
431    }
432
433    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
434        let input_stats = self.input.partition_statistics(partition)?;
435
436        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
437        let estimated_total_bytes = input_stats
438            .total_byte_size
439            .get_value()
440            .zip(input_stats.num_rows.get_value())
441            .map(|(size, rows)| {
442                Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
443            })
444            .unwrap_or_default();
445
446        Ok(Statistics {
447            num_rows: Precision::Inexact(estimated_row_num as _),
448            total_byte_size: estimated_total_bytes,
449            // TODO(ruihang): support this column statistics
450            column_statistics: Statistics::unknown_column(&self.schema()),
451        })
452    }
453
454    fn name(&self) -> &str {
455        "RangeManipulateExec"
456    }
457}
458
459impl DisplayAs for RangeManipulateExec {
460    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
461        match t {
462            DisplayFormatType::Default
463            | DisplayFormatType::Verbose
464            | DisplayFormatType::TreeRender => {
465                write!(
466                    f,
467                    "PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
468                    self.start, self.end, self.interval, self.range, self.time_index_column
469                )
470            }
471        }
472    }
473}
474
475pub struct RangeManipulateStream {
476    start: Millisecond,
477    end: Millisecond,
478    interval: Millisecond,
479    range: Millisecond,
480    time_index: usize,
481    field_columns: Vec<usize>,
482    aligned_ts_array: ArrayRef,
483
484    output_schema: SchemaRef,
485    input: SendableRecordBatchStream,
486    metric: BaselineMetrics,
487    /// Number of series processed.
488    num_series: Count,
489}
490
491impl RecordBatchStream for RangeManipulateStream {
492    fn schema(&self) -> SchemaRef {
493        self.output_schema.clone()
494    }
495}
496
497impl Stream for RangeManipulateStream {
498    type Item = DataFusionResult<RecordBatch>;
499
500    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
501        let poll = loop {
502            match ready!(self.input.poll_next_unpin(cx)) {
503                Some(Ok(batch)) => {
504                    let timer = std::time::Instant::now();
505                    let result = self.manipulate(batch);
506                    if let Ok(None) = result {
507                        self.metric.elapsed_compute().add_elapsed(timer);
508                        continue;
509                    } else {
510                        self.num_series.add(1);
511                        self.metric.elapsed_compute().add_elapsed(timer);
512                        break Poll::Ready(result.transpose());
513                    }
514                }
515                None => {
516                    PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
517                    break Poll::Ready(None);
518                }
519                Some(Err(e)) => break Poll::Ready(Some(Err(e))),
520            }
521        };
522        self.metric.record_poll(poll)
523    }
524}
525
526impl RangeManipulateStream {
527    // Prometheus: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1113-L1198
528    // But they are not exactly the same, because we don't eager-evaluate on the data in this plan.
529    // And the generated timestamp is not aligned to the step. It's expected to do later.
530    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
531        let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
532        // calculate the range
533        let (ranges, (start, end)) = self.calculate_range(&input)?;
534        // ignore this if all ranges are empty
535        if ranges.iter().all(|(_, len)| *len == 0) {
536            return Ok(None);
537        }
538
539        // transform columns
540        let mut new_columns = input.columns().to_vec();
541        for index in self.field_columns.iter() {
542            let _ = other_columns.remove(index);
543            let column = input.column(*index);
544            let new_column = Arc::new(
545                RangeArray::from_ranges(column.clone(), ranges.clone())
546                    .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
547                    .into_dict(),
548            );
549            new_columns[*index] = new_column;
550        }
551
552        // push timestamp range column
553        let ts_range_column =
554            RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone())
555                .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
556                .into_dict();
557        new_columns.push(Arc::new(ts_range_column));
558
559        // truncate other columns
560        let take_indices = Int64Array::from(vec![0; ranges.len()]);
561        for index in other_columns.into_iter() {
562            new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
563        }
564        // replace timestamp with the aligned one
565        let new_time_index = if ranges.len() != self.aligned_ts_array.len() {
566            Self::build_aligned_ts_array(start, end, self.interval)
567        } else {
568            self.aligned_ts_array.clone()
569        };
570        new_columns[self.time_index] = new_time_index;
571
572        RecordBatch::try_new(self.output_schema.clone(), new_columns)
573            .map(Some)
574            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
575    }
576
577    fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef {
578        Arc::new(TimestampMillisecondArray::from_iter_values(
579            (start..=end).step_by(interval as _),
580        ))
581    }
582
583    /// Return values:
584    /// - A vector of tuples where each tuple contains the start index and length of the range.
585    /// - A tuple of the actual start/end timestamp used to calculate the range.
586    #[allow(clippy::type_complexity)]
587    fn calculate_range(
588        &self,
589        input: &RecordBatch,
590    ) -> DataFusionResult<(Vec<(u32, u32)>, (i64, i64))> {
591        let ts_column = input
592            .column(self.time_index)
593            .as_any()
594            .downcast_ref::<TimestampMillisecondArray>()
595            .ok_or_else(|| {
596                DataFusionError::Execution(
597                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
598                )
599            })?;
600
601        let len = ts_column.len();
602        if len == 0 {
603            return Ok((vec![], (self.start, self.end)));
604        }
605
606        // shorten the range to calculate
607        let first_ts = ts_column.value(0);
608        // Preserve the query's alignment pattern when optimizing start time
609        let remainder = (first_ts - self.start).rem_euclid(self.interval);
610        let first_ts_aligned = if remainder == 0 {
611            first_ts
612        } else {
613            first_ts + (self.interval - remainder)
614        };
615        let last_ts = ts_column.value(ts_column.len() - 1);
616        let last_ts_aligned = ((last_ts + self.range) / self.interval) * self.interval;
617        let start = self.start.max(first_ts_aligned);
618        let end = self.end.min(last_ts_aligned);
619        if start > end {
620            return Ok((vec![], (start, end)));
621        }
622        let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize);
623
624        // calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered.
625        let mut range_start_index = 0usize;
626        let mut last_range_start = 0;
627        let mut start_delta = 0;
628        for curr_ts in (start..=end).step_by(self.interval as _) {
629            // determine range start
630            let start_ts = curr_ts - self.range;
631
632            // advance cursor based on last range
633            let mut range_start = ts_column.len();
634            let mut range_end = 0;
635            let mut cursor = range_start_index + start_delta;
636            // search back to keep the result correct
637            while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 {
638                cursor -= 1;
639            }
640
641            while cursor < ts_column.len() {
642                let ts = ts_column.value(cursor);
643                if range_start > cursor && ts >= start_ts {
644                    range_start = cursor;
645                    range_start_index = range_start;
646                }
647                if ts <= curr_ts {
648                    range_end = range_end.max(cursor);
649                } else {
650                    range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
651                    break;
652                }
653                cursor += 1;
654            }
655            if range_start > range_end {
656                ranges.push((0, 0));
657                start_delta = 0;
658            } else {
659                ranges.push((range_start as _, (range_end + 1 - range_start) as _));
660                start_delta = range_start - last_range_start;
661                last_range_start = range_start;
662            }
663        }
664
665        Ok((ranges, (start, end)))
666    }
667}
668
669#[cfg(test)]
670mod test {
671    use datafusion::arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray};
672    use datafusion::arrow::datatypes::{
673        ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType,
674    };
675    use datafusion::common::ToDFSchema;
676    use datafusion::datasource::memory::MemorySourceConfig;
677    use datafusion::datasource::source::DataSourceExec;
678    use datafusion::physical_expr::Partitioning;
679    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
680    use datafusion::physical_plan::memory::MemoryStream;
681    use datafusion::prelude::SessionContext;
682    use datatypes::arrow::array::TimestampMillisecondArray;
683
684    use super::*;
685
686    const TIME_INDEX_COLUMN: &str = "timestamp";
687
688    fn prepare_test_data() -> DataSourceExec {
689        let schema = Arc::new(Schema::new(vec![
690            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
691            Field::new("value_1", DataType::Float64, true),
692            Field::new("value_2", DataType::Float64, true),
693            Field::new("path", DataType::Utf8, true),
694        ]));
695        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
696            0, 30_000, 60_000, 90_000, 120_000, // every 30s
697            180_000, 240_000, // every 60s
698            241_000, 271_000, 291_000, // others
699        ])) as _;
700        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
701        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
702        let data = RecordBatch::try_new(
703            schema.clone(),
704            vec![
705                timestamp_column,
706                field_column.clone(),
707                field_column,
708                path_column,
709            ],
710        )
711        .unwrap();
712
713        DataSourceExec::new(Arc::new(
714            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
715        ))
716    }
717
718    async fn do_normalize_test(
719        start: Millisecond,
720        end: Millisecond,
721        interval: Millisecond,
722        range: Millisecond,
723        expected: String,
724    ) {
725        let memory_exec = Arc::new(prepare_test_data());
726        let time_index = TIME_INDEX_COLUMN.to_string();
727        let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
728        let manipulate_output_schema = SchemaRef::new(
729            RangeManipulate::calculate_output_schema(
730                &memory_exec.schema().to_dfschema_ref().unwrap(),
731                &time_index,
732                &field_columns,
733            )
734            .unwrap()
735            .as_ref()
736            .into(),
737        );
738        let properties = PlanProperties::new(
739            EquivalenceProperties::new(manipulate_output_schema.clone()),
740            Partitioning::UnknownPartitioning(1),
741            EmissionType::Incremental,
742            Boundedness::Bounded,
743        );
744        let normalize_exec = Arc::new(RangeManipulateExec {
745            start,
746            end,
747            interval,
748            range,
749            field_columns,
750            output_schema: manipulate_output_schema,
751            time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
752            time_index_column: time_index,
753            input: memory_exec,
754            metric: ExecutionPlanMetricsSet::new(),
755            properties,
756        });
757        let session_context = SessionContext::default();
758        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
759            .await
760            .unwrap();
761        // DirectoryArray from RangeArray cannot be print as normal arrays.
762        let result_literal: String = result
763            .into_iter()
764            .filter_map(|batch| {
765                batch
766                    .columns()
767                    .iter()
768                    .map(|array| {
769                        if matches!(array.data_type(), &DataType::Dictionary(..)) {
770                            let dict_array = array
771                                .as_any()
772                                .downcast_ref::<DictionaryArray<Int64Type>>()
773                                .unwrap()
774                                .clone();
775                            format!("{:?}", RangeArray::try_new(dict_array).unwrap())
776                        } else {
777                            format!("{array:?}")
778                        }
779                    })
780                    .reduce(|lhs, rhs| lhs + "\n" + &rhs)
781            })
782            .reduce(|lhs, rhs| lhs + "\n\n" + &rhs)
783            .unwrap();
784
785        assert_eq!(result_literal, expected);
786    }
787
788    #[tokio::test]
789    async fn interval_30s_range_90s() {
790        let expected = String::from(
791            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  \
792                1970-01-01T00:00:00,\n  \
793                1970-01-01T00:00:30,\n  \
794                1970-01-01T00:01:00,\n  \
795                1970-01-01T00:01:30,\n  \
796                1970-01-01T00:02:00,\n  \
797                1970-01-01T00:02:30,\n  \
798                1970-01-01T00:03:00,\n  \
799                1970-01-01T00:03:30,\n  \
800                1970-01-01T00:04:00,\n  \
801                1970-01-01T00:04:30,\n  \
802                1970-01-01T00:05:00,\n\
803            ]\nRangeArray { \
804                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
805                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
806            }\nRangeArray { \
807                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
808                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
809            }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
810            RangeArray { \
811                base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
812                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
813            }",
814        );
815        do_normalize_test(0, 310_000, 30_000, 90_000, expected.clone()).await;
816
817        // dump large range
818        do_normalize_test(-300000, 310_000, 30_000, 90_000, expected).await;
819    }
820
821    #[tokio::test]
822    async fn small_empty_range() {
823        let expected = String::from(
824            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  \
825            1970-01-01T00:00:00.001,\n  \
826            1970-01-01T00:00:03.001,\n  \
827            1970-01-01T00:00:06.001,\n  \
828            1970-01-01T00:00:09.001,\n\
829        ]\nRangeArray { \
830            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
831            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
832        }\nRangeArray { \
833            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
834            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
835        }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
836        RangeArray { \
837            base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
838            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
839        }",
840        );
841        do_normalize_test(1, 10_001, 3_000, 1_000, expected).await;
842    }
843
844    #[test]
845    fn test_calculate_range_preserves_alignment() {
846        // Test case: query starts at timestamp ending in 4000, step is 30s
847        // Data starts at different alignment - should preserve query's 4000 pattern
848        let schema = Arc::new(Schema::new(vec![Field::new(
849            "timestamp",
850            TimestampMillisecondType::DATA_TYPE,
851            false,
852        )]));
853        let empty_stream = MemoryStream::try_new(vec![], schema.clone(), None).unwrap();
854
855        let stream = RangeManipulateStream {
856            start: 1758093274000, // ends in 4000
857            end: 1758093334000,   // ends in 4000
858            interval: 30000,      // 30s step
859            range: 60000,         // 60s lookback
860            time_index: 0,
861            field_columns: vec![],
862            aligned_ts_array: Arc::new(TimestampMillisecondArray::from(vec![0i64; 0])),
863            output_schema: schema.clone(),
864            input: Box::pin(empty_stream),
865            metric: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
866            num_series: Count::new(),
867        };
868
869        // Create test data with timestamps not aligned to query pattern
870        let test_timestamps = vec![
871            1758093260000, // ends in 0000 (different alignment)
872            1758093290000, // ends in 0000
873            1758093320000, // ends in 0000
874        ];
875        let ts_array = TimestampMillisecondArray::from(test_timestamps);
876        let test_schema = Arc::new(Schema::new(vec![Field::new(
877            "timestamp",
878            TimestampMillisecondType::DATA_TYPE,
879            false,
880        )]));
881        let batch = RecordBatch::try_new(test_schema, vec![Arc::new(ts_array)]).unwrap();
882
883        let (ranges, (start, end)) = stream.calculate_range(&batch).unwrap();
884
885        // Verify the optimized start preserves query alignment (should end in 4000)
886        assert_eq!(
887            start % 30000,
888            1758093274000 % 30000,
889            "Optimized start should preserve query alignment pattern"
890        );
891
892        // Verify we generate correct number of ranges for the alignment
893        let expected_timestamps: Vec<i64> = (start..=end).step_by(30000).collect();
894        assert_eq!(ranges.len(), expected_timestamps.len());
895
896        // Verify all generated timestamps maintain the same alignment pattern
897        for ts in expected_timestamps {
898            assert_eq!(
899                ts % 30000,
900                1758093274000 % 30000,
901                "All timestamps should maintain query alignment pattern"
902            );
903        }
904    }
905}