1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
22use datafusion::arrow::compute;
23use datafusion::arrow::datatypes::{Field, SchemaRef};
24use datafusion::arrow::error::ArrowError;
25use datafusion::arrow::record_batch::RecordBatch;
26use datafusion::common::stats::Precision;
27use datafusion::common::{DFSchema, DFSchemaRef};
28use datafusion::error::{DataFusionError, Result as DataFusionResult};
29use datafusion::execution::context::TaskContext;
30use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
31use datafusion::physical_expr::EquivalenceProperties;
32use datafusion::physical_plan::metrics::{
33 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
34};
35use datafusion::physical_plan::{
36 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
37 SendableRecordBatchStream, Statistics,
38};
39use datafusion::sql::TableReference;
40use futures::{ready, Stream, StreamExt};
41use greptime_proto::substrait_extension as pb;
42use prost::Message;
43use snafu::ResultExt;
44
45use crate::error::{DeserializeSnafu, Result};
46use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
47use crate::metrics::PROMQL_SERIES_COUNT;
48use crate::range_array::RangeArray;
49
/// Logical plan node of the PromQL range manipulation step (rendered as
/// `PromRangeManipulate` in explain output).
///
/// The output schema is the input schema in which every column listed in
/// `field_columns` has been converted through `RangeArray::convert_field`,
/// followed by one appended `{time_index}_range` column.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
    /// Lower bound of the requested time range (`req range` in explain output).
    start: Millisecond,
    /// Upper bound of the requested time range (`req range` in explain output).
    end: Millisecond,
    /// Distance between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the window evaluated at each timestamp (`eval range` in
    /// explain output).
    range: Millisecond,

    /// Name of the time index column in the input schema.
    time_index: String,
    /// Names of the value columns that get converted into range columns.
    field_columns: Vec<String>,
    /// The child plan this node reads from.
    input: LogicalPlan,
    /// Schema produced by this node, derived from `input` on construction.
    output_schema: DFSchemaRef,
}
71
72impl RangeManipulate {
73 pub fn new(
74 start: Millisecond,
75 end: Millisecond,
76 interval: Millisecond,
77 range: Millisecond,
78 time_index: String,
79 field_columns: Vec<String>,
80 input: LogicalPlan,
81 ) -> DataFusionResult<Self> {
82 let output_schema =
83 Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
84 Ok(Self {
85 start,
86 end,
87 interval,
88 range,
89 time_index,
90 field_columns,
91 input,
92 output_schema,
93 })
94 }
95
96 pub const fn name() -> &'static str {
97 "RangeManipulate"
98 }
99
100 pub fn build_timestamp_range_name(time_index: &str) -> String {
101 format!("{time_index}_range")
102 }
103
104 pub fn internal_range_end_col_name() -> String {
105 "__internal_range_end".to_string()
106 }
107
108 fn range_timestamp_name(&self) -> String {
109 Self::build_timestamp_range_name(&self.time_index)
110 }
111
112 fn calculate_output_schema(
113 input_schema: &DFSchemaRef,
114 time_index: &str,
115 field_columns: &[String],
116 ) -> DataFusionResult<DFSchemaRef> {
117 let columns = input_schema.fields();
118 let mut new_columns = Vec::with_capacity(columns.len() + 1);
119 for i in 0..columns.len() {
120 let x = input_schema.qualified_field(i);
121 new_columns.push((x.0.cloned(), Arc::new(x.1.clone())));
122 }
123
124 let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index) else {
127 return Err(datafusion::common::field_not_found(
128 None::<TableReference>,
129 time_index,
130 input_schema.as_ref(),
131 ));
132 };
133 let ts_col_field = &columns[ts_col_index];
134 let timestamp_range_field = Field::new(
135 Self::build_timestamp_range_name(time_index),
136 RangeArray::convert_field(ts_col_field).data_type().clone(),
137 ts_col_field.is_nullable(),
138 );
139 new_columns.push((None, Arc::new(timestamp_range_field)));
140
141 for name in field_columns {
143 let Some(index) = input_schema.index_of_column_by_name(None, name) else {
144 return Err(datafusion::common::field_not_found(
145 None::<TableReference>,
146 name,
147 input_schema.as_ref(),
148 ));
149 };
150 new_columns[index] = (None, Arc::new(RangeArray::convert_field(&columns[index])));
151 }
152
153 Ok(Arc::new(DFSchema::new_with_metadata(
154 new_columns,
155 HashMap::new(),
156 )?))
157 }
158
159 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
160 let output_schema: SchemaRef = SchemaRef::new(self.output_schema.as_ref().into());
161 let properties = exec_input.properties();
162 let properties = PlanProperties::new(
163 EquivalenceProperties::new(output_schema.clone()),
164 properties.partitioning.clone(),
165 properties.emission_type,
166 properties.boundedness,
167 );
168 Arc::new(RangeManipulateExec {
169 start: self.start,
170 end: self.end,
171 interval: self.interval,
172 range: self.range,
173 time_index_column: self.time_index.clone(),
174 time_range_column: self.range_timestamp_name(),
175 field_columns: self.field_columns.clone(),
176 input: exec_input,
177 output_schema,
178 metric: ExecutionPlanMetricsSet::new(),
179 properties,
180 })
181 }
182
183 pub fn serialize(&self) -> Vec<u8> {
184 pb::RangeManipulate {
185 start: self.start,
186 end: self.end,
187 interval: self.interval,
188 range: self.range,
189 time_index: self.time_index.clone(),
190 tag_columns: self.field_columns.clone(),
191 }
192 .encode_to_vec()
193 }
194
195 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
196 let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
197 let empty_schema = Arc::new(DFSchema::empty());
198 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
199 produce_one_row: false,
200 schema: empty_schema.clone(),
201 });
202
203 Ok(Self {
208 start: pb_range_manipulate.start,
209 end: pb_range_manipulate.end,
210 interval: pb_range_manipulate.interval,
211 range: pb_range_manipulate.range,
212 time_index: pb_range_manipulate.time_index,
213 field_columns: pb_range_manipulate.tag_columns,
214 input: placeholder_plan,
215 output_schema: empty_schema,
216 })
217 }
218}
219
220impl PartialOrd for RangeManipulate {
221 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
222 match self.start.partial_cmp(&other.start) {
224 Some(core::cmp::Ordering::Equal) => {}
225 ord => return ord,
226 }
227 match self.end.partial_cmp(&other.end) {
228 Some(core::cmp::Ordering::Equal) => {}
229 ord => return ord,
230 }
231 match self.interval.partial_cmp(&other.interval) {
232 Some(core::cmp::Ordering::Equal) => {}
233 ord => return ord,
234 }
235 match self.range.partial_cmp(&other.range) {
236 Some(core::cmp::Ordering::Equal) => {}
237 ord => return ord,
238 }
239 match self.time_index.partial_cmp(&other.time_index) {
240 Some(core::cmp::Ordering::Equal) => {}
241 ord => return ord,
242 }
243 match self.field_columns.partial_cmp(&other.field_columns) {
244 Some(core::cmp::Ordering::Equal) => {}
245 ord => return ord,
246 }
247 self.input.partial_cmp(&other.input)
248 }
249}
250
251impl UserDefinedLogicalNodeCore for RangeManipulate {
252 fn name(&self) -> &str {
253 Self::name()
254 }
255
256 fn inputs(&self) -> Vec<&LogicalPlan> {
257 vec![&self.input]
258 }
259
260 fn schema(&self) -> &DFSchemaRef {
261 &self.output_schema
262 }
263
264 fn expressions(&self) -> Vec<Expr> {
265 vec![]
266 }
267
268 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
269 write!(
270 f,
271 "PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
272 self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
273 )
274 }
275
276 fn with_exprs_and_inputs(
277 &self,
278 _exprs: Vec<Expr>,
279 mut inputs: Vec<LogicalPlan>,
280 ) -> DataFusionResult<Self> {
281 if inputs.len() != 1 {
282 return Err(DataFusionError::Internal(
283 "RangeManipulate should have at exact one input".to_string(),
284 ));
285 }
286
287 let input: LogicalPlan = inputs.pop().unwrap();
288 let input_schema = input.schema();
289 let output_schema =
290 Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
291
292 Ok(Self {
293 start: self.start,
294 end: self.end,
295 interval: self.interval,
296 range: self.range,
297 time_index: self.time_index.clone(),
298 field_columns: self.field_columns.clone(),
299 input,
300 output_schema,
301 })
302 }
303}
304
/// Physical plan of the range manipulation step (rendered as
/// `PromRangeManipulateExec` when displayed).
#[derive(Debug)]
pub struct RangeManipulateExec {
    /// Lower bound of the requested time range.
    start: Millisecond,
    /// Upper bound of the requested time range.
    end: Millisecond,
    /// Distance between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the window evaluated at each timestamp.
    range: Millisecond,
    /// Name of the time index column in the input schema.
    time_index_column: String,
    /// Name of the appended range column (`{time_index}_range`).
    time_range_column: String,
    /// Names of the value columns that are turned into range columns.
    field_columns: Vec<String>,

    /// The child plan this operator reads from.
    input: Arc<dyn ExecutionPlan>,
    /// Schema of the batches produced by this operator.
    output_schema: SchemaRef,
    /// Metrics recorded while executing.
    metric: ExecutionPlanMetricsSet,
    /// Cached plan properties (partitioning, emission type, boundedness).
    properties: PlanProperties,
}
320
321impl ExecutionPlan for RangeManipulateExec {
322 fn as_any(&self) -> &dyn Any {
323 self
324 }
325
326 fn schema(&self) -> SchemaRef {
327 self.output_schema.clone()
328 }
329
330 fn properties(&self) -> &PlanProperties {
331 &self.properties
332 }
333
334 fn maintains_input_order(&self) -> Vec<bool> {
335 vec![true; self.children().len()]
336 }
337
338 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
339 vec![&self.input]
340 }
341
342 fn required_input_distribution(&self) -> Vec<Distribution> {
343 let input_requirement = self.input.required_input_distribution();
344 if input_requirement.is_empty() {
345 vec![Distribution::UnspecifiedDistribution]
348 } else {
349 input_requirement
350 }
351 }
352
353 fn with_new_children(
354 self: Arc<Self>,
355 children: Vec<Arc<dyn ExecutionPlan>>,
356 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
357 assert!(!children.is_empty());
358 let exec_input = children[0].clone();
359 let properties = exec_input.properties();
360 let properties = PlanProperties::new(
361 EquivalenceProperties::new(self.output_schema.clone()),
362 properties.partitioning.clone(),
363 properties.emission_type,
364 properties.boundedness,
365 );
366 Ok(Arc::new(Self {
367 start: self.start,
368 end: self.end,
369 interval: self.interval,
370 range: self.range,
371 time_index_column: self.time_index_column.clone(),
372 time_range_column: self.time_range_column.clone(),
373 field_columns: self.field_columns.clone(),
374 output_schema: self.output_schema.clone(),
375 input: children[0].clone(),
376 metric: self.metric.clone(),
377 properties,
378 }))
379 }
380
381 fn execute(
382 &self,
383 partition: usize,
384 context: Arc<TaskContext>,
385 ) -> DataFusionResult<SendableRecordBatchStream> {
386 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
387 let metrics_builder = MetricBuilder::new(&self.metric);
388 let num_series = Count::new();
389 metrics_builder
390 .with_partition(partition)
391 .build(MetricValue::Count {
392 name: METRIC_NUM_SERIES.into(),
393 count: num_series.clone(),
394 });
395
396 let input = self.input.execute(partition, context)?;
397 let schema = input.schema();
398 let time_index = schema
399 .column_with_name(&self.time_index_column)
400 .unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
401 .0;
402 let field_columns = self
403 .field_columns
404 .iter()
405 .map(|value_col| {
406 schema
407 .column_with_name(value_col)
408 .unwrap_or_else(|| panic!("value column {value_col} not found",))
409 .0
410 })
411 .collect();
412 let aligned_ts_array =
413 RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval);
414 Ok(Box::pin(RangeManipulateStream {
415 start: self.start,
416 end: self.end,
417 interval: self.interval,
418 range: self.range,
419 time_index,
420 field_columns,
421 aligned_ts_array,
422 output_schema: self.output_schema.clone(),
423 input,
424 metric: baseline_metric,
425 num_series,
426 }))
427 }
428
429 fn metrics(&self) -> Option<MetricsSet> {
430 Some(self.metric.clone_inner())
431 }
432
433 fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
434 let input_stats = self.input.partition_statistics(partition)?;
435
436 let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
437 let estimated_total_bytes = input_stats
438 .total_byte_size
439 .get_value()
440 .zip(input_stats.num_rows.get_value())
441 .map(|(size, rows)| {
442 Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
443 })
444 .unwrap_or_default();
445
446 Ok(Statistics {
447 num_rows: Precision::Inexact(estimated_row_num as _),
448 total_byte_size: estimated_total_bytes,
449 column_statistics: Statistics::unknown_column(&self.schema()),
451 })
452 }
453
454 fn name(&self) -> &str {
455 "RangeManipulateExec"
456 }
457}
458
459impl DisplayAs for RangeManipulateExec {
460 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
461 match t {
462 DisplayFormatType::Default
463 | DisplayFormatType::Verbose
464 | DisplayFormatType::TreeRender => {
465 write!(
466 f,
467 "PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
468 self.start, self.end, self.interval, self.range, self.time_index_column
469 )
470 }
471 }
472 }
473}
474
/// Stream that turns every input batch into one batch of range columns,
/// with one output row per evaluation timestamp.
pub struct RangeManipulateStream {
    /// Lower bound of the requested time range.
    start: Millisecond,
    /// Upper bound of the requested time range.
    end: Millisecond,
    /// Distance between two consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the window evaluated at each timestamp.
    range: Millisecond,
    /// Position of the time index column in the input batches.
    time_index: usize,
    /// Positions of the value columns in the input batches.
    field_columns: Vec<usize>,
    /// Evaluation timestamps of the whole request range (`start..=end`
    /// stepped by `interval`), built once and reused whenever a batch
    /// yields the same number of rows.
    aligned_ts_array: ArrayRef,

    /// Schema of the produced batches.
    output_schema: SchemaRef,
    /// The stream of input batches.
    input: SendableRecordBatchStream,
    /// Baseline metrics (elapsed compute, poll results).
    metric: BaselineMetrics,
    /// Number of input batches that produced an output item.
    num_series: Count,
}
490
491impl RecordBatchStream for RangeManipulateStream {
492 fn schema(&self) -> SchemaRef {
493 self.output_schema.clone()
494 }
495}
496
497impl Stream for RangeManipulateStream {
498 type Item = DataFusionResult<RecordBatch>;
499
500 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
501 let poll = loop {
502 match ready!(self.input.poll_next_unpin(cx)) {
503 Some(Ok(batch)) => {
504 let timer = std::time::Instant::now();
505 let result = self.manipulate(batch);
506 if let Ok(None) = result {
507 self.metric.elapsed_compute().add_elapsed(timer);
508 continue;
509 } else {
510 self.num_series.add(1);
511 self.metric.elapsed_compute().add_elapsed(timer);
512 break Poll::Ready(result.transpose());
513 }
514 }
515 None => {
516 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
517 break Poll::Ready(None);
518 }
519 Some(Err(e)) => break Poll::Ready(Some(Err(e))),
520 }
521 };
522 self.metric.record_poll(poll)
523 }
524}
525
526impl RangeManipulateStream {
527 pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
531 let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
532 let (ranges, (start, end)) = self.calculate_range(&input)?;
534 if ranges.iter().all(|(_, len)| *len == 0) {
536 return Ok(None);
537 }
538
539 let mut new_columns = input.columns().to_vec();
541 for index in self.field_columns.iter() {
542 let _ = other_columns.remove(index);
543 let column = input.column(*index);
544 let new_column = Arc::new(
545 RangeArray::from_ranges(column.clone(), ranges.clone())
546 .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
547 .into_dict(),
548 );
549 new_columns[*index] = new_column;
550 }
551
552 let ts_range_column =
554 RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone())
555 .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
556 .into_dict();
557 new_columns.push(Arc::new(ts_range_column));
558
559 let take_indices = Int64Array::from(vec![0; ranges.len()]);
561 for index in other_columns.into_iter() {
562 new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
563 }
564 let new_time_index = if ranges.len() != self.aligned_ts_array.len() {
566 Self::build_aligned_ts_array(start, end, self.interval)
567 } else {
568 self.aligned_ts_array.clone()
569 };
570 new_columns[self.time_index] = new_time_index;
571
572 RecordBatch::try_new(self.output_schema.clone(), new_columns)
573 .map(Some)
574 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
575 }
576
577 fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef {
578 Arc::new(TimestampMillisecondArray::from_iter_values(
579 (start..=end).step_by(interval as _),
580 ))
581 }
582
583 #[allow(clippy::type_complexity)]
587 fn calculate_range(
588 &self,
589 input: &RecordBatch,
590 ) -> DataFusionResult<(Vec<(u32, u32)>, (i64, i64))> {
591 let ts_column = input
592 .column(self.time_index)
593 .as_any()
594 .downcast_ref::<TimestampMillisecondArray>()
595 .ok_or_else(|| {
596 DataFusionError::Execution(
597 "Time index Column downcast to TimestampMillisecondArray failed".into(),
598 )
599 })?;
600
601 let len = ts_column.len();
602 if len == 0 {
603 return Ok((vec![], (self.start, self.end)));
604 }
605
606 let first_ts = ts_column.value(0);
608 let first_ts_aligned = (first_ts / self.interval) * self.interval;
609 let last_ts = ts_column.value(ts_column.len() - 1);
610 let last_ts_aligned = ((last_ts + self.range) / self.interval) * self.interval;
611 let start = self.start.max(first_ts_aligned);
612 let end = self.end.min(last_ts_aligned);
613 if start > end {
614 return Ok((vec![], (start, end)));
615 }
616 let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize);
617
618 let mut range_start_index = 0usize;
620 let mut last_range_start = 0;
621 let mut start_delta = 0;
622 for curr_ts in (start..=end).step_by(self.interval as _) {
623 let start_ts = curr_ts - self.range;
625
626 let mut range_start = ts_column.len();
628 let mut range_end = 0;
629 let mut cursor = range_start_index + start_delta;
630 while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 {
632 cursor -= 1;
633 }
634
635 while cursor < ts_column.len() {
636 let ts = ts_column.value(cursor);
637 if range_start > cursor && ts >= start_ts {
638 range_start = cursor;
639 range_start_index = range_start;
640 }
641 if ts <= curr_ts {
642 range_end = range_end.max(cursor);
643 } else {
644 range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
645 break;
646 }
647 cursor += 1;
648 }
649 if range_start > range_end {
650 ranges.push((0, 0));
651 start_delta = 0;
652 } else {
653 ranges.push((range_start as _, (range_end + 1 - range_start) as _));
654 start_delta = range_start - last_range_start;
655 last_range_start = range_start;
656 }
657 }
658
659 Ok((ranges, (start, end)))
660 }
661}
662
#[cfg(test)]
mod test {
    use datafusion::arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray};
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType,
    };
    use datafusion::common::ToDFSchema;
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::physical_expr::Partitioning;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;

    use super::*;

    const TIME_INDEX_COLUMN: &str = "timestamp";

    /// Builds an in-memory source with one batch of ten rows: a millisecond
    /// timestamp column, two float value columns filled with `1.0` and a
    /// string column filled with `"foo"`.
    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value_1", DataType::Float64, true),
            Field::new("value_2", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            0, 30_000, 60_000, 90_000, 120_000, 180_000, 240_000, 241_000, 271_000, 291_000,
        ])) as _;
        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![
                timestamp_column,
                field_column.clone(),
                field_column,
                path_column,
            ],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }

    /// Runs a `RangeManipulateExec` with the given parameters over the test
    /// data and compares the debug rendering of the result with `expected`.
    ///
    /// Columns of a batch are joined with a single newline and batches with
    /// a blank line. Dictionary columns are rendered as `RangeArray`.
    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        range: Millisecond,
        expected: String,
    ) {
        let memory_exec = Arc::new(prepare_test_data());
        let time_index = TIME_INDEX_COLUMN.to_string();
        let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
        let manipulate_output_schema = SchemaRef::new(
            RangeManipulate::calculate_output_schema(
                &memory_exec.schema().to_dfschema_ref().unwrap(),
                &time_index,
                &field_columns,
            )
            .unwrap()
            .as_ref()
            .into(),
        );
        let properties = PlanProperties::new(
            EquivalenceProperties::new(manipulate_output_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let normalize_exec = Arc::new(RangeManipulateExec {
            start,
            end,
            interval,
            range,
            field_columns,
            output_schema: manipulate_output_schema,
            time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
            time_index_column: time_index,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal: String = result
            .into_iter()
            .filter_map(|batch| {
                batch
                    .columns()
                    .iter()
                    .map(|array| {
                        if matches!(array.data_type(), &DataType::Dictionary(..)) {
                            let dict_array = array
                                .as_any()
                                .downcast_ref::<DictionaryArray<Int64Type>>()
                                .unwrap()
                                .clone();
                            format!("{:?}", RangeArray::try_new(dict_array).unwrap())
                        } else {
                            format!("{array:?}")
                        }
                    })
                    .reduce(|lhs, rhs| lhs + "\n" + &rhs)
            })
            .reduce(|lhs, rhs| lhs + "\n\n" + &rhs)
            .unwrap();

        assert_eq!(result_literal, expected);
    }

    // NOTE: Arrow renders each array element on its own line, indented by
    // two spaces. The two spaces in front of a line-continuation backslash
    // below are therefore significant.

    #[tokio::test]
    async fn interval_30s_range_90s() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  \
            1970-01-01T00:00:00,\n  \
            1970-01-01T00:00:30,\n  \
            1970-01-01T00:01:00,\n  \
            1970-01-01T00:01:30,\n  \
            1970-01-01T00:02:00,\n  \
            1970-01-01T00:02:30,\n  \
            1970-01-01T00:03:00,\n  \
            1970-01-01T00:03:30,\n  \
            1970-01-01T00:04:00,\n  \
            1970-01-01T00:04:30,\n  \
            1970-01-01T00:05:00,\n\
            ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
            RangeArray { \
            base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }",
        );
        do_normalize_test(0, 310_000, 30_000, 90_000, expected.clone()).await;

        // A request that starts long before the first sample yields the same
        // rows: evaluation is shortened to the part that can see data.
        do_normalize_test(-300000, 310_000, 30_000, 90_000, expected).await;
    }

    #[tokio::test]
    async fn small_empty_range() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  \
            1970-01-01T00:00:00.001,\n  \
            1970-01-01T00:00:03.001,\n  \
            1970-01-01T00:00:06.001,\n  \
            1970-01-01T00:00:09.001,\n\
            ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
            RangeArray { \
            base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }",
        );
        do_normalize_test(1, 10_001, 3_000, 1_000, expected).await;
    }
}