1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
22use datafusion::arrow::compute;
23use datafusion::arrow::datatypes::{Field, SchemaRef};
24use datafusion::arrow::error::ArrowError;
25use datafusion::arrow::record_batch::RecordBatch;
26use datafusion::common::stats::Precision;
27use datafusion::common::{DFSchema, DFSchemaRef};
28use datafusion::error::{DataFusionError, Result as DataFusionResult};
29use datafusion::execution::context::TaskContext;
30use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
31use datafusion::physical_expr::EquivalenceProperties;
32use datafusion::physical_plan::metrics::{
33 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
34};
35use datafusion::physical_plan::{
36 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
37 SendableRecordBatchStream, Statistics,
38};
39use datafusion::sql::TableReference;
40use futures::{ready, Stream, StreamExt};
41use greptime_proto::substrait_extension as pb;
42use prost::Message;
43use snafu::ResultExt;
44
45use crate::error::{DeserializeSnafu, Result};
46use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
47use crate::metrics::PROMQL_SERIES_COUNT;
48use crate::range_array::RangeArray;
49
/// Logical plan node behind PromQL range-vector evaluation.
///
/// For every evaluation timestamp `t` in `start..=end` (stepping by `interval`), the
/// physical operator collects the samples whose timestamps lie in `[t - range, t]`.
/// In the output schema each column named in `field_columns` is converted to its
/// range-array form. An extra `"<time_index>_range"` column holding the ranged
/// timestamps is appended. All other input columns keep their original type.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
    /// First evaluation timestamp, in milliseconds.
    start: Millisecond,
    /// Last evaluation timestamp, in milliseconds (inclusive).
    end: Millisecond,
    /// Step between consecutive evaluation timestamps, in milliseconds.
    interval: Millisecond,
    /// Length of the look-back window ending at each evaluation timestamp, in milliseconds.
    range: Millisecond,

    /// Name of the time index column in the input schema.
    time_index: String,
    /// Names of the value columns converted into range arrays.
    /// Serialized under the protobuf field `tag_columns`.
    field_columns: Vec<String>,
    input: LogicalPlan,
    /// Output schema computed by `calculate_output_schema`.
    /// It is empty for nodes produced by `deserialize`.
    output_schema: DFSchemaRef,
}
71
72impl RangeManipulate {
73 pub fn new(
74 start: Millisecond,
75 end: Millisecond,
76 interval: Millisecond,
77 range: Millisecond,
78 time_index: String,
79 field_columns: Vec<String>,
80 input: LogicalPlan,
81 ) -> DataFusionResult<Self> {
82 let output_schema =
83 Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
84 Ok(Self {
85 start,
86 end,
87 interval,
88 range,
89 time_index,
90 field_columns,
91 input,
92 output_schema,
93 })
94 }
95
96 pub const fn name() -> &'static str {
97 "RangeManipulate"
98 }
99
100 pub fn build_timestamp_range_name(time_index: &str) -> String {
101 format!("{time_index}_range")
102 }
103
104 pub fn internal_range_end_col_name() -> String {
105 "__internal_range_end".to_string()
106 }
107
108 fn range_timestamp_name(&self) -> String {
109 Self::build_timestamp_range_name(&self.time_index)
110 }
111
112 fn calculate_output_schema(
113 input_schema: &DFSchemaRef,
114 time_index: &str,
115 field_columns: &[String],
116 ) -> DataFusionResult<DFSchemaRef> {
117 let columns = input_schema.fields();
118 let mut new_columns = Vec::with_capacity(columns.len() + 1);
119 for i in 0..columns.len() {
120 let x = input_schema.qualified_field(i);
121 new_columns.push((x.0.cloned(), Arc::new(x.1.clone())));
122 }
123
124 let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index) else {
127 return Err(datafusion::common::field_not_found(
128 None::<TableReference>,
129 time_index,
130 input_schema.as_ref(),
131 ));
132 };
133 let ts_col_field = &columns[ts_col_index];
134 let timestamp_range_field = Field::new(
135 Self::build_timestamp_range_name(time_index),
136 RangeArray::convert_field(ts_col_field).data_type().clone(),
137 ts_col_field.is_nullable(),
138 );
139 new_columns.push((None, Arc::new(timestamp_range_field)));
140
141 for name in field_columns {
143 let Some(index) = input_schema.index_of_column_by_name(None, name) else {
144 return Err(datafusion::common::field_not_found(
145 None::<TableReference>,
146 name,
147 input_schema.as_ref(),
148 ));
149 };
150 new_columns[index] = (None, Arc::new(RangeArray::convert_field(&columns[index])));
151 }
152
153 Ok(Arc::new(DFSchema::new_with_metadata(
154 new_columns,
155 HashMap::new(),
156 )?))
157 }
158
159 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
160 let output_schema: SchemaRef = SchemaRef::new(self.output_schema.as_ref().into());
161 let properties = exec_input.properties();
162 let properties = PlanProperties::new(
163 EquivalenceProperties::new(output_schema.clone()),
164 properties.partitioning.clone(),
165 properties.emission_type,
166 properties.boundedness,
167 );
168 Arc::new(RangeManipulateExec {
169 start: self.start,
170 end: self.end,
171 interval: self.interval,
172 range: self.range,
173 time_index_column: self.time_index.clone(),
174 time_range_column: self.range_timestamp_name(),
175 field_columns: self.field_columns.clone(),
176 input: exec_input,
177 output_schema,
178 metric: ExecutionPlanMetricsSet::new(),
179 properties,
180 })
181 }
182
183 pub fn serialize(&self) -> Vec<u8> {
184 pb::RangeManipulate {
185 start: self.start,
186 end: self.end,
187 interval: self.interval,
188 range: self.range,
189 time_index: self.time_index.clone(),
190 tag_columns: self.field_columns.clone(),
191 }
192 .encode_to_vec()
193 }
194
195 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
196 let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
197 let empty_schema = Arc::new(DFSchema::empty());
198 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
199 produce_one_row: false,
200 schema: empty_schema.clone(),
201 });
202
203 Ok(Self {
208 start: pb_range_manipulate.start,
209 end: pb_range_manipulate.end,
210 interval: pb_range_manipulate.interval,
211 range: pb_range_manipulate.range,
212 time_index: pb_range_manipulate.time_index,
213 field_columns: pb_range_manipulate.tag_columns,
214 input: placeholder_plan,
215 output_schema: empty_schema,
216 })
217 }
218}
219
220impl PartialOrd for RangeManipulate {
221 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
222 match self.start.partial_cmp(&other.start) {
224 Some(core::cmp::Ordering::Equal) => {}
225 ord => return ord,
226 }
227 match self.end.partial_cmp(&other.end) {
228 Some(core::cmp::Ordering::Equal) => {}
229 ord => return ord,
230 }
231 match self.interval.partial_cmp(&other.interval) {
232 Some(core::cmp::Ordering::Equal) => {}
233 ord => return ord,
234 }
235 match self.range.partial_cmp(&other.range) {
236 Some(core::cmp::Ordering::Equal) => {}
237 ord => return ord,
238 }
239 match self.time_index.partial_cmp(&other.time_index) {
240 Some(core::cmp::Ordering::Equal) => {}
241 ord => return ord,
242 }
243 match self.field_columns.partial_cmp(&other.field_columns) {
244 Some(core::cmp::Ordering::Equal) => {}
245 ord => return ord,
246 }
247 self.input.partial_cmp(&other.input)
248 }
249}
250
251impl UserDefinedLogicalNodeCore for RangeManipulate {
252 fn name(&self) -> &str {
253 Self::name()
254 }
255
256 fn inputs(&self) -> Vec<&LogicalPlan> {
257 vec![&self.input]
258 }
259
260 fn schema(&self) -> &DFSchemaRef {
261 &self.output_schema
262 }
263
264 fn expressions(&self) -> Vec<Expr> {
265 vec![]
266 }
267
268 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
269 write!(
270 f,
271 "PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
272 self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
273 )
274 }
275
276 fn with_exprs_and_inputs(
277 &self,
278 _exprs: Vec<Expr>,
279 mut inputs: Vec<LogicalPlan>,
280 ) -> DataFusionResult<Self> {
281 if inputs.len() != 1 {
282 return Err(DataFusionError::Internal(
283 "RangeManipulate should have at exact one input".to_string(),
284 ));
285 }
286
287 let input: LogicalPlan = inputs.pop().unwrap();
288 let input_schema = input.schema();
289 let output_schema =
290 Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
291
292 Ok(Self {
293 start: self.start,
294 end: self.end,
295 interval: self.interval,
296 range: self.range,
297 time_index: self.time_index.clone(),
298 field_columns: self.field_columns.clone(),
299 input,
300 output_schema,
301 })
302 }
303}
304
/// Physical counterpart of [`RangeManipulate`], built by `RangeManipulate::to_execution_plan`.
///
/// Each partition of `input` is wrapped in a [`RangeManipulateStream`].
#[derive(Debug)]
pub struct RangeManipulateExec {
    /// First evaluation timestamp, in milliseconds.
    start: Millisecond,
    /// Last evaluation timestamp, in milliseconds (inclusive).
    end: Millisecond,
    /// Step between evaluation timestamps, in milliseconds.
    interval: Millisecond,
    /// Look-back window length, in milliseconds.
    range: Millisecond,
    /// Name of the time index column; resolved to a column position in `execute`.
    time_index_column: String,
    /// Name of the appended `"<time_index>_range"` output column.
    time_range_column: String,
    /// Names of the value columns converted into range arrays.
    field_columns: Vec<String>,

    input: Arc<dyn ExecutionPlan>,
    /// Arrow form of the logical node's output schema.
    output_schema: SchemaRef,
    /// Metrics set shared by the streams of all partitions (baseline metrics plus a
    /// per-partition series counter).
    metric: ExecutionPlanMetricsSet,
    /// Cached plan properties. Partitioning, emission type and boundedness are copied
    /// from `input`.
    properties: PlanProperties,
}
320
321impl ExecutionPlan for RangeManipulateExec {
322 fn as_any(&self) -> &dyn Any {
323 self
324 }
325
326 fn schema(&self) -> SchemaRef {
327 self.output_schema.clone()
328 }
329
330 fn properties(&self) -> &PlanProperties {
331 &self.properties
332 }
333
334 fn maintains_input_order(&self) -> Vec<bool> {
335 vec![true; self.children().len()]
336 }
337
338 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
339 vec![&self.input]
340 }
341
342 fn required_input_distribution(&self) -> Vec<Distribution> {
343 self.input.required_input_distribution()
344 }
345
346 fn with_new_children(
347 self: Arc<Self>,
348 children: Vec<Arc<dyn ExecutionPlan>>,
349 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
350 assert!(!children.is_empty());
351 let exec_input = children[0].clone();
352 let properties = exec_input.properties();
353 let properties = PlanProperties::new(
354 EquivalenceProperties::new(self.output_schema.clone()),
355 properties.partitioning.clone(),
356 properties.emission_type,
357 properties.boundedness,
358 );
359 Ok(Arc::new(Self {
360 start: self.start,
361 end: self.end,
362 interval: self.interval,
363 range: self.range,
364 time_index_column: self.time_index_column.clone(),
365 time_range_column: self.time_range_column.clone(),
366 field_columns: self.field_columns.clone(),
367 output_schema: self.output_schema.clone(),
368 input: children[0].clone(),
369 metric: self.metric.clone(),
370 properties,
371 }))
372 }
373
374 fn execute(
375 &self,
376 partition: usize,
377 context: Arc<TaskContext>,
378 ) -> DataFusionResult<SendableRecordBatchStream> {
379 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
380 let metrics_builder = MetricBuilder::new(&self.metric);
381 let num_series = Count::new();
382 metrics_builder
383 .with_partition(partition)
384 .build(MetricValue::Count {
385 name: METRIC_NUM_SERIES.into(),
386 count: num_series.clone(),
387 });
388
389 let input = self.input.execute(partition, context)?;
390 let schema = input.schema();
391 let time_index = schema
392 .column_with_name(&self.time_index_column)
393 .unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
394 .0;
395 let field_columns = self
396 .field_columns
397 .iter()
398 .map(|value_col| {
399 schema
400 .column_with_name(value_col)
401 .unwrap_or_else(|| panic!("value column {value_col} not found",))
402 .0
403 })
404 .collect();
405 let aligned_ts_array =
406 RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval);
407 Ok(Box::pin(RangeManipulateStream {
408 start: self.start,
409 end: self.end,
410 interval: self.interval,
411 range: self.range,
412 time_index,
413 field_columns,
414 aligned_ts_array,
415 output_schema: self.output_schema.clone(),
416 input,
417 metric: baseline_metric,
418 num_series,
419 }))
420 }
421
422 fn metrics(&self) -> Option<MetricsSet> {
423 Some(self.metric.clone_inner())
424 }
425
426 fn statistics(&self) -> DataFusionResult<Statistics> {
427 let input_stats = self.input.statistics()?;
428
429 let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
430 let estimated_total_bytes = input_stats
431 .total_byte_size
432 .get_value()
433 .zip(input_stats.num_rows.get_value())
434 .map(|(size, rows)| {
435 Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
436 })
437 .unwrap_or_default();
438
439 Ok(Statistics {
440 num_rows: Precision::Inexact(estimated_row_num as _),
441 total_byte_size: estimated_total_bytes,
442 column_statistics: Statistics::unknown_column(&self.schema()),
444 })
445 }
446
447 fn name(&self) -> &str {
448 "RangeManipulateExec"
449 }
450}
451
452impl DisplayAs for RangeManipulateExec {
453 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
454 match t {
455 DisplayFormatType::Default | DisplayFormatType::Verbose => {
456 write!(
457 f,
458 "PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
459 self.start, self.end, self.interval, self.range, self.time_index_column
460 )
461 }
462 }
463 }
464}
465
/// Stream that turns each input batch into range-vector rows, one per evaluation
/// timestamp. Created by `RangeManipulateExec::execute`.
pub struct RangeManipulateStream {
    /// First evaluation timestamp, in milliseconds.
    start: Millisecond,
    /// Last evaluation timestamp, in milliseconds (inclusive).
    end: Millisecond,
    /// Step between evaluation timestamps, in milliseconds.
    interval: Millisecond,
    /// Look-back window length, in milliseconds.
    range: Millisecond,
    /// Position of the time index column in the input batches.
    time_index: usize,
    /// Positions of the value columns in the input batches.
    field_columns: Vec<usize>,
    /// Precomputed `start..=end` timestamps (step `interval`). Reused when a batch's
    /// windows cover this full grid.
    aligned_ts_array: ArrayRef,

    output_schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    /// Number of emitted batches, registered as `METRIC_NUM_SERIES`.
    num_series: Count,
}
481
482impl RecordBatchStream for RangeManipulateStream {
483 fn schema(&self) -> SchemaRef {
484 self.output_schema.clone()
485 }
486}
487
488impl Stream for RangeManipulateStream {
489 type Item = DataFusionResult<RecordBatch>;
490
491 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
492 let poll = loop {
493 match ready!(self.input.poll_next_unpin(cx)) {
494 Some(Ok(batch)) => {
495 let timer = std::time::Instant::now();
496 let result = self.manipulate(batch);
497 if let Ok(None) = result {
498 self.metric.elapsed_compute().add_elapsed(timer);
499 continue;
500 } else {
501 self.num_series.add(1);
502 self.metric.elapsed_compute().add_elapsed(timer);
503 break Poll::Ready(result.transpose());
504 }
505 }
506 None => {
507 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
508 break Poll::Ready(None);
509 }
510 Some(Err(e)) => break Poll::Ready(Some(Err(e))),
511 }
512 };
513 self.metric.record_poll(poll)
514 }
515}
516
517impl RangeManipulateStream {
518 pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
522 let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
523 let (ranges, (start, end)) = self.calculate_range(&input)?;
525 if ranges.iter().all(|(_, len)| *len == 0) {
527 return Ok(None);
528 }
529
530 let mut new_columns = input.columns().to_vec();
532 for index in self.field_columns.iter() {
533 let _ = other_columns.remove(index);
534 let column = input.column(*index);
535 let new_column = Arc::new(
536 RangeArray::from_ranges(column.clone(), ranges.clone())
537 .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
538 .into_dict(),
539 );
540 new_columns[*index] = new_column;
541 }
542
543 let ts_range_column =
545 RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone())
546 .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
547 .into_dict();
548 new_columns.push(Arc::new(ts_range_column));
549
550 let take_indices = Int64Array::from(vec![0; ranges.len()]);
552 for index in other_columns.into_iter() {
553 new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
554 }
555 let new_time_index = if ranges.len() != self.aligned_ts_array.len() {
557 Self::build_aligned_ts_array(start, end, self.interval)
558 } else {
559 self.aligned_ts_array.clone()
560 };
561 new_columns[self.time_index] = new_time_index;
562
563 RecordBatch::try_new(self.output_schema.clone(), new_columns)
564 .map(Some)
565 .map_err(|e| DataFusionError::ArrowError(e, None))
566 }
567
568 fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef {
569 Arc::new(TimestampMillisecondArray::from_iter_values(
570 (start..=end).step_by(interval as _),
571 ))
572 }
573
574 #[allow(clippy::type_complexity)]
578 fn calculate_range(
579 &self,
580 input: &RecordBatch,
581 ) -> DataFusionResult<(Vec<(u32, u32)>, (i64, i64))> {
582 let ts_column = input
583 .column(self.time_index)
584 .as_any()
585 .downcast_ref::<TimestampMillisecondArray>()
586 .ok_or_else(|| {
587 DataFusionError::Execution(
588 "Time index Column downcast to TimestampMillisecondArray failed".into(),
589 )
590 })?;
591
592 let len = ts_column.len();
593 if len == 0 {
594 return Ok((vec![], (self.start, self.end)));
595 }
596
597 let first_ts = ts_column.value(0);
599 let first_ts_aligned = (first_ts / self.interval) * self.interval;
600 let last_ts = ts_column.value(ts_column.len() - 1);
601 let last_ts_aligned = ((last_ts + self.range) / self.interval) * self.interval;
602 let start = self.start.max(first_ts_aligned);
603 let end = self.end.min(last_ts_aligned);
604 if start > end {
605 return Ok((vec![], (start, end)));
606 }
607 let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize);
608
609 let mut range_start_index = 0usize;
611 let mut last_range_start = 0;
612 let mut start_delta = 0;
613 for curr_ts in (start..=end).step_by(self.interval as _) {
614 let start_ts = curr_ts - self.range;
616
617 let mut range_start = ts_column.len();
619 let mut range_end = 0;
620 let mut cursor = range_start_index + start_delta;
621 while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 {
623 cursor -= 1;
624 }
625
626 while cursor < ts_column.len() {
627 let ts = ts_column.value(cursor);
628 if range_start > cursor && ts >= start_ts {
629 range_start = cursor;
630 range_start_index = range_start;
631 }
632 if ts <= curr_ts {
633 range_end = range_end.max(cursor);
634 } else {
635 range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
636 break;
637 }
638 cursor += 1;
639 }
640 if range_start > range_end {
641 ranges.push((0, 0));
642 start_delta = 0;
643 } else {
644 ranges.push((range_start as _, (range_end + 1 - range_start) as _));
645 start_delta = range_start - last_range_start;
646 last_range_start = range_start;
647 }
648 }
649
650 Ok((ranges, (start, end)))
651 }
652}
653
#[cfg(test)]
mod test {
    use datafusion::arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray};
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType,
    };
    use datafusion::common::ToDFSchema;
    use datafusion::physical_expr::Partitioning;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;

    use super::*;

    /// Name of the time index column in the test data.
    const TIME_INDEX_COLUMN: &str = "timestamp";

    /// Builds a single-partition `MemoryExec` holding one 10-row batch: a millisecond
    /// time index with irregular spacing, two `Float64` value columns (all `1.0`) and a
    /// constant `path` string column.
    fn prepare_test_data() -> MemoryExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value_1", DataType::Float64, true),
            Field::new("value_2", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            0, 30_000, 60_000, 90_000, 120_000, 180_000, 240_000, 241_000, 271_000, 291_000, ])) as _;
        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![
                timestamp_column,
                field_column.clone(),
                field_column,
                path_column,
            ],
        )
        .unwrap();

        MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
    }

    /// Runs `RangeManipulateExec` over `prepare_test_data()`, with `value_1` and
    /// `value_2` as field columns, and compares the `Debug` rendering of the output
    /// against `expected`.
    ///
    /// Dictionary-typed columns are rendered through `RangeArray`. Columns are joined
    /// with `"\n"` and batches with `"\n\n"`.
    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        range: Millisecond,
        expected: String,
    ) {
        let memory_exec = Arc::new(prepare_test_data());
        let time_index = TIME_INDEX_COLUMN.to_string();
        let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
        let manipulate_output_schema = SchemaRef::new(
            RangeManipulate::calculate_output_schema(
                &memory_exec.schema().to_dfschema_ref().unwrap(),
                &time_index,
                &field_columns,
            )
            .unwrap()
            .as_ref()
            .into(),
        );
        let properties = PlanProperties::new(
            EquivalenceProperties::new(manipulate_output_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let normalize_exec = Arc::new(RangeManipulateExec {
            start,
            end,
            interval,
            range,
            field_columns,
            output_schema: manipulate_output_schema,
            time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
            time_index_column: time_index,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        // Render each batch's columns; range (dictionary) columns go through `RangeArray`.
        let result_literal: String = result
            .into_iter()
            .filter_map(|batch| {
                batch
                    .columns()
                    .iter()
                    .map(|array| {
                        if matches!(array.data_type(), &DataType::Dictionary(..)) {
                            let dict_array = array
                                .as_any()
                                .downcast_ref::<DictionaryArray<Int64Type>>()
                                .unwrap()
                                .clone();
                            format!("{:?}", RangeArray::try_new(dict_array).unwrap())
                        } else {
                            format!("{array:?}")
                        }
                    })
                    .reduce(|lhs, rhs| lhs + "\n" + &rhs)
            })
            .reduce(|lhs, rhs| lhs + "\n\n" + &rhs)
            .unwrap();

        assert_eq!(result_literal, expected);
    }

    /// 30s step with a 90s window over the whole test data.
    #[tokio::test]
    async fn interval_30s_range_90s() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n \
            1970-01-01T00:00:00,\n \
            1970-01-01T00:00:30,\n \
            1970-01-01T00:01:00,\n \
            1970-01-01T00:01:30,\n \
            1970-01-01T00:02:00,\n \
            1970-01-01T00:02:30,\n \
            1970-01-01T00:03:00,\n \
            1970-01-01T00:03:30,\n \
            1970-01-01T00:04:00,\n \
            1970-01-01T00:04:30,\n \
            1970-01-01T00:05:00,\n\
            ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
            RangeArray { \
            base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }",
        );
        do_normalize_test(0, 310_000, 30_000, 90_000, expected.clone()).await;

        // An earlier `start` on the same grid yields the same output: evaluation begins
        // at the first sample's timestamp.
        do_normalize_test(-300000, 310_000, 30_000, 90_000, expected).await;
    }

    /// A 3s step with a 1s window: only the first evaluation timestamp has data.
    #[tokio::test]
    async fn small_empty_range() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n \
            1970-01-01T00:00:00.001,\n \
            1970-01-01T00:00:03.001,\n \
            1970-01-01T00:00:06.001,\n \
            1970-01-01T00:00:09.001,\n\
            ]\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }\nRangeArray { \
            base array: PrimitiveArray<Float64>\n[\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n 1.0,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }\nStringArray\n[\n \"foo\",\n \"foo\",\n \"foo\",\n \"foo\",\n]\n\
            RangeArray { \
            base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n 1970-01-01T00:00:00,\n 1970-01-01T00:00:30,\n 1970-01-01T00:01:00,\n 1970-01-01T00:01:30,\n 1970-01-01T00:02:00,\n 1970-01-01T00:03:00,\n 1970-01-01T00:04:00,\n 1970-01-01T00:04:01,\n 1970-01-01T00:04:31,\n 1970-01-01T00:04:51,\n], \
            ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }");
        do_normalize_test(1, 10_001, 3_000, 1_000, expected).await;
    }
}