use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use common_telemetry::{debug, warn};
use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
use datafusion::arrow::compute;
use datafusion::arrow::datatypes::{Field, SchemaRef};
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::common::stats::Precision;
use datafusion::common::{DFSchema, DFSchemaRef};
use datafusion::error::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream, Statistics,
};
use datafusion::sql::TableReference;
use datafusion_expr::col;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::{
    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
};
use crate::metrics::PROMQL_SERIES_COUNT;
use crate::range_array::RangeArray;

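/// Logical plan node that implements the PromQL range selector.
///
/// It aligns the time index to the `[start, end]` range at `interval` steps and
/// rewrites every field column into a [`RangeArray`], where the entry at each
/// aligned timestamp `t` covers the input rows falling in `[t - range, t]`.
/// For example, `rate(metric[5m])` evaluated at a 30s step would correspond to
/// `range = 300_000` and `interval = 30_000` (both in milliseconds).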
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    range: Millisecond,
    time_index: String,
    field_columns: Vec<String>,
    input: LogicalPlan,
    output_schema: DFSchemaRef,
    unfix: Option<UnfixIndices>,
}

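/// Column indices carried by a node deserialized from protobuf, kept until they
/// can be resolved into column names against the real input schema in
/// `with_exprs_and_inputs`.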
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct UnfixIndices {
    pub time_index_idx: u64,
    pub tag_column_indices: Vec<u64>,
}

impl RangeManipulate {
    pub fn new(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        range: Millisecond,
        time_index: String,
        field_columns: Vec<String>,
        input: LogicalPlan,
    ) -> DataFusionResult<Self> {
        let output_schema =
            Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
        Ok(Self {
            start,
            end,
            interval,
            range,
            time_index,
            field_columns,
            input,
            output_schema,
            unfix: None,
        })
    }

    pub const fn name() -> &'static str {
        "RangeManipulate"
    }

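    /// Name of the appended timestamp range column, e.g. `"timestamp"` becomes
    /// `"timestamp_range"`.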
    pub fn build_timestamp_range_name(time_index: &str) -> String {
        format!("{time_index}_range")
    }

    pub fn internal_range_end_col_name() -> String {
        "__internal_range_end".to_string()
    }

    fn range_timestamp_name(&self) -> String {
        Self::build_timestamp_range_name(&self.time_index)
    }

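    /// Computes the output schema: all input columns are kept, field columns
    /// are converted to their [`RangeArray`] dictionary representation, and a
    /// `{time_index}_range` column is appended at the end.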
    fn calculate_output_schema(
        input_schema: &DFSchemaRef,
        time_index: &str,
        field_columns: &[String],
    ) -> DataFusionResult<DFSchemaRef> {
        let columns = input_schema.fields();
        let mut new_columns = Vec::with_capacity(columns.len() + 1);
        for i in 0..columns.len() {
            let x = input_schema.qualified_field(i);
            new_columns.push((x.0.cloned(), Arc::new(x.1.clone())));
        }

        let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index) else {
            return Err(datafusion::common::field_not_found(
                None::<TableReference>,
                time_index,
                input_schema.as_ref(),
            ));
        };
        let ts_col_field = &columns[ts_col_index];
        let timestamp_range_field = Field::new(
            Self::build_timestamp_range_name(time_index),
            RangeArray::convert_field(ts_col_field).data_type().clone(),
            ts_col_field.is_nullable(),
        );
        new_columns.push((None, Arc::new(timestamp_range_field)));

        for name in field_columns {
            let Some(index) = input_schema.index_of_column_by_name(None, name) else {
                return Err(datafusion::common::field_not_found(
                    None::<TableReference>,
                    name,
                    input_schema.as_ref(),
                ));
            };
            new_columns[index] = (None, Arc::new(RangeArray::convert_field(&columns[index])));
        }

        Ok(Arc::new(DFSchema::new_with_metadata(
            new_columns,
            HashMap::new(),
        )?))
    }

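    /// Converts this logical node into a [`RangeManipulateExec`] physical node
    /// over the given physical input.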
    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        let output_schema: SchemaRef = self.output_schema.inner().clone();
        let properties = exec_input.properties();
        let properties = PlanProperties::new(
            EquivalenceProperties::new(output_schema.clone()),
            properties.partitioning.clone(),
            properties.emission_type,
            properties.boundedness,
        );
        Arc::new(RangeManipulateExec {
            start: self.start,
            end: self.end,
            interval: self.interval,
            range: self.range,
            time_index_column: self.time_index.clone(),
            time_range_column: self.range_timestamp_name(),
            field_columns: self.field_columns.clone(),
            input: exec_input,
            output_schema,
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        })
    }

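    /// Serializes this node to protobuf bytes. Columns are encoded as indices
    /// into the input schema rather than names, so they can be resolved against
    /// the receiver's input schema on deserialization.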
    pub fn serialize(&self) -> Vec<u8> {
        let time_index_idx = serialize_column_index(self.input.schema(), &self.time_index);

        let tag_column_indices = self
            .field_columns
            .iter()
            .map(|name| serialize_column_index(self.input.schema(), name))
            .collect::<Vec<u64>>();

        pb::RangeManipulate {
            start: self.start,
            end: self.end,
            interval: self.interval,
            range: self.range,
            time_index_idx,
            tag_column_indices,
            ..Default::default()
        }
        .encode_to_vec()
    }

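    /// Deserializes a node from protobuf bytes. The real input plan and column
    /// names are not known yet, so this installs a placeholder `EmptyRelation`
    /// and stores the decoded indices in `unfix`; they are resolved later in
    /// `with_exprs_and_inputs`.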
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
        let empty_schema = Arc::new(DFSchema::empty());
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: empty_schema.clone(),
        });

        let unfix = UnfixIndices {
            time_index_idx: pb_range_manipulate.time_index_idx,
            tag_column_indices: pb_range_manipulate.tag_column_indices.clone(),
        };
        debug!("RangeManipulate deserialize unfix: {:?}", unfix);

        Ok(Self {
            start: pb_range_manipulate.start,
            end: pb_range_manipulate.end,
            interval: pb_range_manipulate.interval,
            range: pb_range_manipulate.range,
            time_index: String::new(),
            field_columns: Vec::new(),
            input: placeholder_plan,
            output_schema: empty_schema,
            unfix: Some(unfix),
        })
    }
}

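// Manual `PartialOrd`: compares the defining fields only; the derived
// `output_schema` and the transient `unfix` are skipped.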
impl PartialOrd for RangeManipulate {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        match self.start.partial_cmp(&other.start) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.end.partial_cmp(&other.end) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.interval.partial_cmp(&other.interval) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.range.partial_cmp(&other.range) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.time_index.partial_cmp(&other.time_index) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        match self.field_columns.partial_cmp(&other.field_columns) {
            Some(core::cmp::Ordering::Equal) => {}
            ord => return ord,
        }
        self.input.partial_cmp(&other.input)
    }
}

impl UserDefinedLogicalNodeCore for RangeManipulate {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        &self.output_schema
    }

    fn expressions(&self) -> Vec<Expr> {
        if self.unfix.is_some() {
            return vec![];
        }

        let mut exprs = Vec::with_capacity(1 + self.field_columns.len());
        exprs.push(col(&self.time_index));
        exprs.extend(self.field_columns.iter().map(col));
        exprs
    }

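    /// Projection pushdown support: reports which input columns are required to
    /// produce the requested output columns. The time index and all field
    /// columns are always required; an output index equal to the input length
    /// refers to the appended range-timestamp column, which also needs the
    /// time index.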
    fn necessary_children_exprs(&self, output_columns: &[usize]) -> Option<Vec<Vec<usize>>> {
        if self.unfix.is_some() {
            return None;
        }

        let input_schema = self.input.schema();
        let input_len = input_schema.fields().len();
        let time_index_idx = input_schema.index_of_column_by_name(None, &self.time_index)?;

        if output_columns.is_empty() {
            let indices = (0..input_len).collect::<Vec<_>>();
            return Some(vec![indices]);
        }

        let mut required = Vec::with_capacity(output_columns.len() + 1 + self.field_columns.len());
        required.push(time_index_idx);
        for value_column in &self.field_columns {
            required.push(input_schema.index_of_column_by_name(None, value_column)?);
        }
        for &idx in output_columns {
            if idx < input_len {
                required.push(idx);
            } else if idx == input_len {
                required.push(time_index_idx);
            } else {
                warn!(
                    "Output column index {} is out of bounds for input schema with length {}",
                    idx, input_len
                );
                return None;
            }
        }

        required.sort_unstable();
        required.dedup();
        Some(vec![required])
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
            self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
        )
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        mut inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.len() != 1 {
            return Err(DataFusionError::Internal(
                "RangeManipulate should have exactly one input".to_string(),
            ));
        }

        let input: LogicalPlan = inputs.pop().unwrap();
        let input_schema = input.schema();

        if let Some(unfix) = &self.unfix {
            let time_index = resolve_column_name(
                unfix.time_index_idx,
                input_schema,
                "RangeManipulate",
                "time index",
            )?;

            let field_columns = unfix
                .tag_column_indices
                .iter()
                .map(|idx| resolve_column_name(*idx, input_schema, "RangeManipulate", "tag"))
                .collect::<DataFusionResult<Vec<String>>>()?;

            let output_schema =
                Self::calculate_output_schema(input_schema, &time_index, &field_columns)?;

            Ok(Self {
                start: self.start,
                end: self.end,
                interval: self.interval,
                range: self.range,
                time_index,
                field_columns,
                input,
                output_schema,
                unfix: None,
            })
        } else {
            let output_schema =
                Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;

            Ok(Self {
                start: self.start,
                end: self.end,
                interval: self.interval,
                range: self.range,
                time_index: self.time_index.clone(),
                field_columns: self.field_columns.clone(),
                input,
                output_schema,
                unfix: None,
            })
        }
    }
}

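/// Physical counterpart of [`RangeManipulate`].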
#[derive(Debug)]
pub struct RangeManipulateExec {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    range: Millisecond,
    time_index_column: String,
    time_range_column: String,
    field_columns: Vec<String>,

    input: Arc<dyn ExecutionPlan>,
    output_schema: SchemaRef,
    metric: ExecutionPlanMetricsSet,
    properties: PlanProperties,
}

impl ExecutionPlan for RangeManipulateExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn maintains_input_order(&self) -> Vec<bool> {
        vec![true; self.children().len()]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        let input_requirement = self.input.required_input_distribution();
        if input_requirement.is_empty() {
            vec![Distribution::UnspecifiedDistribution]
        } else {
            input_requirement
        }
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        let exec_input = children[0].clone();
        let properties = exec_input.properties();
        let properties = PlanProperties::new(
            EquivalenceProperties::new(self.output_schema.clone()),
            properties.partitioning.clone(),
            properties.emission_type,
            properties.boundedness,
        );
        Ok(Arc::new(Self {
            start: self.start,
            end: self.end,
            interval: self.interval,
            range: self.range,
            time_index_column: self.time_index_column.clone(),
            time_range_column: self.time_range_column.clone(),
            field_columns: self.field_columns.clone(),
            output_schema: self.output_schema.clone(),
            input: children[0].clone(),
            metric: self.metric.clone(),
            properties,
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let metrics_builder = MetricBuilder::new(&self.metric);
        let num_series = Count::new();
        metrics_builder
            .with_partition(partition)
            .build(MetricValue::Count {
                name: METRIC_NUM_SERIES.into(),
                count: num_series.clone(),
            });

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let time_index = schema
            .column_with_name(&self.time_index_column)
            .unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
            .0;
        let field_columns = self
            .field_columns
            .iter()
            .map(|value_col| {
                schema
                    .column_with_name(value_col)
                    .unwrap_or_else(|| panic!("value column {value_col} not found"))
                    .0
            })
            .collect();
        let aligned_ts_array =
            RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval);
        Ok(Box::pin(RangeManipulateStream {
            start: self.start,
            end: self.end,
            interval: self.interval,
            range: self.range,
            time_index,
            field_columns,
            aligned_ts_array,
            output_schema: self.output_schema.clone(),
            input,
            metric: baseline_metric,
            num_series,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

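    /// Estimates output statistics: the row count is the number of aligned
    /// steps, `(end - start) / interval`, and the byte size scales the input's
    /// average row width by that estimate.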
    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
        let input_stats = self.input.partition_statistics(partition)?;

        let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
        let estimated_total_bytes = input_stats
            .total_byte_size
            .get_value()
            .zip(input_stats.num_rows.get_value())
            .map(|(size, rows)| {
                Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
            })
            .unwrap_or_default();

        Ok(Statistics {
            num_rows: Precision::Inexact(estimated_row_num as _),
            total_byte_size: estimated_total_bytes,
            column_statistics: Statistics::unknown_column(&self.schema()),
        })
    }

    fn name(&self) -> &str {
        "RangeManipulateExec"
    }
}

impl DisplayAs for RangeManipulateExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
                    self.start, self.end, self.interval, self.range, self.time_index_column
                )
            }
        }
    }
}

pub struct RangeManipulateStream {
    start: Millisecond,
    end: Millisecond,
    interval: Millisecond,
    range: Millisecond,
    time_index: usize,
    field_columns: Vec<usize>,
    aligned_ts_array: ArrayRef,

    output_schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    num_series: Count,
}

impl RecordBatchStream for RangeManipulateStream {
    fn schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }
}

impl Stream for RangeManipulateStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let poll = loop {
            match ready!(self.input.poll_next_unpin(cx)) {
                Some(Ok(batch)) => {
                    let timer = std::time::Instant::now();
                    let result = self.manipulate(batch);
                    if let Ok(None) = result {
                        self.metric.elapsed_compute().add_elapsed(timer);
                        continue;
                    } else {
                        self.num_series.add(1);
                        self.metric.elapsed_compute().add_elapsed(timer);
                        break Poll::Ready(result.transpose());
                    }
                }
                None => {
                    PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                    break Poll::Ready(None);
                }
                Some(Err(e)) => break Poll::Ready(Some(Err(e))),
            }
        };
        self.metric.record_poll(poll)
    }
}

impl RangeManipulateStream {
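    /// Transforms one input batch into one output batch: computes the per-step
    /// windows, wraps the field columns and the time index into range columns,
    /// and replaces the time index with the aligned timestamps. Returns
    /// `Ok(None)` when every window is empty so the batch can be skipped.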
    pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
        let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
        let (ranges, (start, end)) = self.calculate_range(&input)?;
        if ranges.iter().all(|(_, len)| *len == 0) {
            return Ok(None);
        }

        let mut new_columns = input.columns().to_vec();
        for index in self.field_columns.iter() {
            let _ = other_columns.remove(index);
            let column = input.column(*index);
            let new_column = Arc::new(
                RangeArray::from_ranges(column.clone(), ranges.clone())
                    .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
                    .into_dict(),
            );
            new_columns[*index] = new_column;
        }

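        // The time index itself also gets a range view, stored in the appended
        // `{time_index}_range` column.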
        let ts_range_column =
            RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone())
                .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
                .into_dict();
        new_columns.push(Arc::new(ts_range_column));

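        // The remaining columns (tags etc.) are repeated from the first row,
        // once per output step.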
        let take_indices = Int64Array::from(vec![0; ranges.len()]);
        for index in other_columns.into_iter() {
            new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
        }
        let new_time_index = if ranges.len() != self.aligned_ts_array.len() {
            Self::build_aligned_ts_array(start, end, self.interval)
        } else {
            self.aligned_ts_array.clone()
        };
        new_columns[self.time_index] = new_time_index;

        RecordBatch::try_new(self.output_schema.clone(), new_columns)
            .map(Some)
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
    }

    fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef {
        Arc::new(TimestampMillisecondArray::from_iter_values(
            (start..=end).step_by(interval as _),
        ))
    }

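    /// For each aligned timestamp `t` in `[start, end]` (stepped by
    /// `interval`), finds the window of input rows whose timestamps fall in
    /// `[t - range, t]`. Returns the windows as `(offset, length)` pairs
    /// together with the possibly narrowed `(start, end)` actually evaluated.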
    #[allow(clippy::type_complexity)]
    fn calculate_range(
        &self,
        input: &RecordBatch,
    ) -> DataFusionResult<(Vec<(u32, u32)>, (i64, i64))> {
        let ts_column = input
            .column(self.time_index)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "time index column downcast to TimestampMillisecondArray failed".into(),
                )
            })?;

        let len = ts_column.len();
        if len == 0 {
            return Ok((vec![], (self.start, self.end)));
        }

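        // Clamp the evaluation range to the data's extent, rounding the first
        // data timestamp up to the query's alignment grid so that every
        // produced step stays congruent with `self.start` modulo `interval`.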
        let first_ts = ts_column.value(0);
        let remainder = (first_ts - self.start).rem_euclid(self.interval);
        let first_ts_aligned = if remainder == 0 {
            first_ts
        } else {
            first_ts + (self.interval - remainder)
        };
        let last_ts = ts_column.value(ts_column.len() - 1);
        let last_ts_aligned = ((last_ts + self.range) / self.interval) * self.interval;
        let start = self.start.max(first_ts_aligned);
        let end = self.end.min(last_ts_aligned);
        if start > end {
            return Ok((vec![], (start, end)));
        }
        let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize);

        let mut range_start_index = 0usize;
        let mut last_range_start = 0;
        let mut start_delta = 0;
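        // Sliding-window scan: both the data and the steps are sorted, so each
        // window's probe starts near the previous window's start
        // (`range_start_index + start_delta`) instead of from offset 0,
        // backing up first in case the probe overshot.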
        for curr_ts in (start..=end).step_by(self.interval as _) {
            let start_ts = curr_ts - self.range;

            let mut range_start = ts_column.len();
            let mut range_end = 0;
            let mut cursor = range_start_index + start_delta;
            while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 {
                cursor -= 1;
            }

            while cursor < ts_column.len() {
                let ts = ts_column.value(cursor);
                if range_start > cursor && ts >= start_ts {
                    range_start = cursor;
                    range_start_index = range_start;
                }
                if ts <= curr_ts {
                    range_end = range_end.max(cursor);
                } else {
                    range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
                    break;
                }
                cursor += 1;
            }
            if range_start > range_end {
                ranges.push((0, 0));
                start_delta = 0;
            } else {
                ranges.push((range_start as _, (range_end + 1 - range_start) as _));
                start_delta = range_start - last_range_start;
                last_range_start = range_start;
            }
        }

        Ok((ranges, (start, end)))
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray};
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType,
    };
    use datafusion::common::ToDFSchema;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
    use datafusion::physical_expr::Partitioning;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::physical_plan::memory::MemoryStream;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;
    use futures::FutureExt;

    use super::*;

    const TIME_INDEX_COLUMN: &str = "timestamp";

    fn project_batch(batch: &RecordBatch, indices: &[usize]) -> RecordBatch {
        let fields = indices
            .iter()
            .map(|&idx| batch.schema().field(idx).clone())
            .collect::<Vec<_>>();
        let columns = indices
            .iter()
            .map(|&idx| batch.column(idx).clone())
            .collect::<Vec<_>>();
        let schema = Arc::new(Schema::new(fields));
        RecordBatch::try_new(schema, columns).unwrap()
    }

    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value_1", DataType::Float64, true),
            Field::new("value_2", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            0, 30_000, 60_000, 90_000, 120_000, 180_000, 240_000, 241_000, 271_000, 291_000,
        ])) as _;
        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![
                timestamp_column,
                field_column.clone(),
                field_column,
                path_column,
            ],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }

    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        range: Millisecond,
        expected: String,
    ) {
        let memory_exec = Arc::new(prepare_test_data());
        let time_index = TIME_INDEX_COLUMN.to_string();
        let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
        let manipulate_output_schema = SchemaRef::new(
            RangeManipulate::calculate_output_schema(
                &memory_exec.schema().to_dfschema_ref().unwrap(),
                &time_index,
                &field_columns,
            )
            .unwrap()
            .as_arrow()
            .clone(),
        );
        let properties = PlanProperties::new(
            EquivalenceProperties::new(manipulate_output_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let normalize_exec = Arc::new(RangeManipulateExec {
            start,
            end,
            interval,
            range,
            field_columns,
            output_schema: manipulate_output_schema,
            time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
            time_index_column: time_index,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal: String = result
            .into_iter()
            .filter_map(|batch| {
                batch
                    .columns()
                    .iter()
                    .map(|array| {
                        if matches!(array.data_type(), &DataType::Dictionary(..)) {
                            let dict_array = array
                                .as_any()
                                .downcast_ref::<DictionaryArray<Int64Type>>()
                                .unwrap()
                                .clone();
                            format!("{:?}", RangeArray::try_new(dict_array).unwrap())
                        } else {
                            format!("{array:?}")
                        }
                    })
                    .reduce(|lhs, rhs| lhs + "\n" + &rhs)
            })
            .reduce(|lhs, rhs| lhs + "\n\n" + &rhs)
            .unwrap();

        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn pruning_should_keep_time_and_value_columns_for_exec() {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value_1", DataType::Float64, true),
            Field::new("value_2", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let df_schema = schema.clone().to_dfschema_ref().unwrap();
        let input = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: df_schema,
        });
        let plan = RangeManipulate::new(
            0,
            310_000,
            30_000,
            90_000,
            TIME_INDEX_COLUMN.to_string(),
            vec!["value_1".to_string(), "value_2".to_string()],
            input,
        )
        .unwrap();

        let output_columns = [3usize];
        let required = plan.necessary_children_exprs(&output_columns).unwrap();
        let required = &required[0];
        assert_eq!(required.as_slice(), &[0, 1, 2, 3]);

        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            0, 30_000, 60_000, 90_000, 120_000, 180_000, 240_000, 241_000, 271_000, 291_000,
        ])) as _;
        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
        let input_batch = RecordBatch::try_new(
            schema,
            vec![
                timestamp_column,
                field_column.clone(),
                field_column,
                path_column,
            ],
        )
        .unwrap();

        let projected = project_batch(&input_batch, required);
        let projected_schema = projected.schema();
        let memory_exec = Arc::new(DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![projected]], projected_schema, None).unwrap(),
        )));
        let range_exec = plan.to_execution_plan(memory_exec);
        let session_context = SessionContext::default();
        let output_batches =
            datafusion::physical_plan::collect(range_exec, session_context.task_ctx())
                .await
                .unwrap();
        assert_eq!(output_batches.len(), 1);

        let output_batch = &output_batches[0];
        let path = output_batch
            .column(3)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(path.iter().all(|v| v == Some("foo")));

        let broken_required = [3usize];
        let broken = project_batch(&input_batch, &broken_required);
        let broken_schema = broken.schema();
        let broken_exec = Arc::new(DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![broken]], broken_schema, None).unwrap(),
        )));
        let broken_range_exec = plan.to_execution_plan(broken_exec);
        let session_context = SessionContext::default();
        let broken_result = std::panic::AssertUnwindSafe(async {
            datafusion::physical_plan::collect(broken_range_exec, session_context.task_ctx()).await
        })
        .catch_unwind()
        .await;
        assert!(broken_result.is_err());
    }

    #[tokio::test]
    async fn interval_30s_range_90s() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(ms)>\n[\n \
            1970-01-01T00:00:00,\n \
            1970-01-01T00:00:30,\n \
            1970-01-01T00:01:00,\n \
            1970-01-01T00:01:30,\n \
            1970-01-01T00:02:00,\n \
            1970-01-01T00:02:30,\n \
            1970-01-01T00:03:00,\n \
            1970-01-01T00:03:30,\n \
            1970-01-01T00:04:00,\n \
            1970-01-01T00:04:30,\n \
            1970-01-01T00:05:00,\n\
            ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
            RangeArray { \
            base array: PrimitiveArray<Timestamp(ms)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }",
        );
        do_normalize_test(0, 310_000, 30_000, 90_000, expected.clone()).await;

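        // A query start earlier than the first data point is clamped to the
        // data's extent, so the result is unchanged.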
        do_normalize_test(-300000, 310_000, 30_000, 90_000, expected).await;
    }

    #[tokio::test]
    async fn small_empty_range() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(ms)>\n[\n \
            1970-01-01T00:00:00.001,\n \
            1970-01-01T00:00:03.001,\n \
            1970-01-01T00:00:06.001,\n \
            1970-01-01T00:00:09.001,\n\
            ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
            RangeArray { \
            base array: PrimitiveArray<Timestamp(ms)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }",
        );
        do_normalize_test(1, 10_001, 3_000, 1_000, expected).await;
    }

    #[test]
    fn test_calculate_range_preserves_alignment() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "timestamp",
            TimestampMillisecondType::DATA_TYPE,
            false,
        )]));
        let empty_stream = MemoryStream::try_new(vec![], schema.clone(), None).unwrap();

        let stream = RangeManipulateStream {
            start: 1758093274000,
            end: 1758093334000,
            interval: 30000,
            range: 60000,
            time_index: 0,
            field_columns: vec![],
            aligned_ts_array: Arc::new(TimestampMillisecondArray::from(vec![0i64; 0])),
            output_schema: schema.clone(),
            input: Box::pin(empty_stream),
            metric: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
            num_series: Count::new(),
        };

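        // Data points deliberately off the query's alignment grid: the query
        // start is congruent to 4_000 (mod 30_000) while these timestamps are
        // congruent to 20_000.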
        let test_timestamps = vec![1758093260000, 1758093290000, 1758093320000];
        let ts_array = TimestampMillisecondArray::from(test_timestamps);
        let test_schema = Arc::new(Schema::new(vec![Field::new(
            "timestamp",
            TimestampMillisecondType::DATA_TYPE,
            false,
        )]));
        let batch = RecordBatch::try_new(test_schema, vec![Arc::new(ts_array)]).unwrap();

        let (ranges, (start, end)) = stream.calculate_range(&batch).unwrap();

        assert_eq!(
            start % 30000,
            1758093274000 % 30000,
            "Optimized start should preserve query alignment pattern"
        );

        let expected_timestamps: Vec<i64> = (start..=end).step_by(30000).collect();
        assert_eq!(ranges.len(), expected_timestamps.len());

        for ts in expected_timestamps {
            assert_eq!(
                ts % 30000,
                1758093274000 % 30000,
                "All timestamps should maintain query alignment pattern"
            );
        }
    }
}