promql/extension_plan/
range_manipulate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use common_telemetry::debug;
22use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
23use datafusion::arrow::compute;
24use datafusion::arrow::datatypes::{Field, SchemaRef};
25use datafusion::arrow::error::ArrowError;
26use datafusion::arrow::record_batch::RecordBatch;
27use datafusion::common::stats::Precision;
28use datafusion::common::{DFSchema, DFSchemaRef};
29use datafusion::error::{DataFusionError, Result as DataFusionResult};
30use datafusion::execution::context::TaskContext;
31use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
32use datafusion::physical_expr::EquivalenceProperties;
33use datafusion::physical_plan::metrics::{
34    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
35};
36use datafusion::physical_plan::{
37    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
38    SendableRecordBatchStream, Statistics,
39};
40use datafusion::sql::TableReference;
41use futures::{Stream, StreamExt, ready};
42use greptime_proto::substrait_extension as pb;
43use prost::Message;
44use snafu::ResultExt;
45
46use crate::error::{DeserializeSnafu, Result};
47use crate::extension_plan::{
48    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
49};
50use crate::metrics::PROMQL_SERIES_COUNT;
51use crate::range_array::RangeArray;
52
/// Time series manipulator for range function.
///
/// This plan will "fold" time index and value columns into [RangeArray]s, and truncate
/// other columns to the same length as the "folded" [RangeArray] column.
///
/// To pass runtime information to the execution plan (or the range function), this plan
/// will add those extra columns:
/// - timestamp range with type [RangeArray], which is the folded timestamp column.
/// - end of current range with the same type as the timestamp column. (todo)
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
    /// Start of the requested range, in milliseconds.
    start: Millisecond,
    /// End of the requested range (inclusive), in milliseconds.
    end: Millisecond,
    /// Distance between two consecutive aligned timestamps, in milliseconds.
    interval: Millisecond,
    /// Length of the evaluation window that ends at each aligned timestamp, in milliseconds.
    range: Millisecond,
    /// Name of the time index column in the input plan.
    time_index: String,
    /// Names of the value columns that are folded into [RangeArray]s.
    field_columns: Vec<String>,
    /// The child plan this node reads from.
    input: LogicalPlan,
    /// Schema produced by this node, derived by `calculate_output_schema`.
    output_schema: DFSchemaRef,
    /// Column indices carried over from `deserialize` that still have to be resolved to
    /// names in `with_exprs_and_inputs`. `None` for plans built through `new`.
    unfix: Option<UnfixIndices>,
}
74
/// Column positions decoded from a serialized [RangeManipulate] that have not yet been
/// translated back to column names.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct UnfixIndices {
    /// Position of the time index column in the input schema.
    pub time_index_idx: u64,
    /// Positions of the columns listed in `field_columns`; the name follows the protobuf
    /// field `tag_column_indices` that transports them.
    pub tag_column_indices: Vec<u64>,
}
80
81impl RangeManipulate {
82    pub fn new(
83        start: Millisecond,
84        end: Millisecond,
85        interval: Millisecond,
86        range: Millisecond,
87        time_index: String,
88        field_columns: Vec<String>,
89        input: LogicalPlan,
90    ) -> DataFusionResult<Self> {
91        let output_schema =
92            Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
93        Ok(Self {
94            start,
95            end,
96            interval,
97            range,
98            time_index,
99            field_columns,
100            input,
101            output_schema,
102            unfix: None,
103        })
104    }
105
106    pub const fn name() -> &'static str {
107        "RangeManipulate"
108    }
109
110    pub fn build_timestamp_range_name(time_index: &str) -> String {
111        format!("{time_index}_range")
112    }
113
114    pub fn internal_range_end_col_name() -> String {
115        "__internal_range_end".to_string()
116    }
117
118    fn range_timestamp_name(&self) -> String {
119        Self::build_timestamp_range_name(&self.time_index)
120    }
121
122    fn calculate_output_schema(
123        input_schema: &DFSchemaRef,
124        time_index: &str,
125        field_columns: &[String],
126    ) -> DataFusionResult<DFSchemaRef> {
127        let columns = input_schema.fields();
128        let mut new_columns = Vec::with_capacity(columns.len() + 1);
129        for i in 0..columns.len() {
130            let x = input_schema.qualified_field(i);
131            new_columns.push((x.0.cloned(), Arc::new(x.1.clone())));
132        }
133
134        // process time index column
135        // the raw timestamp field is preserved. And a new timestamp_range field is appended to the last.
136        let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index) else {
137            return Err(datafusion::common::field_not_found(
138                None::<TableReference>,
139                time_index,
140                input_schema.as_ref(),
141            ));
142        };
143        let ts_col_field = &columns[ts_col_index];
144        let timestamp_range_field = Field::new(
145            Self::build_timestamp_range_name(time_index),
146            RangeArray::convert_field(ts_col_field).data_type().clone(),
147            ts_col_field.is_nullable(),
148        );
149        new_columns.push((None, Arc::new(timestamp_range_field)));
150
151        // process value columns
152        for name in field_columns {
153            let Some(index) = input_schema.index_of_column_by_name(None, name) else {
154                return Err(datafusion::common::field_not_found(
155                    None::<TableReference>,
156                    name,
157                    input_schema.as_ref(),
158                ));
159            };
160            new_columns[index] = (None, Arc::new(RangeArray::convert_field(&columns[index])));
161        }
162
163        Ok(Arc::new(DFSchema::new_with_metadata(
164            new_columns,
165            HashMap::new(),
166        )?))
167    }
168
169    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
170        let output_schema: SchemaRef = self.output_schema.inner().clone();
171        let properties = exec_input.properties();
172        let properties = PlanProperties::new(
173            EquivalenceProperties::new(output_schema.clone()),
174            properties.partitioning.clone(),
175            properties.emission_type,
176            properties.boundedness,
177        );
178        Arc::new(RangeManipulateExec {
179            start: self.start,
180            end: self.end,
181            interval: self.interval,
182            range: self.range,
183            time_index_column: self.time_index.clone(),
184            time_range_column: self.range_timestamp_name(),
185            field_columns: self.field_columns.clone(),
186            input: exec_input,
187            output_schema,
188            metric: ExecutionPlanMetricsSet::new(),
189            properties,
190        })
191    }
192
193    pub fn serialize(&self) -> Vec<u8> {
194        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index);
195
196        let tag_column_indices = self
197            .field_columns
198            .iter()
199            .map(|name| serialize_column_index(self.input.schema(), name))
200            .collect::<Vec<u64>>();
201
202        pb::RangeManipulate {
203            start: self.start,
204            end: self.end,
205            interval: self.interval,
206            range: self.range,
207            time_index_idx,
208            tag_column_indices,
209            ..Default::default()
210        }
211        .encode_to_vec()
212    }
213
214    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
215        let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
216        let empty_schema = Arc::new(DFSchema::empty());
217        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
218            produce_one_row: false,
219            schema: empty_schema.clone(),
220        });
221
222        let unfix = UnfixIndices {
223            time_index_idx: pb_range_manipulate.time_index_idx,
224            tag_column_indices: pb_range_manipulate.tag_column_indices.clone(),
225        };
226        debug!("RangeManipulate deserialize unfix: {:?}", unfix);
227
228        // Unlike `Self::new()`, this method doesn't check the input schema as it will fail
229        // because the input schema is empty.
230        // But this is Ok since datafusion guarantees to call `with_exprs_and_inputs` for the
231        // deserialized plan.
232        Ok(Self {
233            start: pb_range_manipulate.start,
234            end: pb_range_manipulate.end,
235            interval: pb_range_manipulate.interval,
236            range: pb_range_manipulate.range,
237            time_index: String::new(),
238            field_columns: Vec::new(),
239            input: placeholder_plan,
240            output_schema: empty_schema,
241            unfix: Some(unfix),
242        })
243    }
244}
245
246impl PartialOrd for RangeManipulate {
247    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
248        // Compare fields in order excluding output_schema
249        match self.start.partial_cmp(&other.start) {
250            Some(core::cmp::Ordering::Equal) => {}
251            ord => return ord,
252        }
253        match self.end.partial_cmp(&other.end) {
254            Some(core::cmp::Ordering::Equal) => {}
255            ord => return ord,
256        }
257        match self.interval.partial_cmp(&other.interval) {
258            Some(core::cmp::Ordering::Equal) => {}
259            ord => return ord,
260        }
261        match self.range.partial_cmp(&other.range) {
262            Some(core::cmp::Ordering::Equal) => {}
263            ord => return ord,
264        }
265        match self.time_index.partial_cmp(&other.time_index) {
266            Some(core::cmp::Ordering::Equal) => {}
267            ord => return ord,
268        }
269        match self.field_columns.partial_cmp(&other.field_columns) {
270            Some(core::cmp::Ordering::Equal) => {}
271            ord => return ord,
272        }
273        self.input.partial_cmp(&other.input)
274    }
275}
276
277impl UserDefinedLogicalNodeCore for RangeManipulate {
278    fn name(&self) -> &str {
279        Self::name()
280    }
281
282    fn inputs(&self) -> Vec<&LogicalPlan> {
283        vec![&self.input]
284    }
285
286    fn schema(&self) -> &DFSchemaRef {
287        &self.output_schema
288    }
289
290    fn expressions(&self) -> Vec<Expr> {
291        vec![]
292    }
293
294    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
295        write!(
296            f,
297            "PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
298            self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
299        )
300    }
301
302    fn with_exprs_and_inputs(
303        &self,
304        _exprs: Vec<Expr>,
305        mut inputs: Vec<LogicalPlan>,
306    ) -> DataFusionResult<Self> {
307        if inputs.len() != 1 {
308            return Err(DataFusionError::Internal(
309                "RangeManipulate should have at exact one input".to_string(),
310            ));
311        }
312
313        let input: LogicalPlan = inputs.pop().unwrap();
314        let input_schema = input.schema();
315
316        if let Some(unfix) = &self.unfix {
317            // transform indices to names
318            let time_index = resolve_column_name(
319                unfix.time_index_idx,
320                input_schema,
321                "RangeManipulate",
322                "time index",
323            )?;
324
325            let field_columns = unfix
326                .tag_column_indices
327                .iter()
328                .map(|idx| resolve_column_name(*idx, input_schema, "RangeManipulate", "tag"))
329                .collect::<DataFusionResult<Vec<String>>>()?;
330
331            let output_schema =
332                Self::calculate_output_schema(input_schema, &time_index, &field_columns)?;
333
334            Ok(Self {
335                start: self.start,
336                end: self.end,
337                interval: self.interval,
338                range: self.range,
339                time_index,
340                field_columns,
341                input,
342                output_schema,
343                unfix: None,
344            })
345        } else {
346            let output_schema =
347                Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
348
349            Ok(Self {
350                start: self.start,
351                end: self.end,
352                interval: self.interval,
353                range: self.range,
354                time_index: self.time_index.clone(),
355                field_columns: self.field_columns.clone(),
356                input,
357                output_schema,
358                unfix: None,
359            })
360        }
361    }
362}
363
/// Physical plan that folds the time index and value columns of its input into
/// [RangeArray]s, one range per aligned timestamp.
#[derive(Debug)]
pub struct RangeManipulateExec {
    /// Start of the requested range, in milliseconds.
    start: Millisecond,
    /// End of the requested range (inclusive), in milliseconds.
    end: Millisecond,
    /// Distance between two consecutive aligned timestamps, in milliseconds.
    interval: Millisecond,
    /// Length of the evaluation window that ends at each aligned timestamp, in milliseconds.
    range: Millisecond,
    /// Name of the time index column in the input.
    time_index_column: String,
    /// Name of the appended timestamp range column.
    time_range_column: String,
    /// Names of the value columns to fold.
    field_columns: Vec<String>,

    /// The child plan that supplies the record batches.
    input: Arc<dyn ExecutionPlan>,
    /// Schema of the batches produced by this plan.
    output_schema: SchemaRef,
    /// Metrics collected while executing this plan.
    metric: ExecutionPlanMetricsSet,
    /// Cached plan properties; partitioning, emission type and boundedness are copied
    /// from the input.
    properties: PlanProperties,
}
379
380impl ExecutionPlan for RangeManipulateExec {
381    fn as_any(&self) -> &dyn Any {
382        self
383    }
384
385    fn schema(&self) -> SchemaRef {
386        self.output_schema.clone()
387    }
388
389    fn properties(&self) -> &PlanProperties {
390        &self.properties
391    }
392
393    fn maintains_input_order(&self) -> Vec<bool> {
394        vec![true; self.children().len()]
395    }
396
397    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
398        vec![&self.input]
399    }
400
401    fn required_input_distribution(&self) -> Vec<Distribution> {
402        let input_requirement = self.input.required_input_distribution();
403        if input_requirement.is_empty() {
404            // if the input is EmptyMetric, its required_input_distribution() is empty so we can't
405            // use its input distribution.
406            vec![Distribution::UnspecifiedDistribution]
407        } else {
408            input_requirement
409        }
410    }
411
412    fn with_new_children(
413        self: Arc<Self>,
414        children: Vec<Arc<dyn ExecutionPlan>>,
415    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
416        assert!(!children.is_empty());
417        let exec_input = children[0].clone();
418        let properties = exec_input.properties();
419        let properties = PlanProperties::new(
420            EquivalenceProperties::new(self.output_schema.clone()),
421            properties.partitioning.clone(),
422            properties.emission_type,
423            properties.boundedness,
424        );
425        Ok(Arc::new(Self {
426            start: self.start,
427            end: self.end,
428            interval: self.interval,
429            range: self.range,
430            time_index_column: self.time_index_column.clone(),
431            time_range_column: self.time_range_column.clone(),
432            field_columns: self.field_columns.clone(),
433            output_schema: self.output_schema.clone(),
434            input: children[0].clone(),
435            metric: self.metric.clone(),
436            properties,
437        }))
438    }
439
440    fn execute(
441        &self,
442        partition: usize,
443        context: Arc<TaskContext>,
444    ) -> DataFusionResult<SendableRecordBatchStream> {
445        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
446        let metrics_builder = MetricBuilder::new(&self.metric);
447        let num_series = Count::new();
448        metrics_builder
449            .with_partition(partition)
450            .build(MetricValue::Count {
451                name: METRIC_NUM_SERIES.into(),
452                count: num_series.clone(),
453            });
454
455        let input = self.input.execute(partition, context)?;
456        let schema = input.schema();
457        let time_index = schema
458            .column_with_name(&self.time_index_column)
459            .unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
460            .0;
461        let field_columns = self
462            .field_columns
463            .iter()
464            .map(|value_col| {
465                schema
466                    .column_with_name(value_col)
467                    .unwrap_or_else(|| panic!("value column {value_col} not found",))
468                    .0
469            })
470            .collect();
471        let aligned_ts_array =
472            RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval);
473        Ok(Box::pin(RangeManipulateStream {
474            start: self.start,
475            end: self.end,
476            interval: self.interval,
477            range: self.range,
478            time_index,
479            field_columns,
480            aligned_ts_array,
481            output_schema: self.output_schema.clone(),
482            input,
483            metric: baseline_metric,
484            num_series,
485        }))
486    }
487
488    fn metrics(&self) -> Option<MetricsSet> {
489        Some(self.metric.clone_inner())
490    }
491
492    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
493        let input_stats = self.input.partition_statistics(partition)?;
494
495        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
496        let estimated_total_bytes = input_stats
497            .total_byte_size
498            .get_value()
499            .zip(input_stats.num_rows.get_value())
500            .map(|(size, rows)| {
501                Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
502            })
503            .unwrap_or_default();
504
505        Ok(Statistics {
506            num_rows: Precision::Inexact(estimated_row_num as _),
507            total_byte_size: estimated_total_bytes,
508            // TODO(ruihang): support this column statistics
509            column_statistics: Statistics::unknown_column(&self.schema()),
510        })
511    }
512
513    fn name(&self) -> &str {
514        "RangeManipulateExec"
515    }
516}
517
518impl DisplayAs for RangeManipulateExec {
519    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
520        match t {
521            DisplayFormatType::Default
522            | DisplayFormatType::Verbose
523            | DisplayFormatType::TreeRender => {
524                write!(
525                    f,
526                    "PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
527                    self.start, self.end, self.interval, self.range, self.time_index_column
528                )
529            }
530        }
531    }
532}
533
/// Stream that rewrites every input batch into its range-folded form.
pub struct RangeManipulateStream {
    /// Start of the requested range, in milliseconds.
    start: Millisecond,
    /// End of the requested range (inclusive), in milliseconds.
    end: Millisecond,
    /// Distance between two consecutive aligned timestamps, in milliseconds.
    interval: Millisecond,
    /// Length of the evaluation window that ends at each aligned timestamp, in milliseconds.
    range: Millisecond,
    /// Position of the time index column in the input batches.
    time_index: usize,
    /// Positions of the value columns in the input batches.
    field_columns: Vec<usize>,
    /// Aligned timestamps for the whole requested range, built once and reused whenever a
    /// batch yields the same number of ranges.
    aligned_ts_array: ArrayRef,

    /// Schema of the batches produced by this stream.
    output_schema: SchemaRef,
    /// The upstream batches to transform.
    input: SendableRecordBatchStream,
    /// Baseline metrics (elapsed compute, poll results) for this stream.
    metric: BaselineMetrics,
    /// Number of series processed.
    num_series: Count,
}
549
550impl RecordBatchStream for RangeManipulateStream {
551    fn schema(&self) -> SchemaRef {
552        self.output_schema.clone()
553    }
554}
555
556impl Stream for RangeManipulateStream {
557    type Item = DataFusionResult<RecordBatch>;
558
559    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
560        let poll = loop {
561            match ready!(self.input.poll_next_unpin(cx)) {
562                Some(Ok(batch)) => {
563                    let timer = std::time::Instant::now();
564                    let result = self.manipulate(batch);
565                    if let Ok(None) = result {
566                        self.metric.elapsed_compute().add_elapsed(timer);
567                        continue;
568                    } else {
569                        self.num_series.add(1);
570                        self.metric.elapsed_compute().add_elapsed(timer);
571                        break Poll::Ready(result.transpose());
572                    }
573                }
574                None => {
575                    PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
576                    break Poll::Ready(None);
577                }
578                Some(Err(e)) => break Poll::Ready(Some(Err(e))),
579            }
580        };
581        self.metric.record_poll(poll)
582    }
583}
584
585impl RangeManipulateStream {
586    // Prometheus: https://github.com/prometheus/prometheus/blob/e934d0f01158a1d55fa0ebb035346b195fcc1260/promql/engine.go#L1113-L1198
587    // But they are not exactly the same, because we don't eager-evaluate on the data in this plan.
588    // And the generated timestamp is not aligned to the step. It's expected to do later.
589    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
590        let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
591        // calculate the range
592        let (ranges, (start, end)) = self.calculate_range(&input)?;
593        // ignore this if all ranges are empty
594        if ranges.iter().all(|(_, len)| *len == 0) {
595            return Ok(None);
596        }
597
598        // transform columns
599        let mut new_columns = input.columns().to_vec();
600        for index in self.field_columns.iter() {
601            let _ = other_columns.remove(index);
602            let column = input.column(*index);
603            let new_column = Arc::new(
604                RangeArray::from_ranges(column.clone(), ranges.clone())
605                    .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
606                    .into_dict(),
607            );
608            new_columns[*index] = new_column;
609        }
610
611        // push timestamp range column
612        let ts_range_column =
613            RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone())
614                .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
615                .into_dict();
616        new_columns.push(Arc::new(ts_range_column));
617
618        // truncate other columns
619        let take_indices = Int64Array::from(vec![0; ranges.len()]);
620        for index in other_columns.into_iter() {
621            new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
622        }
623        // replace timestamp with the aligned one
624        let new_time_index = if ranges.len() != self.aligned_ts_array.len() {
625            Self::build_aligned_ts_array(start, end, self.interval)
626        } else {
627            self.aligned_ts_array.clone()
628        };
629        new_columns[self.time_index] = new_time_index;
630
631        RecordBatch::try_new(self.output_schema.clone(), new_columns)
632            .map(Some)
633            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
634    }
635
636    fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef {
637        Arc::new(TimestampMillisecondArray::from_iter_values(
638            (start..=end).step_by(interval as _),
639        ))
640    }
641
642    /// Return values:
643    /// - A vector of tuples where each tuple contains the start index and length of the range.
644    /// - A tuple of the actual start/end timestamp used to calculate the range.
645    #[allow(clippy::type_complexity)]
646    fn calculate_range(
647        &self,
648        input: &RecordBatch,
649    ) -> DataFusionResult<(Vec<(u32, u32)>, (i64, i64))> {
650        let ts_column = input
651            .column(self.time_index)
652            .as_any()
653            .downcast_ref::<TimestampMillisecondArray>()
654            .ok_or_else(|| {
655                DataFusionError::Execution(
656                    "Time index Column downcast to TimestampMillisecondArray failed".into(),
657                )
658            })?;
659
660        let len = ts_column.len();
661        if len == 0 {
662            return Ok((vec![], (self.start, self.end)));
663        }
664
665        // shorten the range to calculate
666        let first_ts = ts_column.value(0);
667        // Preserve the query's alignment pattern when optimizing start time
668        let remainder = (first_ts - self.start).rem_euclid(self.interval);
669        let first_ts_aligned = if remainder == 0 {
670            first_ts
671        } else {
672            first_ts + (self.interval - remainder)
673        };
674        let last_ts = ts_column.value(ts_column.len() - 1);
675        let last_ts_aligned = ((last_ts + self.range) / self.interval) * self.interval;
676        let start = self.start.max(first_ts_aligned);
677        let end = self.end.min(last_ts_aligned);
678        if start > end {
679            return Ok((vec![], (start, end)));
680        }
681        let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize);
682
683        // calculate for every aligned timestamp (`curr_ts`), assume the ts column is ordered.
684        let mut range_start_index = 0usize;
685        let mut last_range_start = 0;
686        let mut start_delta = 0;
687        for curr_ts in (start..=end).step_by(self.interval as _) {
688            // determine range start
689            let start_ts = curr_ts - self.range;
690
691            // advance cursor based on last range
692            let mut range_start = ts_column.len();
693            let mut range_end = 0;
694            let mut cursor = range_start_index + start_delta;
695            // search back to keep the result correct
696            while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 {
697                cursor -= 1;
698            }
699
700            while cursor < ts_column.len() {
701                let ts = ts_column.value(cursor);
702                if range_start > cursor && ts >= start_ts {
703                    range_start = cursor;
704                    range_start_index = range_start;
705                }
706                if ts <= curr_ts {
707                    range_end = range_end.max(cursor);
708                } else {
709                    range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
710                    break;
711                }
712                cursor += 1;
713            }
714            if range_start > range_end {
715                ranges.push((0, 0));
716                start_delta = 0;
717            } else {
718                ranges.push((range_start as _, (range_end + 1 - range_start) as _));
719                start_delta = range_start - last_range_start;
720                last_range_start = range_start;
721            }
722        }
723
724        Ok((ranges, (start, end)))
725    }
726}
727
728#[cfg(test)]
729mod test {
730    use datafusion::arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray};
731    use datafusion::arrow::datatypes::{
732        ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType,
733    };
734    use datafusion::common::ToDFSchema;
735    use datafusion::datasource::memory::MemorySourceConfig;
736    use datafusion::datasource::source::DataSourceExec;
737    use datafusion::physical_expr::Partitioning;
738    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
739    use datafusion::physical_plan::memory::MemoryStream;
740    use datafusion::prelude::SessionContext;
741    use datatypes::arrow::array::TimestampMillisecondArray;
742
743    use super::*;
744
745    const TIME_INDEX_COLUMN: &str = "timestamp";
746
747    fn prepare_test_data() -> DataSourceExec {
748        let schema = Arc::new(Schema::new(vec![
749            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
750            Field::new("value_1", DataType::Float64, true),
751            Field::new("value_2", DataType::Float64, true),
752            Field::new("path", DataType::Utf8, true),
753        ]));
754        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
755            0, 30_000, 60_000, 90_000, 120_000, // every 30s
756            180_000, 240_000, // every 60s
757            241_000, 271_000, 291_000, // others
758        ])) as _;
759        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
760        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
761        let data = RecordBatch::try_new(
762            schema.clone(),
763            vec![
764                timestamp_column,
765                field_column.clone(),
766                field_column,
767                path_column,
768            ],
769        )
770        .unwrap();
771
772        DataSourceExec::new(Arc::new(
773            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
774        ))
775    }
776
777    async fn do_normalize_test(
778        start: Millisecond,
779        end: Millisecond,
780        interval: Millisecond,
781        range: Millisecond,
782        expected: String,
783    ) {
784        let memory_exec = Arc::new(prepare_test_data());
785        let time_index = TIME_INDEX_COLUMN.to_string();
786        let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
787        let manipulate_output_schema = SchemaRef::new(
788            RangeManipulate::calculate_output_schema(
789                &memory_exec.schema().to_dfschema_ref().unwrap(),
790                &time_index,
791                &field_columns,
792            )
793            .unwrap()
794            .as_arrow()
795            .clone(),
796        );
797        let properties = PlanProperties::new(
798            EquivalenceProperties::new(manipulate_output_schema.clone()),
799            Partitioning::UnknownPartitioning(1),
800            EmissionType::Incremental,
801            Boundedness::Bounded,
802        );
803        let normalize_exec = Arc::new(RangeManipulateExec {
804            start,
805            end,
806            interval,
807            range,
808            field_columns,
809            output_schema: manipulate_output_schema,
810            time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
811            time_index_column: time_index,
812            input: memory_exec,
813            metric: ExecutionPlanMetricsSet::new(),
814            properties,
815        });
816        let session_context = SessionContext::default();
817        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
818            .await
819            .unwrap();
820        // DirectoryArray from RangeArray cannot be print as normal arrays.
821        let result_literal: String = result
822            .into_iter()
823            .filter_map(|batch| {
824                batch
825                    .columns()
826                    .iter()
827                    .map(|array| {
828                        if matches!(array.data_type(), &DataType::Dictionary(..)) {
829                            let dict_array = array
830                                .as_any()
831                                .downcast_ref::<DictionaryArray<Int64Type>>()
832                                .unwrap()
833                                .clone();
834                            format!("{:?}", RangeArray::try_new(dict_array).unwrap())
835                        } else {
836                            format!("{array:?}")
837                        }
838                    })
839                    .reduce(|lhs, rhs| lhs + "\n" + &rhs)
840            })
841            .reduce(|lhs, rhs| lhs + "\n\n" + &rhs)
842            .unwrap();
843
844        assert_eq!(result_literal, expected);
845    }
846
847    #[tokio::test]
848    async fn interval_30s_range_90s() {
849        let expected = String::from(
850            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  \
851                1970-01-01T00:00:00,\n  \
852                1970-01-01T00:00:30,\n  \
853                1970-01-01T00:01:00,\n  \
854                1970-01-01T00:01:30,\n  \
855                1970-01-01T00:02:00,\n  \
856                1970-01-01T00:02:30,\n  \
857                1970-01-01T00:03:00,\n  \
858                1970-01-01T00:03:30,\n  \
859                1970-01-01T00:04:00,\n  \
860                1970-01-01T00:04:30,\n  \
861                1970-01-01T00:05:00,\n\
862            ]\nRangeArray { \
863                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
864                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
865            }\nRangeArray { \
866                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
867                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
868            }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
869            RangeArray { \
870                base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
871                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
872            }",
873        );
874        do_normalize_test(0, 310_000, 30_000, 90_000, expected.clone()).await;
875
876        // dump large range
877        do_normalize_test(-300000, 310_000, 30_000, 90_000, expected).await;
878    }
879
880    #[tokio::test]
881    async fn small_empty_range() {
882        let expected = String::from(
883            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  \
884            1970-01-01T00:00:00.001,\n  \
885            1970-01-01T00:00:03.001,\n  \
886            1970-01-01T00:00:06.001,\n  \
887            1970-01-01T00:00:09.001,\n\
888        ]\nRangeArray { \
889            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
890            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
891        }\nRangeArray { \
892            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
893            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
894        }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
895        RangeArray { \
896            base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
897            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
898        }",
899        );
900        do_normalize_test(1, 10_001, 3_000, 1_000, expected).await;
901    }
902
903    #[test]
904    fn test_calculate_range_preserves_alignment() {
905        // Test case: query starts at timestamp ending in 4000, step is 30s
906        // Data starts at different alignment - should preserve query's 4000 pattern
907        let schema = Arc::new(Schema::new(vec![Field::new(
908            "timestamp",
909            TimestampMillisecondType::DATA_TYPE,
910            false,
911        )]));
912        let empty_stream = MemoryStream::try_new(vec![], schema.clone(), None).unwrap();
913
914        let stream = RangeManipulateStream {
915            start: 1758093274000, // ends in 4000
916            end: 1758093334000,   // ends in 4000
917            interval: 30000,      // 30s step
918            range: 60000,         // 60s lookback
919            time_index: 0,
920            field_columns: vec![],
921            aligned_ts_array: Arc::new(TimestampMillisecondArray::from(vec![0i64; 0])),
922            output_schema: schema.clone(),
923            input: Box::pin(empty_stream),
924            metric: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
925            num_series: Count::new(),
926        };
927
928        // Create test data with timestamps not aligned to query pattern
929        let test_timestamps = vec![
930            1758093260000, // ends in 0000 (different alignment)
931            1758093290000, // ends in 0000
932            1758093320000, // ends in 0000
933        ];
934        let ts_array = TimestampMillisecondArray::from(test_timestamps);
935        let test_schema = Arc::new(Schema::new(vec![Field::new(
936            "timestamp",
937            TimestampMillisecondType::DATA_TYPE,
938            false,
939        )]));
940        let batch = RecordBatch::try_new(test_schema, vec![Arc::new(ts_array)]).unwrap();
941
942        let (ranges, (start, end)) = stream.calculate_range(&batch).unwrap();
943
944        // Verify the optimized start preserves query alignment (should end in 4000)
945        assert_eq!(
946            start % 30000,
947            1758093274000 % 30000,
948            "Optimized start should preserve query alignment pattern"
949        );
950
951        // Verify we generate correct number of ranges for the alignment
952        let expected_timestamps: Vec<i64> = (start..=end).step_by(30000).collect();
953        assert_eq!(ranges.len(), expected_timestamps.len());
954
955        // Verify all generated timestamps maintain the same alignment pattern
956        for ts in expected_timestamps {
957            assert_eq!(
958                ts % 30000,
959                1758093274000 % 30000,
960                "All timestamps should maintain query alignment pattern"
961            );
962        }
963    }
964}