1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
22use datafusion::arrow::compute;
23use datafusion::arrow::datatypes::{Field, SchemaRef};
24use datafusion::arrow::error::ArrowError;
25use datafusion::arrow::record_batch::RecordBatch;
26use datafusion::common::stats::Precision;
27use datafusion::common::{DFSchema, DFSchemaRef};
28use datafusion::error::{DataFusionError, Result as DataFusionResult};
29use datafusion::execution::context::TaskContext;
30use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
31use datafusion::physical_expr::EquivalenceProperties;
32use datafusion::physical_plan::metrics::{
33 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
34};
35use datafusion::physical_plan::{
36 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
37 SendableRecordBatchStream, Statistics,
38};
39use datafusion::sql::TableReference;
40use futures::{Stream, StreamExt, ready};
41use greptime_proto::substrait_extension as pb;
42use prost::Message;
43use snafu::ResultExt;
44
45use crate::error::{DeserializeSnafu, Result};
46use crate::extension_plan::{METRIC_NUM_SERIES, Millisecond};
47use crate::metrics::PROMQL_SERIES_COUNT;
48use crate::range_array::RangeArray;
49
/// Logical plan node for the PromQL range-manipulate step.
///
/// The node keeps every input column, turns the listed field columns into
/// range-array columns and appends one extra `{time_index}_range` column
/// (see `calculate_output_schema`).
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
    /// Lower bound of the requested evaluation range (`req range` in explain output).
    start: Millisecond,
    /// Upper bound of the requested evaluation range (`req range` in explain output).
    end: Millisecond,
    /// Step between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the look-back window of each evaluation timestamp (`eval range`).
    range: Millisecond,

    /// Name of the time index column in the input schema.
    time_index: String,
    /// Names of the input columns that are converted into range arrays.
    field_columns: Vec<String>,
    /// Child logical plan.
    input: LogicalPlan,
    /// Schema produced by this node, derived from the input schema.
    output_schema: DFSchemaRef,
}
71
72impl RangeManipulate {
73 pub fn new(
74 start: Millisecond,
75 end: Millisecond,
76 interval: Millisecond,
77 range: Millisecond,
78 time_index: String,
79 field_columns: Vec<String>,
80 input: LogicalPlan,
81 ) -> DataFusionResult<Self> {
82 let output_schema =
83 Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
84 Ok(Self {
85 start,
86 end,
87 interval,
88 range,
89 time_index,
90 field_columns,
91 input,
92 output_schema,
93 })
94 }
95
96 pub const fn name() -> &'static str {
97 "RangeManipulate"
98 }
99
100 pub fn build_timestamp_range_name(time_index: &str) -> String {
101 format!("{time_index}_range")
102 }
103
104 pub fn internal_range_end_col_name() -> String {
105 "__internal_range_end".to_string()
106 }
107
108 fn range_timestamp_name(&self) -> String {
109 Self::build_timestamp_range_name(&self.time_index)
110 }
111
112 fn calculate_output_schema(
113 input_schema: &DFSchemaRef,
114 time_index: &str,
115 field_columns: &[String],
116 ) -> DataFusionResult<DFSchemaRef> {
117 let columns = input_schema.fields();
118 let mut new_columns = Vec::with_capacity(columns.len() + 1);
119 for i in 0..columns.len() {
120 let x = input_schema.qualified_field(i);
121 new_columns.push((x.0.cloned(), Arc::new(x.1.clone())));
122 }
123
124 let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index) else {
127 return Err(datafusion::common::field_not_found(
128 None::<TableReference>,
129 time_index,
130 input_schema.as_ref(),
131 ));
132 };
133 let ts_col_field = &columns[ts_col_index];
134 let timestamp_range_field = Field::new(
135 Self::build_timestamp_range_name(time_index),
136 RangeArray::convert_field(ts_col_field).data_type().clone(),
137 ts_col_field.is_nullable(),
138 );
139 new_columns.push((None, Arc::new(timestamp_range_field)));
140
141 for name in field_columns {
143 let Some(index) = input_schema.index_of_column_by_name(None, name) else {
144 return Err(datafusion::common::field_not_found(
145 None::<TableReference>,
146 name,
147 input_schema.as_ref(),
148 ));
149 };
150 new_columns[index] = (None, Arc::new(RangeArray::convert_field(&columns[index])));
151 }
152
153 Ok(Arc::new(DFSchema::new_with_metadata(
154 new_columns,
155 HashMap::new(),
156 )?))
157 }
158
159 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
160 let output_schema: SchemaRef = SchemaRef::new(self.output_schema.as_ref().into());
161 let properties = exec_input.properties();
162 let properties = PlanProperties::new(
163 EquivalenceProperties::new(output_schema.clone()),
164 properties.partitioning.clone(),
165 properties.emission_type,
166 properties.boundedness,
167 );
168 Arc::new(RangeManipulateExec {
169 start: self.start,
170 end: self.end,
171 interval: self.interval,
172 range: self.range,
173 time_index_column: self.time_index.clone(),
174 time_range_column: self.range_timestamp_name(),
175 field_columns: self.field_columns.clone(),
176 input: exec_input,
177 output_schema,
178 metric: ExecutionPlanMetricsSet::new(),
179 properties,
180 })
181 }
182
183 pub fn serialize(&self) -> Vec<u8> {
184 pb::RangeManipulate {
185 start: self.start,
186 end: self.end,
187 interval: self.interval,
188 range: self.range,
189 time_index: self.time_index.clone(),
190 tag_columns: self.field_columns.clone(),
191 }
192 .encode_to_vec()
193 }
194
195 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
196 let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
197 let empty_schema = Arc::new(DFSchema::empty());
198 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
199 produce_one_row: false,
200 schema: empty_schema.clone(),
201 });
202
203 Ok(Self {
208 start: pb_range_manipulate.start,
209 end: pb_range_manipulate.end,
210 interval: pb_range_manipulate.interval,
211 range: pb_range_manipulate.range,
212 time_index: pb_range_manipulate.time_index,
213 field_columns: pb_range_manipulate.tag_columns,
214 input: placeholder_plan,
215 output_schema: empty_schema,
216 })
217 }
218}
219
220impl PartialOrd for RangeManipulate {
221 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
222 match self.start.partial_cmp(&other.start) {
224 Some(core::cmp::Ordering::Equal) => {}
225 ord => return ord,
226 }
227 match self.end.partial_cmp(&other.end) {
228 Some(core::cmp::Ordering::Equal) => {}
229 ord => return ord,
230 }
231 match self.interval.partial_cmp(&other.interval) {
232 Some(core::cmp::Ordering::Equal) => {}
233 ord => return ord,
234 }
235 match self.range.partial_cmp(&other.range) {
236 Some(core::cmp::Ordering::Equal) => {}
237 ord => return ord,
238 }
239 match self.time_index.partial_cmp(&other.time_index) {
240 Some(core::cmp::Ordering::Equal) => {}
241 ord => return ord,
242 }
243 match self.field_columns.partial_cmp(&other.field_columns) {
244 Some(core::cmp::Ordering::Equal) => {}
245 ord => return ord,
246 }
247 self.input.partial_cmp(&other.input)
248 }
249}
250
251impl UserDefinedLogicalNodeCore for RangeManipulate {
252 fn name(&self) -> &str {
253 Self::name()
254 }
255
256 fn inputs(&self) -> Vec<&LogicalPlan> {
257 vec![&self.input]
258 }
259
260 fn schema(&self) -> &DFSchemaRef {
261 &self.output_schema
262 }
263
264 fn expressions(&self) -> Vec<Expr> {
265 vec![]
266 }
267
268 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
269 write!(
270 f,
271 "PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
272 self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
273 )
274 }
275
276 fn with_exprs_and_inputs(
277 &self,
278 _exprs: Vec<Expr>,
279 mut inputs: Vec<LogicalPlan>,
280 ) -> DataFusionResult<Self> {
281 if inputs.len() != 1 {
282 return Err(DataFusionError::Internal(
283 "RangeManipulate should have at exact one input".to_string(),
284 ));
285 }
286
287 let input: LogicalPlan = inputs.pop().unwrap();
288 let input_schema = input.schema();
289 let output_schema =
290 Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
291
292 Ok(Self {
293 start: self.start,
294 end: self.end,
295 interval: self.interval,
296 range: self.range,
297 time_index: self.time_index.clone(),
298 field_columns: self.field_columns.clone(),
299 input,
300 output_schema,
301 })
302 }
303}
304
/// Physical operator created by `RangeManipulate::to_execution_plan`.
#[derive(Debug)]
pub struct RangeManipulateExec {
    /// Lower bound of the requested evaluation range.
    start: Millisecond,
    /// Upper bound of the requested evaluation range.
    end: Millisecond,
    /// Step between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the look-back window of each evaluation timestamp.
    range: Millisecond,
    /// Name of the time index column in the input.
    time_index_column: String,
    /// Name of the appended range column (`{time_index}_range`).
    time_range_column: String,
    /// Names of the input columns that are converted into range arrays.
    field_columns: Vec<String>,

    /// Child physical plan.
    input: Arc<dyn ExecutionPlan>,
    /// Arrow schema of the batches produced by this operator.
    output_schema: SchemaRef,
    /// Metrics shared by all partitions of this operator.
    metric: ExecutionPlanMetricsSet,
    /// Cached plan properties (partitioning, emission type, boundedness).
    properties: PlanProperties,
}
320
321impl ExecutionPlan for RangeManipulateExec {
322 fn as_any(&self) -> &dyn Any {
323 self
324 }
325
326 fn schema(&self) -> SchemaRef {
327 self.output_schema.clone()
328 }
329
330 fn properties(&self) -> &PlanProperties {
331 &self.properties
332 }
333
334 fn maintains_input_order(&self) -> Vec<bool> {
335 vec![true; self.children().len()]
336 }
337
338 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
339 vec![&self.input]
340 }
341
342 fn required_input_distribution(&self) -> Vec<Distribution> {
343 let input_requirement = self.input.required_input_distribution();
344 if input_requirement.is_empty() {
345 vec![Distribution::UnspecifiedDistribution]
348 } else {
349 input_requirement
350 }
351 }
352
353 fn with_new_children(
354 self: Arc<Self>,
355 children: Vec<Arc<dyn ExecutionPlan>>,
356 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
357 assert!(!children.is_empty());
358 let exec_input = children[0].clone();
359 let properties = exec_input.properties();
360 let properties = PlanProperties::new(
361 EquivalenceProperties::new(self.output_schema.clone()),
362 properties.partitioning.clone(),
363 properties.emission_type,
364 properties.boundedness,
365 );
366 Ok(Arc::new(Self {
367 start: self.start,
368 end: self.end,
369 interval: self.interval,
370 range: self.range,
371 time_index_column: self.time_index_column.clone(),
372 time_range_column: self.time_range_column.clone(),
373 field_columns: self.field_columns.clone(),
374 output_schema: self.output_schema.clone(),
375 input: children[0].clone(),
376 metric: self.metric.clone(),
377 properties,
378 }))
379 }
380
381 fn execute(
382 &self,
383 partition: usize,
384 context: Arc<TaskContext>,
385 ) -> DataFusionResult<SendableRecordBatchStream> {
386 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
387 let metrics_builder = MetricBuilder::new(&self.metric);
388 let num_series = Count::new();
389 metrics_builder
390 .with_partition(partition)
391 .build(MetricValue::Count {
392 name: METRIC_NUM_SERIES.into(),
393 count: num_series.clone(),
394 });
395
396 let input = self.input.execute(partition, context)?;
397 let schema = input.schema();
398 let time_index = schema
399 .column_with_name(&self.time_index_column)
400 .unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
401 .0;
402 let field_columns = self
403 .field_columns
404 .iter()
405 .map(|value_col| {
406 schema
407 .column_with_name(value_col)
408 .unwrap_or_else(|| panic!("value column {value_col} not found",))
409 .0
410 })
411 .collect();
412 let aligned_ts_array =
413 RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval);
414 Ok(Box::pin(RangeManipulateStream {
415 start: self.start,
416 end: self.end,
417 interval: self.interval,
418 range: self.range,
419 time_index,
420 field_columns,
421 aligned_ts_array,
422 output_schema: self.output_schema.clone(),
423 input,
424 metric: baseline_metric,
425 num_series,
426 }))
427 }
428
429 fn metrics(&self) -> Option<MetricsSet> {
430 Some(self.metric.clone_inner())
431 }
432
433 fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
434 let input_stats = self.input.partition_statistics(partition)?;
435
436 let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
437 let estimated_total_bytes = input_stats
438 .total_byte_size
439 .get_value()
440 .zip(input_stats.num_rows.get_value())
441 .map(|(size, rows)| {
442 Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
443 })
444 .unwrap_or_default();
445
446 Ok(Statistics {
447 num_rows: Precision::Inexact(estimated_row_num as _),
448 total_byte_size: estimated_total_bytes,
449 column_statistics: Statistics::unknown_column(&self.schema()),
451 })
452 }
453
454 fn name(&self) -> &str {
455 "RangeManipulateExec"
456 }
457}
458
459impl DisplayAs for RangeManipulateExec {
460 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
461 match t {
462 DisplayFormatType::Default
463 | DisplayFormatType::Verbose
464 | DisplayFormatType::TreeRender => {
465 write!(
466 f,
467 "PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
468 self.start, self.end, self.interval, self.range, self.time_index_column
469 )
470 }
471 }
472 }
473}
474
/// Stream that applies the range manipulation to every batch of its input.
pub struct RangeManipulateStream {
    /// Lower bound of the requested evaluation range.
    start: Millisecond,
    /// Upper bound of the requested evaluation range.
    end: Millisecond,
    /// Step between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the look-back window of each evaluation timestamp.
    range: Millisecond,
    /// Index of the time index column in the input batches.
    time_index: usize,
    /// Indices of the input columns that are converted into range arrays.
    field_columns: Vec<usize>,
    /// Evaluation timestamps precomputed for the full `[start, end]` range.
    aligned_ts_array: ArrayRef,

    /// Arrow schema of the produced batches.
    output_schema: SchemaRef,
    /// Upstream batches.
    input: SendableRecordBatchStream,
    /// Baseline metrics (elapsed compute time, poll results).
    metric: BaselineMetrics,
    /// Counter incremented for each input batch that yields a result.
    num_series: Count,
}
490
491impl RecordBatchStream for RangeManipulateStream {
492 fn schema(&self) -> SchemaRef {
493 self.output_schema.clone()
494 }
495}
496
497impl Stream for RangeManipulateStream {
498 type Item = DataFusionResult<RecordBatch>;
499
500 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
501 let poll = loop {
502 match ready!(self.input.poll_next_unpin(cx)) {
503 Some(Ok(batch)) => {
504 let timer = std::time::Instant::now();
505 let result = self.manipulate(batch);
506 if let Ok(None) = result {
507 self.metric.elapsed_compute().add_elapsed(timer);
508 continue;
509 } else {
510 self.num_series.add(1);
511 self.metric.elapsed_compute().add_elapsed(timer);
512 break Poll::Ready(result.transpose());
513 }
514 }
515 None => {
516 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
517 break Poll::Ready(None);
518 }
519 Some(Err(e)) => break Poll::Ready(Some(Err(e))),
520 }
521 };
522 self.metric.record_poll(poll)
523 }
524}
525
526impl RangeManipulateStream {
527 pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
531 let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
532 let (ranges, (start, end)) = self.calculate_range(&input)?;
534 if ranges.iter().all(|(_, len)| *len == 0) {
536 return Ok(None);
537 }
538
539 let mut new_columns = input.columns().to_vec();
541 for index in self.field_columns.iter() {
542 let _ = other_columns.remove(index);
543 let column = input.column(*index);
544 let new_column = Arc::new(
545 RangeArray::from_ranges(column.clone(), ranges.clone())
546 .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
547 .into_dict(),
548 );
549 new_columns[*index] = new_column;
550 }
551
552 let ts_range_column =
554 RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone())
555 .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
556 .into_dict();
557 new_columns.push(Arc::new(ts_range_column));
558
559 let take_indices = Int64Array::from(vec![0; ranges.len()]);
561 for index in other_columns.into_iter() {
562 new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
563 }
564 let new_time_index = if ranges.len() != self.aligned_ts_array.len() {
566 Self::build_aligned_ts_array(start, end, self.interval)
567 } else {
568 self.aligned_ts_array.clone()
569 };
570 new_columns[self.time_index] = new_time_index;
571
572 RecordBatch::try_new(self.output_schema.clone(), new_columns)
573 .map(Some)
574 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
575 }
576
577 fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef {
578 Arc::new(TimestampMillisecondArray::from_iter_values(
579 (start..=end).step_by(interval as _),
580 ))
581 }
582
583 #[allow(clippy::type_complexity)]
587 fn calculate_range(
588 &self,
589 input: &RecordBatch,
590 ) -> DataFusionResult<(Vec<(u32, u32)>, (i64, i64))> {
591 let ts_column = input
592 .column(self.time_index)
593 .as_any()
594 .downcast_ref::<TimestampMillisecondArray>()
595 .ok_or_else(|| {
596 DataFusionError::Execution(
597 "Time index Column downcast to TimestampMillisecondArray failed".into(),
598 )
599 })?;
600
601 let len = ts_column.len();
602 if len == 0 {
603 return Ok((vec![], (self.start, self.end)));
604 }
605
606 let first_ts = ts_column.value(0);
608 let remainder = (first_ts - self.start).rem_euclid(self.interval);
610 let first_ts_aligned = if remainder == 0 {
611 first_ts
612 } else {
613 first_ts + (self.interval - remainder)
614 };
615 let last_ts = ts_column.value(ts_column.len() - 1);
616 let last_ts_aligned = ((last_ts + self.range) / self.interval) * self.interval;
617 let start = self.start.max(first_ts_aligned);
618 let end = self.end.min(last_ts_aligned);
619 if start > end {
620 return Ok((vec![], (start, end)));
621 }
622 let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize);
623
624 let mut range_start_index = 0usize;
626 let mut last_range_start = 0;
627 let mut start_delta = 0;
628 for curr_ts in (start..=end).step_by(self.interval as _) {
629 let start_ts = curr_ts - self.range;
631
632 let mut range_start = ts_column.len();
634 let mut range_end = 0;
635 let mut cursor = range_start_index + start_delta;
636 while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 {
638 cursor -= 1;
639 }
640
641 while cursor < ts_column.len() {
642 let ts = ts_column.value(cursor);
643 if range_start > cursor && ts >= start_ts {
644 range_start = cursor;
645 range_start_index = range_start;
646 }
647 if ts <= curr_ts {
648 range_end = range_end.max(cursor);
649 } else {
650 range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
651 break;
652 }
653 cursor += 1;
654 }
655 if range_start > range_end {
656 ranges.push((0, 0));
657 start_delta = 0;
658 } else {
659 ranges.push((range_start as _, (range_end + 1 - range_start) as _));
660 start_delta = range_start - last_range_start;
661 last_range_start = range_start;
662 }
663 }
664
665 Ok((ranges, (start, end)))
666 }
667}
668
// Unit tests for the range manipulation operator and its range calculation.
#[cfg(test)]
mod test {
    use datafusion::arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray};
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType,
    };
    use datafusion::common::ToDFSchema;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::physical_expr::Partitioning;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::physical_plan::memory::MemoryStream;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;

    use super::*;

    const TIME_INDEX_COLUMN: &str = "timestamp";

    /// Builds an in-memory source with a single batch of ten rows:
    /// a millisecond timestamp column, two identical float columns and a
    /// constant string column.
    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value_1", DataType::Float64, true),
            Field::new("value_2", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        // Irregularly spaced timestamps, in ascending order.
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            0, 30_000, 60_000, 90_000, 120_000, 180_000, 240_000, 241_000, 271_000, 291_000,
        ])) as _;
        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![
                timestamp_column,
                field_column.clone(),
                field_column,
                path_column,
            ],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }

    /// Runs `RangeManipulateExec` over the test data with the given
    /// parameters and compares the debug rendering of all output columns
    /// with `expected`.
    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        range: Millisecond,
        expected: String,
    ) {
        let memory_exec = Arc::new(prepare_test_data());
        let time_index = TIME_INDEX_COLUMN.to_string();
        let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
        let manipulate_output_schema = SchemaRef::new(
            RangeManipulate::calculate_output_schema(
                &memory_exec.schema().to_dfschema_ref().unwrap(),
                &time_index,
                &field_columns,
            )
            .unwrap()
            .as_ref()
            .into(),
        );
        let properties = PlanProperties::new(
            EquivalenceProperties::new(manipulate_output_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let normalize_exec = Arc::new(RangeManipulateExec {
            start,
            end,
            interval,
            range,
            field_columns,
            output_schema: manipulate_output_schema,
            time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
            time_index_column: time_index,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        // Render every column; dictionary columns are shown as range arrays.
        // Columns are joined with one newline, batches with a blank line.
        let result_literal: String = result
            .into_iter()
            .filter_map(|batch| {
                batch
                    .columns()
                    .iter()
                    .map(|array| {
                        if matches!(array.data_type(), &DataType::Dictionary(..)) {
                            let dict_array = array
                                .as_any()
                                .downcast_ref::<DictionaryArray<Int64Type>>()
                                .unwrap()
                                .clone();
                            format!("{:?}", RangeArray::try_new(dict_array).unwrap())
                        } else {
                            format!("{array:?}")
                        }
                    })
                    .reduce(|lhs, rhs| lhs + "\n" + &rhs)
            })
            .reduce(|lhs, rhs| lhs + "\n\n" + &rhs)
            .unwrap();

        assert_eq!(result_literal, expected);
    }

    // 30s step with a 90s window over the whole data set.
    #[tokio::test]
    async fn interval_30s_range_90s() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n \
            1970-01-01T00:00:00,\n \
            1970-01-01T00:00:30,\n \
            1970-01-01T00:01:00,\n \
            1970-01-01T00:01:30,\n \
            1970-01-01T00:02:00,\n \
            1970-01-01T00:02:30,\n \
            1970-01-01T00:03:00,\n \
            1970-01-01T00:03:30,\n \
            1970-01-01T00:04:00,\n \
            1970-01-01T00:04:30,\n \
            1970-01-01T00:05:00,\n\
            ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
            RangeArray { \
            base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }",
        );
        do_normalize_test(0, 310_000, 30_000, 90_000, expected.clone()).await;

        // A query start before the first data point must give the same output.
        do_normalize_test(-300000, 310_000, 30_000, 90_000, expected).await;
    }

    // 3s step with a 1s window: only the first evaluation timestamp sees data.
    #[tokio::test]
    async fn small_empty_range() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n \
            1970-01-01T00:00:00.001,\n \
            1970-01-01T00:00:03.001,\n \
            1970-01-01T00:00:06.001,\n \
            1970-01-01T00:00:09.001,\n\
            ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
            RangeArray { \
            base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }",
        );
        do_normalize_test(1, 10_001, 3_000, 1_000, expected).await;
    }

    // The clamped start must keep the same offset within the interval as the
    // query start (here the start is 4000 ms past a multiple of 30000).
    #[test]
    fn test_calculate_range_preserves_alignment() {
        let schema = Arc::new(Schema::new(vec![Field::new(
            "timestamp",
            TimestampMillisecondType::DATA_TYPE,
            false,
        )]));
        // The stream input is never polled; only `calculate_range` is exercised.
        let empty_stream = MemoryStream::try_new(vec![], schema.clone(), None).unwrap();

        let stream = RangeManipulateStream {
            start: 1758093274000,
            end: 1758093334000,
            interval: 30000,
            range: 60000,
            time_index: 0,
            field_columns: vec![],
            aligned_ts_array: Arc::new(TimestampMillisecondArray::from(vec![0i64; 0])),
            output_schema: schema.clone(),
            input: Box::pin(empty_stream),
            metric: BaselineMetrics::new(&ExecutionPlanMetricsSet::new(), 0),
            num_series: Count::new(),
        };

        // Data points that do not share the query's alignment.
        let test_timestamps = vec![1758093260000, 1758093290000, 1758093320000];
        let ts_array = TimestampMillisecondArray::from(test_timestamps);
        let test_schema = Arc::new(Schema::new(vec![Field::new(
            "timestamp",
            TimestampMillisecondType::DATA_TYPE,
            false,
        )]));
        let batch = RecordBatch::try_new(test_schema, vec![Arc::new(ts_array)]).unwrap();

        let (ranges, (start, end)) = stream.calculate_range(&batch).unwrap();

        assert_eq!(
            start % 30000,
            1758093274000 % 30000,
            "Optimized start should preserve query alignment pattern"
        );

        // One range per evaluation timestamp between the clamped bounds.
        let expected_timestamps: Vec<i64> = (start..=end).step_by(30000).collect();
        assert_eq!(ranges.len(), expected_timestamps.len());

        for ts in expected_timestamps {
            assert_eq!(
                ts % 30000,
                1758093274000 % 30000,
                "All timestamps should maintain query alignment pattern"
            );
        }
    }
}