promql/extension_plan/range_manipulate.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use common_telemetry::{debug, warn};
use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
use datafusion::arrow::compute;
use datafusion::arrow::datatypes::{Field, SchemaRef};
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::stats::Precision;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream, Statistics,
};
use datafusion::sql::TableReference;
use datafusion_expr::col;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::{
    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
};
use crate::metrics::PROMQL_SERIES_COUNT;
use crate::range_array::RangeArray;

/// Time series manipulator for range functions.
///
/// This plan "folds" the time index and value columns into [RangeArray]s, and truncates
/// the other columns to the same length as the "folded" [RangeArray] columns.
///
/// To pass runtime information to the execution plan (or the range function), this plan
/// adds these extra columns:
/// - timestamp range with type [RangeArray], which is the folded timestamp column.
/// - end of current range with the same type as the timestamp column. (todo)
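///
/// An illustrative sketch (column names made up, not from any real schema): with
/// time index `ts`, field column `val`, `interval = 30s` and `range = 60s`, an
/// input of
///
/// ```text
/// ts:  [0, 30_000, 60_000]
/// val: [1.0, 2.0, 3.0]
/// ```
///
/// is folded into
///
/// ```text
/// ts:       [0, 30_000, 60_000]           (aligned timestamps)
/// val:      RangeArray [0..1, 0..2, 0..3]
/// ts_range: RangeArray [0..1, 0..2, 0..3]
/// ```
///
/// where the range for each step `t` covers the points in `[t - range, t]`.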
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    range: Millisecond,
    time_index: String,
    field_columns: Vec<String>,
    input: LogicalPlan,
    output_schema: DFSchemaRef,
    unfix: Option<UnfixIndices>,
}

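/// Column indices carried through serialization. They are resolved back to
/// column names in `with_exprs_and_inputs` once the real input plan is attached.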
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct UnfixIndices {
    pub time_index_idx: u64,
    pub tag_column_indices: Vec<u64>,
}

impl RangeManipulate {
    pub fn new(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        range: Millisecond,
        time_index: String,
        field_columns: Vec<String>,
        input: LogicalPlan,
    ) -> DataFusionResult<Self> {
        let output_schema =
            Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
        Ok(Self {
            start,
            end,
            interval,
            range,
            time_index,
            field_columns,
            input,
            output_schema,
            unfix: None,
        })
    }

    pub const fn name() -> &'static str {
        "RangeManipulate"
    }

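    /// Name of the folded timestamp range column, e.g. a `ts` time index
    /// produces `ts_range`.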
    pub fn build_timestamp_range_name(time_index: &str) -> String {
        format!("{time_index}_range")
    }

    pub fn internal_range_end_col_name() -> String {
        "__internal_range_end".to_string()
    }

    fn range_timestamp_name(&self) -> String {
        Self::build_timestamp_range_name(&self.time_index)
    }

    fn calculate_output_schema(
        input_schema: &DFSchemaRef,
        time_index: &str,
        field_columns: &[String],
    ) -> DataFusionResult<DFSchemaRef> {
        let columns = input_schema.fields();
        let mut new_columns = Vec::with_capacity(columns.len() + 1);
        for i in 0..columns.len() {
            let x = input_schema.qualified_field(i);
            new_columns.push((x.0.cloned(), Arc::new(x.1.clone())));
        }

        // process time index column
        // the raw timestamp field is preserved, and a new timestamp_range field is appended at the end
        let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index) else {
            return Err(datafusion::common::field_not_found(
                None::<TableReference>,
                time_index,
                input_schema.as_ref(),
            ));
        };
        let ts_col_field = &columns[ts_col_index];
        let timestamp_range_field = Field::new(
            Self::build_timestamp_range_name(time_index),
            RangeArray::convert_field(ts_col_field).data_type().clone(),
            ts_col_field.is_nullable(),
        );
        new_columns.push((None, Arc::new(timestamp_range_field)));

        // process value columns
        for name in field_columns {
            let Some(index) = input_schema.index_of_column_by_name(None, name) else {
                return Err(datafusion::common::field_not_found(
                    None::<TableReference>,
                    name,
                    input_schema.as_ref(),
                ));
            };
            new_columns[index] = (None, Arc::new(RangeArray::convert_field(&columns[index])));
        }

        Ok(Arc::new(DFSchema::new_with_metadata(
            new_columns,
            HashMap::new(),
        )?))
    }

    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        let output_schema: SchemaRef = self.output_schema.inner().clone();
        let properties = exec_input.properties();
        let properties = PlanProperties::new(
            EquivalenceProperties::new(output_schema.clone()),
            properties.partitioning.clone(),
            properties.emission_type,
            properties.boundedness,
        );
        Arc::new(RangeManipulateExec {
            start: self.start,
            end: self.end,
            interval: self.interval,
            range: self.range,
            time_index_column: self.time_index.clone(),
            time_range_column: self.range_timestamp_name(),
            field_columns: self.field_columns.clone(),
            input: exec_input,
            output_schema,
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        })
    }

    pub fn serialize(&self) -> Vec<u8> {
        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index);

        // Note: the `tag_column_indices` proto field carries the indices of the
        // field (value) columns here.
        let tag_column_indices = self
            .field_columns
            .iter()
            .map(|name| serialize_column_index(self.input.schema(), name))
            .collect::<Vec<u64>>();

        pb::RangeManipulate {
            start: self.start,
            end: self.end,
            interval: self.interval,
            range: self.range,
            time_index_idx,
            tag_column_indices,
            ..Default::default()
        }
        .encode_to_vec()
    }

    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
        let empty_schema = Arc::new(DFSchema::empty());
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: empty_schema.clone(),
        });

        let unfix = UnfixIndices {
            time_index_idx: pb_range_manipulate.time_index_idx,
            tag_column_indices: pb_range_manipulate.tag_column_indices.clone(),
        };
        debug!("RangeManipulate deserialize unfix: {:?}", unfix);

        // Unlike `Self::new()`, this method doesn't check the input schema, since doing
        // so would fail on the empty placeholder schema.
        // This is OK because DataFusion guarantees to call `with_exprs_and_inputs` on the
        // deserialized plan, which rebuilds the real schema.
        Ok(Self {
            start: pb_range_manipulate.start,
            end: pb_range_manipulate.end,
            interval: pb_range_manipulate.interval,
            range: pb_range_manipulate.range,
            time_index: String::new(),
            field_columns: Vec::new(),
            input: placeholder_plan,
            output_schema: empty_schema,
            unfix: Some(unfix),
        })
    }
}

impl PartialOrd for RangeManipulate {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        // Compare fields in order excluding output_schema
        match self.start.partial_cmp(&other.start) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.end.partial_cmp(&other.end) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.interval.partial_cmp(&other.interval) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.range.partial_cmp(&other.range) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.time_index.partial_cmp(&other.time_index) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.field_columns.partial_cmp(&other.field_columns) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        self.input.partial_cmp(&other.input)
    }
}

impl UserDefinedLogicalNodeCore for RangeManipulate {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.output_schema
    }

    fn expressions(&self) -> Vec<Expr> {
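        // Right after deserialization (`unfix` is set) the column names are still
        // unresolved index placeholders, so there are no expressions to report.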
        if self.unfix.is_some() {
            return vec![];
        }

        let mut exprs = Vec::with_capacity(1 + self.field_columns.len());
        exprs.push(col(&self.time_index));
        exprs.extend(self.field_columns.iter().map(col));
        exprs
    }

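    /// Computes the child columns required for the given output columns. The time
    /// index and all field columns are always required on top of whatever the
    /// parent asks for: e.g. with input columns `[ts, value_1, value_2, path]`, a
    /// parent requesting only `path` (index 3) still gets `[[0, 1, 2, 3]]` back.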
    fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
        if self.unfix.is_some() {
            return None;
        }

        let input_schema = self.input.schema();
        let input_len = input_schema.fields().len();
        let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index)?;

        if output_columns.is_empty() {
            let indices = (0..input_len).collect::<Vec<_>>();
            return Some(vec![indices]);
        }

        let mut required = Vec::with_capacity(output_columns.len() + 1 + self.field_columns.len());
        required.push(time_index_idx);
        for value_column in &self.field_columns {
            required.push(input_schema.index_of_column_by_name(None, value_column)?);
        }
        for &idx in output_columns {
            if idx < input_len {
                required.push(idx);
            } else if idx == input_len {
                // Derived timestamp range column.
                required.push(time_index_idx);
            } else {
                warn!(
                    "Output column index {} is out of bounds for input schema with length {}",
                    idx, input_len
                );
                return None;
            }
        }

        required.sort_unstable();
        required.dedup();
        Some(vec![required])
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
            self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
        )
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        mut inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.len() != 1 {
            return Err(DataFusionError::Internal(
                "RangeManipulate should have exactly one input".to_string(),
            ));
        }

        let input: LogicalPlan = inputs.pop().unwrap();
        let input_schema = input.schema();

        if let Some(unfix) = &self.unfix {
            // transform indices to names
            let time_index = resolve_column_name(
                unfix.time_index_idx,
                input_schema,
                "RangeManipulate",
                "time index",
            )?;

            let field_columns = unfix
                .tag_column_indices
                .iter()
                .map(|idx| resolve_column_name(*idx, input_schema, "RangeManipulate", "tag"))
                .collect::<DataFusionResult<Vec<String>>>()?;

            let output_schema =
                Self::calculate_output_schema(input_schema, &time_index, &field_columns)?;

            Ok(Self {
                start: self.start,
                end: self.end,
                interval: self.interval,
                range: self.range,
                time_index,
                field_columns,
                input,
                output_schema,
                unfix: None,
            })
        } else {
            let output_schema =
                Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;

            Ok(Self {
                start: self.start,
                end: self.end,
                interval: self.interval,
                range: self.range,
                time_index: self.time_index.clone(),
                field_columns: self.field_columns.clone(),
                input,
                output_schema,
                unfix: None,
            })
        }
    }
}

#[derive(Debug)]
pub struct RangeManipulateExec {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    range: Millisecond,
    time_index_column: String,
    time_range_column: String,
    field_columns: Vec<String>,

    input: Arc<dyn ExecutionPlan>,
    output_schema: SchemaRef,
    metric: ExecutionPlanMetricsSet,
    properties: PlanProperties,
}

impl ExecutionPlan for RangeManipulateExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true; self.children().len()]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        let input_requirement = self.input.required_input_distribution();
        if input_requirement.is_empty() {
            // If the input is EmptyMetric, its required_input_distribution() is empty,
            // so fall back to an unspecified distribution.
            vec![Distribution::UnspecifiedDistribution]
        } else {
            input_requirement
        }
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        let exec_input = children[0].clone();
        let properties = exec_input.properties();
        let properties = PlanProperties::new(
            EquivalenceProperties::new(self.output_schema.clone()),
            properties.partitioning.clone(),
            properties.emission_type,
            properties.boundedness,
        );
        Ok(Arc::new(Self {
            start: self.start,
            end: self.end,
            interval: self.interval,
            range: self.range,
            time_index_column: self.time_index_column.clone(),
            time_range_column: self.time_range_column.clone(),
            field_columns: self.field_columns.clone(),
            output_schema: self.output_schema.clone(),
            input: children[0].clone(),
            metric: self.metric.clone(),
            properties,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let metrics_builder = MetricBuilder::new(&self.metric);
        let num_series = Count::new();
        metrics_builder
            .with_partition(partition)
            .build(MetricValue::Count {
                name: METRIC_NUM_SERIES.into(),
                count: num_series.clone(),
            });

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let time_index = schema
            .column_with_name(&self.time_index_column)
            .unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
            .0;
        let field_columns = self
            .field_columns
            .iter()
            .map(|value_col| {
                schema
                    .column_with_name(value_col)
                    .unwrap_or_else(|| panic!("value column {value_col} not found"))
                    .0
            })
            .collect();
        let aligned_ts_array =
            RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval);
        Ok(Box::pin(RangeManipulateStream {
            start: self.start,
            end: self.end,
            interval: self.interval,
            range: self.range,
            time_index,
            field_columns,
            aligned_ts_array,
            output_schema: self.output_schema.clone(),
            input,
            metric: baseline_metric,
            num_series,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
        let input_stats = self.input.partition_statistics(partition)?;

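        // One output row is emitted per step, so the row count is roughly
        // (end - start) / interval. With illustrative numbers: start=0,
        // end=310_000 and interval=30_000 estimate ~10.3 rows. Everything is
        // reported as `Precision::Inexact`.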
        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
        let estimated_total_bytes = input_stats
            .total_byte_size
            .get_value()
            .zip(input_stats.num_rows.get_value())
            .map(|(size, rows)| {
                Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
            })
            .unwrap_or_default();

        Ok(Statistics {
            num_rows: Precision::Inexact(estimated_row_num as _),
            total_byte_size: estimated_total_bytes,
            // TODO(ruihang): support this column statistics
            column_statistics: Statistics::unknown_column(&self.schema()),
        })
    }

    fn name(&self) -> &str {
        "RangeManipulateExec"
    }
}

impl DisplayAs for RangeManipulateExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
                    self.start, self.end, self.interval, self.range, self.time_index_column
                )
            }
        }
    }
}

pub struct RangeManipulateStream {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    range: Millisecond,
    time_index: usize,
    field_columns: Vec<usize>,
    aligned_ts_array: ArrayRef,

    output_schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    /// Number of series processed.
    num_series: Count,
}

impl RecordBatchStream for RangeManipulateStream {
    fn schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }
}

impl Stream for RangeManipulateStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let poll = loop {
            match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    let timer = std::time::Instant::now();
                    let result = self.manipulate(batch);
                    if let Ok(None) = result {
                        self.metric.elapsed_compute().add_elapsed(timer);
                        continue;
                    } else {
                        self.num_series.add(1);
                        self.metric.elapsed_compute().add_elapsed(timer);
                        break Poll::Ready(result.transpose());
                    }
                }
                None => {
                    PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                    break Poll::Ready(None);
                }
                Some(Err(e)) => break Poll::Ready(Some(Err(e))),
            }
        };
        self.metric.record_poll(poll)
    }
}

impl RangeManipulateStream {
    // Prometheus: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1113-L1198
    // But they are not exactly the same: we don't eagerly evaluate the data in this
    // plan, and the generated timestamps are not yet aligned to the step. That is
    // expected to happen later.
    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
        let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
        // calculate the range
        let (ranges, (start, end)) = self.calculate_range(&input)?;
        // ignore this if all ranges are empty
        if ranges.iter().all(|(_, len)| *len == 0) {
            return Ok(None);
        }

        // transform columns
        let mut new_columns = input.columns().to_vec();
        for index in self.field_columns.iter() {
            let _ = other_columns.remove(index);
            let column = input.column(*index);
            let new_column = Arc::new(
                RangeArray::from_ranges(column.clone(), ranges.clone())
                    .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
                    .into_dict(),
            );
            new_columns[*index] = new_column;
        }

        // push timestamp range column
        let ts_range_column =
            RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone())
                .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
                .into_dict();
        new_columns.push(Arc::new(ts_range_column));

        // truncate other columns
        let take_indices = Int64Array::from(vec![0; ranges.len()]);
        for index in other_columns.into_iter() {
            new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
        }
        // replace timestamp with the aligned one
        let new_time_index = if ranges.len() != self.aligned_ts_array.len() {
            Self::build_aligned_ts_array(start, end, self.interval)
        } else {
            self.aligned_ts_array.clone()
        };
        new_columns[self.time_index] = new_time_index;

        RecordBatch::try_new(self.output_schema.clone(), new_columns)
            .map(Some)
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
    }

    fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef {
        Arc::new(TimestampMillisecondArray::from_iter_values(
            (start..=end).step_by(interval as _),
        ))
    }

    /// Return values:
    /// - A vector of tuples where each tuple contains the start index and length of the range.
    /// - A tuple of the actual start/end timestamp used to calculate the range.
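    ///
    /// A worked example (illustrative numbers): with `start = 0`, `end = 90_000`,
    /// `interval = 30_000`, `range = 60_000` and a timestamp column
    /// `[0, 30_000, 60_000, 90_000]`, each step `t` selects the points within the
    /// closed interval `[t - range, t]`, yielding the `(start index, length)`
    /// pairs `[(0, 1), (0, 2), (0, 3), (1, 3)]` and the bounds `(0, 90_000)`.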
    #[allow(clippy::type_complexity)]
    fn calculate_range(
        &self,
        input: &RecordBatch,
    ) -> DataFusionResult<(Vec<(u32, u32)>, (i64, i64))> {
        let ts_column = input
            .column(self.time_index)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "time index column downcast to TimestampMillisecondArray failed".into(),
                )
            })?;

        let len = ts_column.len();
        if len == 0 {
            return Ok((vec![], (self.start, self.end)));
        }

        // shorten the range to calculate
        let first_ts = ts_column.value(0);
        // Preserve the query's alignment pattern when optimizing start time
        let remainder = (first_ts - self.start).rem_euclid(self.interval);
        let first_ts_aligned = if remainder == 0 {
            first_ts
        } else {
            first_ts + (self.interval - remainder)
        };
        let last_ts = ts_column.value(ts_column.len() - 1);
        let last_ts_aligned = ((last_ts + self.range) / self.interval) * self.interval;
        let start = self.start.max(first_ts_aligned);
        let end = self.end.min(last_ts_aligned);
        if start > end {
            return Ok((vec![], (start, end)));
        }
        let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize);

        // calculate for every aligned timestamp (`curr_ts`), assuming the ts column is ordered
        let mut range_start_index = 0usize;
        let mut last_range_start = 0;
        let mut start_delta = 0;
        for curr_ts in (start..=end).step_by(self.interval as _) {
            // determine range start
            let start_ts = curr_ts - self.range;

            // advance cursor based on last range
            let mut range_start = ts_column.len();
            let mut range_end = 0;
            let mut cursor = range_start_index + start_delta;
            // search back to keep the result correct
            while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 {
                cursor -= 1;
            }

            while cursor < ts_column.len() {
                let ts = ts_column.value(cursor);
                if range_start > cursor && ts >= start_ts {
                    range_start = cursor;
                    range_start_index = range_start;
                }
                if ts <= curr_ts {
                    range_end = range_end.max(cursor);
                } else {
                    range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
                    break;
                }
                cursor += 1;
            }
            if range_start > range_end {
                ranges.push((0, 0));
                start_delta = 0;
            } else {
                ranges.push((range_start as _, (range_end + 1 - range_start) as _));
                start_delta = range_start - last_range_start;
                last_range_start = range_start;
            }
        }

        Ok((ranges, (start, end)))
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray};
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType,
    };
    use datafusion::common::ToDFSchema;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
    use datafusion::physical_expr::Partitioning;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::physical_plan::memory::MemoryStream;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;
    use futures::FutureExt;

    use super::*;

    const TIME_INDEX_COLUMN: &str = "timestamp";

    fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch {
        let fields = indices
            .iter()
            .map(|&idx| batch.schema().field(idx).clone())
            .collect::<Vec<_>>();
        let columns = indices
            .iter()
            .map(|&idx| batch.column(idx).clone())
            .collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, columns).unwrap()
    }

    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value_1", DataType::Float64, true),
            Field::new("value_2", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            0, 30_000, 60_000, 90_000, 120_000, // every 30s
            180_000, 240_000, // every 60s
            241_000, 271_000, 291_000, // others
        ])) as _;
        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![
                timestamp_column,
                field_column.clone(),
                field_column,
                path_column,
            ],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }

    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        range: Millisecond,
        expected: String,
    ) {
        let memory_exec = Arc::new(prepare_test_data());
        let time_index = TIME_INDEX_COLUMN.to_string();
        let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
        let manipulate_output_schema = SchemaRef::new(
            RangeManipulate::calculate_output_schema(
                &memory_exec.schema().to_dfschema_ref().unwrap(),
                &time_index,
                &field_columns,
            )
            .unwrap()
            .as_arrow()
            .clone(),
        );
        let properties = PlanProperties::new(
            EquivalenceProperties::new(manipulate_output_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let normalize_exec = Arc::new(RangeManipulateExec {
            start,
            end,
            interval,
            range,
            field_columns,
            output_schema: manipulate_output_schema,
            time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
            time_index_column: time_index,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        // DictionaryArray produced from RangeArray cannot be printed like normal arrays.
        let result_literal: String = result
            .into_iter()
            .filter_map(|batch| {
                batch
                    .columns()
                    .iter()
                    .map(|array| {
                        if matches!(array.data_type(), &DataType::Dictionary(..)) {
                            let dict_array = array
                                .as_any()
                                .downcast_ref::<DictionaryArray<Int64Type>>()
                                .unwrap()
                                .clone();
                            format!("{:?}", RangeArray::try_new(dict_array).unwrap())
                        } else {
                            format!("{array:?}")
                        }
                    })
                    .reduce(|lhs, rhs| lhs + "\n" + &rhs)
            })
            .reduce(|lhs, rhs| lhs + "\n\n" + &rhs)
            .unwrap();

        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn pruning_should_keep_time_and_value_columns_for_exec() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value_1", DataType::Float64, true),
            Field::new("value_2", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let df_schema = schema.clone().to_dfschema_ref().unwrap();
        let input = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: df_schema,
        });
        let plan = RangeManipulate::new(
            0,
            310_000,
            30_000,
            90_000,
            TIME_INDEX_COLUMN.to_string(),
            vec!["value_1".to_string(), "value_2".to_string()],
            input,
        )
        .unwrap();

        // Simulate a parent projection requesting only the `path` column.
        let output_columns = [3usize];
        let required = plan.necessary_children_exprs(&output_columns).unwrap();
        let required = &required[0];
        assert_eq!(required.as_slice(), &[0, 1, 2, 3]);

        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            0, 30_000, 60_000, 90_000, 120_000, // every 30s
            180_000, 240_000, // every 60s
            241_000, 271_000, 291_000, // others
        ])) as _;
        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
        let input_batch = RecordBatch::try_new(
            schema,
            vec![
                timestamp_column,
                field_column.clone(),
                field_column,
                path_column,
            ],
        )
        .unwrap();

        let projected = project_batch(&input_batch, required);
        let projected_schema = projected.schema();
        let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![projected]], projected_schema, None).unwrap(),
        )));
        let range_exec = plan.to_execution_plan(memory_exec);
        let session_context = SessionContext::default();
        let output_batches =
            datafusion::physical_plan::collect(range_exec, session_context.task_ctx())
                .await
                .unwrap();
        assert_eq!(output_batches.len(), 1);

        let output_batch = &output_batches[0];
        let path = output_batch
            .column(3)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(path.iter().all(|v| v == Some("foo")));

        // Simulate the pre-fix pruning behavior: omit the timestamp/value columns from the child.
        let broken_required = [3usize];
        let broken = project_batch(&input_batch, &broken_required);
        let broken_schema = broken.schema();
        let broken_exec = Arc::new(DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![broken]], broken_schema, None).unwrap(),
        )));
        let broken_range_exec = plan.to_execution_plan(broken_exec);
        let session_context = SessionContext::default();
        let broken_result = std::panic::AssertUnwindSafe(async {
            datafusion::physical_plan::collect(broken_range_exec, session_context.task_ctx()).await
        })
        .catch_unwind()
        .await;
        assert!(broken_result.is_err());
    }

    #[tokio::test]
    async fn interval_30s_range_90s() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(ms)>\n[\n  \
                1970-01-01T00:00:00,\n  \
                1970-01-01T00:00:30,\n  \
                1970-01-01T00:01:00,\n  \
                1970-01-01T00:01:30,\n  \
                1970-01-01T00:02:00,\n  \
                1970-01-01T00:02:30,\n  \
                1970-01-01T00:03:00,\n  \
                1970-01-01T00:03:30,\n  \
                1970-01-01T00:04:00,\n  \
                1970-01-01T00:04:30,\n  \
                1970-01-01T00:05:00,\n\
            ]\nRangeArray { \
                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nRangeArray { \
                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
            RangeArray { \
                base array: PrimitiveArray<Timestamp(ms)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }",
        );
        do_normalize_test(0, 310_000, 30_000, 90_000, expected.clone()).await;

        // an overly large requested range should yield the same result
        do_normalize_test(-300_000, 310_000, 30_000, 90_000, expected).await;
    }

    #[tokio::test]
    async fn small_empty_range() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(ms)>\n[\n  \
            1970-01-01T00:00:00.001,\n  \
            1970-01-01T00:00:03.001,\n  \
            1970-01-01T00:00:06.001,\n  \
            1970-01-01T00:00:09.001,\n\
        ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
        }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
        }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
        RangeArray { \
            base array: PrimitiveArray<Timestamp(ms)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
        }",
        );
        do_normalize_test(1, 10_001, 3_000, 1_000, expected).await;
    }

    #[test]
    fn test_calculate_range_preserves_alignment() {
        // Test case: query starts at timestamp ending in 4000, step is 30s
        // Data starts at different alignment - should preserve query's 4000 pattern
        let schema = Arc::new(Schema::new(vec![Field::new(
            "timestamp",
            TimestampMillisecondType::DATA_TYPE,
            false,
        )]));
        let empty_stream = MemoryStream::try_new(vec![], schema.clone(), None).unwrap();

        let stream = RangeManipulateStream {
            start: 1758093274000, // ends in 4000
            end: 1758093334000,   // ends in 4000
            interval: 30000,      // 30s step
            range: 60000,         // 60s lookback
            time_index: 0,
            field_columns: vec![],
            aligned_ts_array: Arc::new(TimestampMillisecondArray::from(vec![0i64; 0])),
            output_schema: schema.clone(),
            input: Box::pin(empty_stream),
            metric: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
            num_series: Count::new(),
        };

        // Create test data with timestamps not aligned to query pattern
        let test_timestamps = vec![
            1758093260000, // ends in 0000 (different alignment)
            1758093290000, // ends in 0000
            1758093320000, // ends in 0000
        ];
        let ts_array = TimestampMillisecondArray::from(test_timestamps);
        let test_schema = Arc::new(Schema::new(vec![Field::new(
            "timestamp",
            TimestampMillisecondType::DATA_TYPE,
            false,
        )]));
        let batch = RecordBatch::try_new(test_schema, vec![Arc::new(ts_array)]).unwrap();

        let (ranges, (start, end)) = stream.calculate_range(&batch).unwrap();

        // Verify the optimized start preserves query alignment (should end in 4000)
        assert_eq!(
            start % 30000,
            1758093274000 % 30000,
            "Optimized start should preserve query alignment pattern"
        );

        // Verify we generate correct number of ranges for the alignment
        let expected_timestamps: Vec<i64> = (start..=end).step_by(30000).collect();
        assert_eq!(ranges.len(), expected_timestamps.len());

        // Verify all generated timestamps maintain the same alignment pattern
        for ts in expected_timestamps {
            assert_eq!(
                ts % 30000,
                1758093274000 % 30000,
                "All timestamps should maintain query alignment pattern"
            );
        }
    }
}