1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use common_telemetry::debug;
22use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
23use datafusion::arrow::compute;
24use datafusion::arrow::datatypes::{Field, SchemaRef};
25use datafusion::arrow::error::ArrowError;
26use datafusion::arrow::record_batch::RecordBatch;
27use datafusion::common::stats::Precision;
28use datafusion::common::{DFSchema, DFSchemaRef};
29use datafusion::error::{DataFusionError, Result as DataFusionResult};
30use datafusion::execution::context::TaskContext;
31use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
32use datafusion::physical_expr::EquivalenceProperties;
33use datafusion::physical_plan::metrics::{
34 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
35};
36use datafusion::physical_plan::{
37 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
38 SendableRecordBatchStream, Statistics,
39};
40use datafusion::sql::TableReference;
41use futures::{Stream, StreamExt, ready};
42use greptime_proto::substrait_extension as pb;
43use prost::Message;
44use snafu::ResultExt;
45
46use crate::error::{DeserializeSnafu, Result};
47use crate::extension_plan::{
48 METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
49};
50use crate::metrics::PROMQL_SERIES_COUNT;
51use crate::range_array::RangeArray;
52
/// Logical plan node for PromQL range manipulation.
///
/// For every evaluation step `t` in `start..=end` (stepping by `interval`), the rows whose time
/// index lies in `[t - range, t]` form that step's window. The value columns listed in
/// `field_columns` are turned into `RangeArray`s over those windows and an extra
/// `<time_index>_range` column is appended to the output.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
    /// First evaluation timestamp.
    start: Millisecond,
    /// Last evaluation timestamp (inclusive).
    end: Millisecond,
    /// Distance between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the look-back window that ends at each evaluation timestamp.
    range: Millisecond,
    /// Name of the input's timestamp column.
    time_index: String,
    /// Names of the value columns that are converted into ranges.
    field_columns: Vec<String>,
    /// Child plan; a placeholder empty relation right after `deserialize`.
    input: LogicalPlan,
    /// Schema produced by `calculate_output_schema`.
    output_schema: DFSchemaRef,
    /// Column indices decoded by `deserialize`; `Some` until `with_exprs_and_inputs`
    /// resolves them against the real input plan.
    unfix: Option<UnfixIndices>,
}
74
/// Column indices decoded by `RangeManipulate::deserialize`.
///
/// They are kept until the real input plan is available, at which point they are mapped back
/// to column names.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct UnfixIndices {
    /// Index of the time index column in the input schema.
    pub time_index_idx: u64,
    /// Indices of the value columns. The name follows the protobuf field `tag_column_indices`,
    /// but `serialize` fills it from `field_columns`.
    pub tag_column_indices: Vec<u64>,
}
80
81impl RangeManipulate {
82 pub fn new(
83 start: Millisecond,
84 end: Millisecond,
85 interval: Millisecond,
86 range: Millisecond,
87 time_index: String,
88 field_columns: Vec<String>,
89 input: LogicalPlan,
90 ) -> DataFusionResult<Self> {
91 let output_schema =
92 Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
93 Ok(Self {
94 start,
95 end,
96 interval,
97 range,
98 time_index,
99 field_columns,
100 input,
101 output_schema,
102 unfix: None,
103 })
104 }
105
106 pub const fn name() -> &'static str {
107 "RangeManipulate"
108 }
109
110 pub fn build_timestamp_range_name(time_index: &str) -> String {
111 format!("{time_index}_range")
112 }
113
114 pub fn internal_range_end_col_name() -> String {
115 "__internal_range_end".to_string()
116 }
117
118 fn range_timestamp_name(&self) -> String {
119 Self::build_timestamp_range_name(&self.time_index)
120 }
121
122 fn calculate_output_schema(
123 input_schema: &DFSchemaRef,
124 time_index: &str,
125 field_columns: &[String],
126 ) -> DataFusionResult<DFSchemaRef> {
127 let columns = input_schema.fields();
128 let mut new_columns = Vec::with_capacity(columns.len() + 1);
129 for i in 0..columns.len() {
130 let x = input_schema.qualified_field(i);
131 new_columns.push((x.0.cloned(), Arc::new(x.1.clone())));
132 }
133
134 let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index) else {
137 return Err(datafusion::common::field_not_found(
138 None::<TableReference>,
139 time_index,
140 input_schema.as_ref(),
141 ));
142 };
143 let ts_col_field = &columns[ts_col_index];
144 let timestamp_range_field = Field::new(
145 Self::build_timestamp_range_name(time_index),
146 RangeArray::convert_field(ts_col_field).data_type().clone(),
147 ts_col_field.is_nullable(),
148 );
149 new_columns.push((None, Arc::new(timestamp_range_field)));
150
151 for name in field_columns {
153 let Some(index) = input_schema.index_of_column_by_name(None, name) else {
154 return Err(datafusion::common::field_not_found(
155 None::<TableReference>,
156 name,
157 input_schema.as_ref(),
158 ));
159 };
160 new_columns[index] = (None, Arc::new(RangeArray::convert_field(&columns[index])));
161 }
162
163 Ok(Arc::new(DFSchema::new_with_metadata(
164 new_columns,
165 HashMap::new(),
166 )?))
167 }
168
169 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
170 let output_schema: SchemaRef = self.output_schema.inner().clone();
171 let properties = exec_input.properties();
172 let properties = PlanProperties::new(
173 EquivalenceProperties::new(output_schema.clone()),
174 properties.partitioning.clone(),
175 properties.emission_type,
176 properties.boundedness,
177 );
178 Arc::new(RangeManipulateExec {
179 start: self.start,
180 end: self.end,
181 interval: self.interval,
182 range: self.range,
183 time_index_column: self.time_index.clone(),
184 time_range_column: self.range_timestamp_name(),
185 field_columns: self.field_columns.clone(),
186 input: exec_input,
187 output_schema,
188 metric: ExecutionPlanMetricsSet::new(),
189 properties,
190 })
191 }
192
193 pub fn serialize(&self) -> Vec<u8> {
194 let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index);
195
196 let tag_column_indices = self
197 .field_columns
198 .iter()
199 .map(|name| serialize_column_index(self.input.schema(), name))
200 .collect::<Vec<u64>>();
201
202 pb::RangeManipulate {
203 start: self.start,
204 end: self.end,
205 interval: self.interval,
206 range: self.range,
207 time_index_idx,
208 tag_column_indices,
209 ..Default::default()
210 }
211 .encode_to_vec()
212 }
213
214 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
215 let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
216 let empty_schema = Arc::new(DFSchema::empty());
217 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
218 produce_one_row: false,
219 schema: empty_schema.clone(),
220 });
221
222 let unfix = UnfixIndices {
223 time_index_idx: pb_range_manipulate.time_index_idx,
224 tag_column_indices: pb_range_manipulate.tag_column_indices.clone(),
225 };
226 debug!("RangeManipulate deserialize unfix: {:?}", unfix);
227
228 Ok(Self {
233 start: pb_range_manipulate.start,
234 end: pb_range_manipulate.end,
235 interval: pb_range_manipulate.interval,
236 range: pb_range_manipulate.range,
237 time_index: String::new(),
238 field_columns: Vec::new(),
239 input: placeholder_plan,
240 output_schema: empty_schema,
241 unfix: Some(unfix),
242 })
243 }
244}
245
246impl PartialOrd for RangeManipulate {
247 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
248 match self.start.partial_cmp(&other.start) {
250 Some(core::cmp::Ordering::Equal) => {}
251 ord => return ord,
252 }
253 match self.end.partial_cmp(&other.end) {
254 Some(core::cmp::Ordering::Equal) => {}
255 ord => return ord,
256 }
257 match self.interval.partial_cmp(&other.interval) {
258 Some(core::cmp::Ordering::Equal) => {}
259 ord => return ord,
260 }
261 match self.range.partial_cmp(&other.range) {
262 Some(core::cmp::Ordering::Equal) => {}
263 ord => return ord,
264 }
265 match self.time_index.partial_cmp(&other.time_index) {
266 Some(core::cmp::Ordering::Equal) => {}
267 ord => return ord,
268 }
269 match self.field_columns.partial_cmp(&other.field_columns) {
270 Some(core::cmp::Ordering::Equal) => {}
271 ord => return ord,
272 }
273 self.input.partial_cmp(&other.input)
274 }
275}
276
277impl UserDefinedLogicalNodeCore for RangeManipulate {
278 fn name(&self) -> &str {
279 Self::name()
280 }
281
282 fn inputs(&self) -> Vec<&LogicalPlan> {
283 vec![&self.input]
284 }
285
286 fn schema(&self) -> &DFSchemaRef {
287 &self.output_schema
288 }
289
290 fn expressions(&self) -> Vec<Expr> {
291 vec![]
292 }
293
294 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
295 write!(
296 f,
297 "PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
298 self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
299 )
300 }
301
302 fn with_exprs_and_inputs(
303 &self,
304 _exprs: Vec<Expr>,
305 mut inputs: Vec<LogicalPlan>,
306 ) -> DataFusionResult<Self> {
307 if inputs.len() != 1 {
308 return Err(DataFusionError::Internal(
309 "RangeManipulate should have at exact one input".to_string(),
310 ));
311 }
312
313 let input: LogicalPlan = inputs.pop().unwrap();
314 let input_schema = input.schema();
315
316 if let Some(unfix) = &self.unfix {
317 let time_index = resolve_column_name(
319 unfix.time_index_idx,
320 input_schema,
321 "RangeManipulate",
322 "time index",
323 )?;
324
325 let field_columns = unfix
326 .tag_column_indices
327 .iter()
328 .map(|idx| resolve_column_name(*idx, input_schema, "RangeManipulate", "tag"))
329 .collect::<DataFusionResult<Vec<String>>>()?;
330
331 let output_schema =
332 Self::calculate_output_schema(input_schema, &time_index, &field_columns)?;
333
334 Ok(Self {
335 start: self.start,
336 end: self.end,
337 interval: self.interval,
338 range: self.range,
339 time_index,
340 field_columns,
341 input,
342 output_schema,
343 unfix: None,
344 })
345 } else {
346 let output_schema =
347 Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
348
349 Ok(Self {
350 start: self.start,
351 end: self.end,
352 interval: self.interval,
353 range: self.range,
354 time_index: self.time_index.clone(),
355 field_columns: self.field_columns.clone(),
356 input,
357 output_schema,
358 unfix: None,
359 })
360 }
361 }
362}
363
/// Physical plan node for `RangeManipulate`.
///
/// It executes its input and wraps every partition's stream in a [`RangeManipulateStream`].
#[derive(Debug)]
pub struct RangeManipulateExec {
    /// First evaluation timestamp.
    start: Millisecond,
    /// Last evaluation timestamp (inclusive).
    end: Millisecond,
    /// Distance between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Look-back window length of each evaluation step.
    range: Millisecond,
    /// Name of the input time index column; resolved to a position at execution time.
    time_index_column: String,
    /// Name of the appended timestamp-range column (not read elsewhere in this file).
    time_range_column: String,
    /// Names of the value columns converted into ranges; resolved to positions at execution time.
    field_columns: Vec<String>,

    input: Arc<dyn ExecutionPlan>,
    output_schema: SchemaRef,
    /// Metrics set shared by all partitions of this node.
    metric: ExecutionPlanMetricsSet,
    properties: PlanProperties,
}
379
380impl ExecutionPlan for RangeManipulateExec {
381 fn as_any(&self) -> &dyn Any {
382 self
383 }
384
385 fn schema(&self) -> SchemaRef {
386 self.output_schema.clone()
387 }
388
389 fn properties(&self) -> &PlanProperties {
390 &self.properties
391 }
392
393 fn maintains_input_order(&self) -> Vec<bool> {
394 vec![true; self.children().len()]
395 }
396
397 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
398 vec![&self.input]
399 }
400
401 fn required_input_distribution(&self) -> Vec<Distribution> {
402 let input_requirement = self.input.required_input_distribution();
403 if input_requirement.is_empty() {
404 vec![Distribution::UnspecifiedDistribution]
407 } else {
408 input_requirement
409 }
410 }
411
412 fn with_new_children(
413 self: Arc<Self>,
414 children: Vec<Arc<dyn ExecutionPlan>>,
415 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
416 assert!(!children.is_empty());
417 let exec_input = children[0].clone();
418 let properties = exec_input.properties();
419 let properties = PlanProperties::new(
420 EquivalenceProperties::new(self.output_schema.clone()),
421 properties.partitioning.clone(),
422 properties.emission_type,
423 properties.boundedness,
424 );
425 Ok(Arc::new(Self {
426 start: self.start,
427 end: self.end,
428 interval: self.interval,
429 range: self.range,
430 time_index_column: self.time_index_column.clone(),
431 time_range_column: self.time_range_column.clone(),
432 field_columns: self.field_columns.clone(),
433 output_schema: self.output_schema.clone(),
434 input: children[0].clone(),
435 metric: self.metric.clone(),
436 properties,
437 }))
438 }
439
440 fn execute(
441 &self,
442 partition: usize,
443 context: Arc<TaskContext>,
444 ) -> DataFusionResult<SendableRecordBatchStream> {
445 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
446 let metrics_builder = MetricBuilder::new(&self.metric);
447 let num_series = Count::new();
448 metrics_builder
449 .with_partition(partition)
450 .build(MetricValue::Count {
451 name: METRIC_NUM_SERIES.into(),
452 count: num_series.clone(),
453 });
454
455 let input = self.input.execute(partition, context)?;
456 let schema = input.schema();
457 let time_index = schema
458 .column_with_name(&self.time_index_column)
459 .unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
460 .0;
461 let field_columns = self
462 .field_columns
463 .iter()
464 .map(|value_col| {
465 schema
466 .column_with_name(value_col)
467 .unwrap_or_else(|| panic!("value column {value_col} not found",))
468 .0
469 })
470 .collect();
471 let aligned_ts_array =
472 RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval);
473 Ok(Box::pin(RangeManipulateStream {
474 start: self.start,
475 end: self.end,
476 interval: self.interval,
477 range: self.range,
478 time_index,
479 field_columns,
480 aligned_ts_array,
481 output_schema: self.output_schema.clone(),
482 input,
483 metric: baseline_metric,
484 num_series,
485 }))
486 }
487
488 fn metrics(&self) -> Option<MetricsSet> {
489 Some(self.metric.clone_inner())
490 }
491
492 fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
493 let input_stats = self.input.partition_statistics(partition)?;
494
495 let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
496 let estimated_total_bytes = input_stats
497 .total_byte_size
498 .get_value()
499 .zip(input_stats.num_rows.get_value())
500 .map(|(size, rows)| {
501 Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
502 })
503 .unwrap_or_default();
504
505 Ok(Statistics {
506 num_rows: Precision::Inexact(estimated_row_num as _),
507 total_byte_size: estimated_total_bytes,
508 column_statistics: Statistics::unknown_column(&self.schema()),
510 })
511 }
512
513 fn name(&self) -> &str {
514 "RangeManipulateExec"
515 }
516}
517
518impl DisplayAs for RangeManipulateExec {
519 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
520 match t {
521 DisplayFormatType::Default
522 | DisplayFormatType::Verbose
523 | DisplayFormatType::TreeRender => {
524 write!(
525 f,
526 "PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
527 self.start, self.end, self.interval, self.range, self.time_index_column
528 )
529 }
530 }
531 }
532}
533
/// Stream that applies [`RangeManipulateStream::manipulate`] to every batch of its input.
///
/// Input batches for which `manipulate` returns `Ok(None)` are skipped.
pub struct RangeManipulateStream {
    /// First evaluation timestamp.
    start: Millisecond,
    /// Last evaluation timestamp (inclusive).
    end: Millisecond,
    /// Distance between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Look-back window length of each evaluation step.
    range: Millisecond,
    /// Position of the time index column in the input batches.
    time_index: usize,
    /// Positions of the value columns in the input batches.
    field_columns: Vec<usize>,
    /// Step timestamps for the whole `start..=end` range; reused when a batch covers every step.
    aligned_ts_array: ArrayRef,

    output_schema: SchemaRef,
    input: SendableRecordBatchStream,
    /// Baseline metrics of this partition.
    metric: BaselineMetrics,
    /// Incremented once for every input batch that is not skipped; reported as the series count.
    num_series: Count,
}
549
550impl RecordBatchStream for RangeManipulateStream {
551 fn schema(&self) -> SchemaRef {
552 self.output_schema.clone()
553 }
554}
555
556impl Stream for RangeManipulateStream {
557 type Item = DataFusionResult<RecordBatch>;
558
559 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
560 let poll = loop {
561 match ready!(self.input.poll_next_unpin(cx)) {
562 Some(Ok(batch)) => {
563 let timer = std::time::Instant::now();
564 let result = self.manipulate(batch);
565 if let Ok(None) = result {
566 self.metric.elapsed_compute().add_elapsed(timer);
567 continue;
568 } else {
569 self.num_series.add(1);
570 self.metric.elapsed_compute().add_elapsed(timer);
571 break Poll::Ready(result.transpose());
572 }
573 }
574 None => {
575 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
576 break Poll::Ready(None);
577 }
578 Some(Err(e)) => break Poll::Ready(Some(Err(e))),
579 }
580 };
581 self.metric.record_poll(poll)
582 }
583}
584
585impl RangeManipulateStream {
586 pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
590 let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
591 let (ranges, (start, end)) = self.calculate_range(&input)?;
593 if ranges.iter().all(|(_, len)| *len == 0) {
595 return Ok(None);
596 }
597
598 let mut new_columns = input.columns().to_vec();
600 for index in self.field_columns.iter() {
601 let _ = other_columns.remove(index);
602 let column = input.column(*index);
603 let new_column = Arc::new(
604 RangeArray::from_ranges(column.clone(), ranges.clone())
605 .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
606 .into_dict(),
607 );
608 new_columns[*index] = new_column;
609 }
610
611 let ts_range_column =
613 RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone())
614 .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
615 .into_dict();
616 new_columns.push(Arc::new(ts_range_column));
617
618 let take_indices = Int64Array::from(vec![0; ranges.len()]);
620 for index in other_columns.into_iter() {
621 new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
622 }
623 let new_time_index = if ranges.len() != self.aligned_ts_array.len() {
625 Self::build_aligned_ts_array(start, end, self.interval)
626 } else {
627 self.aligned_ts_array.clone()
628 };
629 new_columns[self.time_index] = new_time_index;
630
631 RecordBatch::try_new(self.output_schema.clone(), new_columns)
632 .map(Some)
633 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
634 }
635
636 fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef {
637 Arc::new(TimestampMillisecondArray::from_iter_values(
638 (start..=end).step_by(interval as _),
639 ))
640 }
641
642 #[allow(clippy::type_complexity)]
646 fn calculate_range(
647 &self,
648 input: &RecordBatch,
649 ) -> DataFusionResult<(Vec<(u32, u32)>, (i64, i64))> {
650 let ts_column = input
651 .column(self.time_index)
652 .as_any()
653 .downcast_ref::<TimestampMillisecondArray>()
654 .ok_or_else(|| {
655 DataFusionError::Execution(
656 "Time index Column downcast to TimestampMillisecondArray failed".into(),
657 )
658 })?;
659
660 let len = ts_column.len();
661 if len == 0 {
662 return Ok((vec![], (self.start, self.end)));
663 }
664
665 let first_ts = ts_column.value(0);
667 let remainder = (first_ts - self.start).rem_euclid(self.interval);
669 let first_ts_aligned = if remainder == 0 {
670 first_ts
671 } else {
672 first_ts + (self.interval - remainder)
673 };
674 let last_ts = ts_column.value(ts_column.len() - 1);
675 let last_ts_aligned = ((last_ts + self.range) / self.interval) * self.interval;
676 let start = self.start.max(first_ts_aligned);
677 let end = self.end.min(last_ts_aligned);
678 if start > end {
679 return Ok((vec![], (start, end)));
680 }
681 let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize);
682
683 let mut range_start_index = 0usize;
685 let mut last_range_start = 0;
686 let mut start_delta = 0;
687 for curr_ts in (start..=end).step_by(self.interval as _) {
688 let start_ts = curr_ts - self.range;
690
691 let mut range_start = ts_column.len();
693 let mut range_end = 0;
694 let mut cursor = range_start_index + start_delta;
695 while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 {
697 cursor -= 1;
698 }
699
700 while cursor < ts_column.len() {
701 let ts = ts_column.value(cursor);
702 if range_start > cursor && ts >= start_ts {
703 range_start = cursor;
704 range_start_index = range_start;
705 }
706 if ts <= curr_ts {
707 range_end = range_end.max(cursor);
708 } else {
709 range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
710 break;
711 }
712 cursor += 1;
713 }
714 if range_start > range_end {
715 ranges.push((0, 0));
716 start_delta = 0;
717 } else {
718 ranges.push((range_start as _, (range_end + 1 - range_start) as _));
719 start_delta = range_start - last_range_start;
720 last_range_start = range_start;
721 }
722 }
723
724 Ok((ranges, (start, end)))
725 }
726}
727
#[cfg(test)]
mod test {
    use datafusion::arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray};
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType,
    };
    use datafusion::common::ToDFSchema;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::physical_expr::Partitioning;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::physical_plan::memory::MemoryStream;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;

    use super::*;

    /// Name of the time index column in the test data.
    const TIME_INDEX_COLUMN: &str = "timestamp";

    /// Builds an in-memory source holding one batch of ten rows.
    ///
    /// The columns are:
    /// - a millisecond time index, unevenly spaced between 0s and 291s;
    /// - two `Float64` value columns filled with `1.0`;
    /// - a `path` string column filled with `"foo"`.
    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value_1", DataType::Float64, true),
            Field::new("value_2", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            0, 30_000, 60_000, 90_000, 120_000, 180_000, 240_000, 241_000, 271_000, 291_000,
        ])) as _;
        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![
                timestamp_column,
                field_column.clone(),
                field_column,
                path_column,
            ],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }

    /// Runs a `RangeManipulateExec` over `prepare_test_data()`, with `value_1` and `value_2` as
    /// value columns, and compares the rendered output with `expected`.
    ///
    /// Rendering rules:
    /// - dictionary columns are printed through `RangeArray`'s `Debug`;
    /// - every other column is printed through the array's own `Debug`;
    /// - columns are joined by a newline, and batches by a blank line.
    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        range: Millisecond,
        expected: String,
    ) {
        let memory_exec = Arc::new(prepare_test_data());
        let time_index = TIME_INDEX_COLUMN.to_string();
        let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
        let manipulate_output_schema = SchemaRef::new(
            RangeManipulate::calculate_output_schema(
                &memory_exec.schema().to_dfschema_ref().unwrap(),
                &time_index,
                &field_columns,
            )
            .unwrap()
            .as_arrow()
            .clone(),
        );
        let properties = PlanProperties::new(
            EquivalenceProperties::new(manipulate_output_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let normalize_exec = Arc::new(RangeManipulateExec {
            start,
            end,
            interval,
            range,
            field_columns,
            output_schema: manipulate_output_schema,
            time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
            time_index_column: time_index,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        // Render each batch column by column; range (dictionary) columns go through `RangeArray`.
        let result_literal: String = result
            .into_iter()
            .filter_map(|batch| {
                batch
                    .columns()
                    .iter()
                    .map(|array| {
                        if matches!(array.data_type(), &DataType::Dictionary(..)) {
                            let dict_array = array
                                .as_any()
                                .downcast_ref::<DictionaryArray<Int64Type>>()
                                .unwrap()
                                .clone();
                            format!("{:?}", RangeArray::try_new(dict_array).unwrap())
                        } else {
                            format!("{array:?}")
                        }
                    })
                    .reduce(|lhs, rhs| lhs + "\n" + &rhs)
            })
            .reduce(|lhs, rhs| lhs + "\n\n" + &rhs)
            .unwrap();

        assert_eq!(result_literal, expected);
    }

    /// 30s steps with a 90s look-back window over `[0s, 310s]`: eleven steps from 0s to 5m.
    #[tokio::test]
    async fn interval_30s_range_90s() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n \
            1970-01-01T00:00:00,\n \
            1970-01-01T00:00:30,\n \
            1970-01-01T00:01:00,\n \
            1970-01-01T00:01:30,\n \
            1970-01-01T00:02:00,\n \
            1970-01-01T00:02:30,\n \
            1970-01-01T00:03:00,\n \
            1970-01-01T00:03:30,\n \
            1970-01-01T00:04:00,\n \
            1970-01-01T00:04:30,\n \
            1970-01-01T00:05:00,\n\
            ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
            RangeArray { \
            base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }",
        );
        do_normalize_test(0, 310_000, 30_000, 90_000, expected.clone()).await;

        // A start far before the first sample yields the same output: steps before the first
        // sample are clamped away.
        do_normalize_test(-300000, 310_000, 30_000, 90_000, expected).await;
    }

    /// 3s steps with a 1s window starting at 1ms. Only the first step covers a sample (the one
    /// at 0ms); the remaining steps have empty ranges.
    #[tokio::test]
    async fn small_empty_range() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n \
            1970-01-01T00:00:00.001,\n \
            1970-01-01T00:00:03.001,\n \
            1970-01-01T00:00:06.001,\n \
            1970-01-01T00:00:09.001,\n\
            ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
            RangeArray { \
            base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }",
        );
        do_normalize_test(1, 10_001, 3_000, 1_000, expected).await;
    }

    /// `calculate_range` must keep the query's step alignment when it clamps the start to the
    /// first sample.
    ///
    /// The query start is 4s past a 30s boundary, so every returned step must also be 4s past a
    /// 30s boundary.
    #[test]
    fn test_calculate_range_preserves_alignment() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "timestamp",
            TimestampMillisecondType::DATA_TYPE,
            false,
        )]));
        let empty_stream = MemoryStream::try_new(vec![], schema.clone(), None).unwrap();

        let stream = RangeManipulateStream {
            start: 1758093274000,
            end: 1758093334000,
            interval: 30000,
            range: 60000,
            time_index: 0,
            field_columns: vec![],
            aligned_ts_array: Arc::new(TimestampMillisecondArray::from(vec![0i64; 0])),
            output_schema: schema.clone(),
            input: Box::pin(empty_stream),
            metric: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
            num_series: Count::new(),
        };

        // Samples on 30s boundaries, i.e. not aligned with the query start.
        let test_timestamps = vec![
            1758093260000,
            1758093290000,
            1758093320000,
        ];
        let ts_array = TimestampMillisecondArray::from(test_timestamps);
        let test_schema = Arc::new(Schema::new(vec![Field::new(
            "timestamp",
            TimestampMillisecondType::DATA_TYPE,
            false,
        )]));
        let batch = RecordBatch::try_new(test_schema, vec![Arc::new(ts_array)]).unwrap();

        let (ranges, (start, end)) = stream.calculate_range(&batch).unwrap();

        assert_eq!(
            start % 30000,
            1758093274000 % 30000,
            "Optimized start should preserve query alignment pattern"
        );

        // One range per step in the returned bounds.
        let expected_timestamps: Vec<i64> = (start..=end).step_by(30000).collect();
        assert_eq!(ranges.len(), expected_timestamps.len());

        for ts in expected_timestamps {
            assert_eq!(
                ts % 30000,
                1758093274000 % 30000,
                "All timestamps should maintain query alignment pattern"
            );
        }
    }
}