1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use datafusion::arrow::array::{Array, ArrayRef, Int64Array, TimestampMillisecondArray};
22use datafusion::arrow::compute;
23use datafusion::arrow::datatypes::{Field, SchemaRef};
24use datafusion::arrow::error::ArrowError;
25use datafusion::arrow::record_batch::RecordBatch;
26use datafusion::common::stats::Precision;
27use datafusion::common::{DFSchema, DFSchemaRef};
28use datafusion::error::{DataFusionError, Result as DataFusionResult};
29use datafusion::execution::context::TaskContext;
30use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
31use datafusion::physical_expr::EquivalenceProperties;
32use datafusion::physical_plan::metrics::{
33 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
34};
35use datafusion::physical_plan::{
36 DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
37 SendableRecordBatchStream, Statistics,
38};
39use datafusion::sql::TableReference;
40use futures::{ready, Stream, StreamExt};
41use greptime_proto::substrait_extension as pb;
42use prost::Message;
43use snafu::ResultExt;
44
45use crate::error::{DeserializeSnafu, Result};
46use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
47use crate::metrics::PROMQL_SERIES_COUNT;
48use crate::range_array::RangeArray;
49
/// Logical plan node that gathers, for every evaluation timestamp, the rows inside a
/// look-back window.
///
/// Evaluation timestamps are `start, start + interval, ...` up to `end`; for each timestamp
/// `t` the physical operator ([`RangeManipulateExec`], built by `to_execution_plan`) collects
/// the rows whose time index lies in `[t - range, t]`. Field columns are replaced by
/// range-encoded columns and a `<time_index>_range` column is appended
/// (see `calculate_output_schema`).
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RangeManipulate {
    /// First evaluation timestamp.
    start: Millisecond,
    /// Inclusive upper bound for evaluation timestamps.
    end: Millisecond,
    /// Step between consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the look-back window ending at each evaluation timestamp.
    range: Millisecond,

    /// Name of the time index column in `input`.
    time_index: String,
    /// Names of the value columns that are turned into range columns.
    field_columns: Vec<String>,
    /// Child plan.
    input: LogicalPlan,
    /// Cached output schema (computed by `calculate_output_schema`).
    output_schema: DFSchemaRef,
}
71
72impl RangeManipulate {
73 pub fn new(
74 start: Millisecond,
75 end: Millisecond,
76 interval: Millisecond,
77 range: Millisecond,
78 time_index: String,
79 field_columns: Vec<String>,
80 input: LogicalPlan,
81 ) -> DataFusionResult<Self> {
82 let output_schema =
83 Self::calculate_output_schema(input.schema(), &time_index, &field_columns)?;
84 Ok(Self {
85 start,
86 end,
87 interval,
88 range,
89 time_index,
90 field_columns,
91 input,
92 output_schema,
93 })
94 }
95
96 pub const fn name() -> &'static str {
97 "RangeManipulate"
98 }
99
100 pub fn build_timestamp_range_name(time_index: &str) -> String {
101 format!("{time_index}_range")
102 }
103
104 pub fn internal_range_end_col_name() -> String {
105 "__internal_range_end".to_string()
106 }
107
108 fn range_timestamp_name(&self) -> String {
109 Self::build_timestamp_range_name(&self.time_index)
110 }
111
112 fn calculate_output_schema(
113 input_schema: &DFSchemaRef,
114 time_index: &str,
115 field_columns: &[String],
116 ) -> DataFusionResult<DFSchemaRef> {
117 let columns = input_schema.fields();
118 let mut new_columns = Vec::with_capacity(columns.len() + 1);
119 for i in 0..columns.len() {
120 let x = input_schema.qualified_field(i);
121 new_columns.push((x.0.cloned(), Arc::new(x.1.clone())));
122 }
123
124 let Some(ts_col_index) = input_schema.index_of_column_by_name(None, time_index) else {
127 return Err(datafusion::common::field_not_found(
128 None::<TableReference>,
129 time_index,
130 input_schema.as_ref(),
131 ));
132 };
133 let ts_col_field = &columns[ts_col_index];
134 let timestamp_range_field = Field::new(
135 Self::build_timestamp_range_name(time_index),
136 RangeArray::convert_field(ts_col_field).data_type().clone(),
137 ts_col_field.is_nullable(),
138 );
139 new_columns.push((None, Arc::new(timestamp_range_field)));
140
141 for name in field_columns {
143 let Some(index) = input_schema.index_of_column_by_name(None, name) else {
144 return Err(datafusion::common::field_not_found(
145 None::<TableReference>,
146 name,
147 input_schema.as_ref(),
148 ));
149 };
150 new_columns[index] = (None, Arc::new(RangeArray::convert_field(&columns[index])));
151 }
152
153 Ok(Arc::new(DFSchema::new_with_metadata(
154 new_columns,
155 HashMap::new(),
156 )?))
157 }
158
159 pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
160 let output_schema: SchemaRef = SchemaRef::new(self.output_schema.as_ref().into());
161 let properties = exec_input.properties();
162 let properties = PlanProperties::new(
163 EquivalenceProperties::new(output_schema.clone()),
164 properties.partitioning.clone(),
165 properties.emission_type,
166 properties.boundedness,
167 );
168 Arc::new(RangeManipulateExec {
169 start: self.start,
170 end: self.end,
171 interval: self.interval,
172 range: self.range,
173 time_index_column: self.time_index.clone(),
174 time_range_column: self.range_timestamp_name(),
175 field_columns: self.field_columns.clone(),
176 input: exec_input,
177 output_schema,
178 metric: ExecutionPlanMetricsSet::new(),
179 properties,
180 })
181 }
182
183 pub fn serialize(&self) -> Vec<u8> {
184 pb::RangeManipulate {
185 start: self.start,
186 end: self.end,
187 interval: self.interval,
188 range: self.range,
189 time_index: self.time_index.clone(),
190 tag_columns: self.field_columns.clone(),
191 }
192 .encode_to_vec()
193 }
194
195 pub fn deserialize(bytes: &[u8]) -> Result<Self> {
196 let pb_range_manipulate = pb::RangeManipulate::decode(bytes).context(DeserializeSnafu)?;
197 let empty_schema = Arc::new(DFSchema::empty());
198 let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
199 produce_one_row: false,
200 schema: empty_schema.clone(),
201 });
202
203 Ok(Self {
208 start: pb_range_manipulate.start,
209 end: pb_range_manipulate.end,
210 interval: pb_range_manipulate.interval,
211 range: pb_range_manipulate.range,
212 time_index: pb_range_manipulate.time_index,
213 field_columns: pb_range_manipulate.tag_columns,
214 input: placeholder_plan,
215 output_schema: empty_schema,
216 })
217 }
218}
219
220impl PartialOrd for RangeManipulate {
221 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
222 match self.start.partial_cmp(&other.start) {
224 Some(core::cmp::Ordering::Equal) => {}
225 ord => return ord,
226 }
227 match self.end.partial_cmp(&other.end) {
228 Some(core::cmp::Ordering::Equal) => {}
229 ord => return ord,
230 }
231 match self.interval.partial_cmp(&other.interval) {
232 Some(core::cmp::Ordering::Equal) => {}
233 ord => return ord,
234 }
235 match self.range.partial_cmp(&other.range) {
236 Some(core::cmp::Ordering::Equal) => {}
237 ord => return ord,
238 }
239 match self.time_index.partial_cmp(&other.time_index) {
240 Some(core::cmp::Ordering::Equal) => {}
241 ord => return ord,
242 }
243 match self.field_columns.partial_cmp(&other.field_columns) {
244 Some(core::cmp::Ordering::Equal) => {}
245 ord => return ord,
246 }
247 self.input.partial_cmp(&other.input)
248 }
249}
250
251impl UserDefinedLogicalNodeCore for RangeManipulate {
252 fn name(&self) -> &str {
253 Self::name()
254 }
255
256 fn inputs(&self) -> Vec<&LogicalPlan> {
257 vec![&self.input]
258 }
259
260 fn schema(&self) -> &DFSchemaRef {
261 &self.output_schema
262 }
263
264 fn expressions(&self) -> Vec<Expr> {
265 vec![]
266 }
267
268 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
269 write!(
270 f,
271 "PromRangeManipulate: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}], values={:?}",
272 self.start, self.end, self.interval, self.range, self.time_index, self.field_columns
273 )
274 }
275
276 fn with_exprs_and_inputs(
277 &self,
278 _exprs: Vec<Expr>,
279 mut inputs: Vec<LogicalPlan>,
280 ) -> DataFusionResult<Self> {
281 if inputs.len() != 1 {
282 return Err(DataFusionError::Internal(
283 "RangeManipulate should have at exact one input".to_string(),
284 ));
285 }
286
287 let input: LogicalPlan = inputs.pop().unwrap();
288 let input_schema = input.schema();
289 let output_schema =
290 Self::calculate_output_schema(input_schema, &self.time_index, &self.field_columns)?;
291
292 Ok(Self {
293 start: self.start,
294 end: self.end,
295 interval: self.interval,
296 range: self.range,
297 time_index: self.time_index.clone(),
298 field_columns: self.field_columns.clone(),
299 input,
300 output_schema,
301 })
302 }
303}
304
/// Physical operator for [`RangeManipulate`].
///
/// Created by `RangeManipulate::to_execution_plan`; every executed partition is wrapped in a
/// [`RangeManipulateStream`].
#[derive(Debug)]
pub struct RangeManipulateExec {
    /// First evaluation timestamp.
    start: Millisecond,
    /// Inclusive upper bound for evaluation timestamps.
    end: Millisecond,
    /// Step between consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the look-back window.
    range: Millisecond,
    /// Name of the time index column in the input.
    time_index_column: String,
    /// Name of the appended `<time_index>_range` column.
    time_range_column: String,
    /// Names of the value columns turned into range columns.
    field_columns: Vec<String>,

    /// Child operator.
    input: Arc<dyn ExecutionPlan>,
    /// Schema of the produced batches.
    output_schema: SchemaRef,
    /// Metrics registered per partition in `execute`.
    metric: ExecutionPlanMetricsSet,
    /// Cached plan properties, derived from the input's.
    properties: PlanProperties,
}
320
321impl ExecutionPlan for RangeManipulateExec {
322 fn as_any(&self) -> &dyn Any {
323 self
324 }
325
326 fn schema(&self) -> SchemaRef {
327 self.output_schema.clone()
328 }
329
330 fn properties(&self) -> &PlanProperties {
331 &self.properties
332 }
333
334 fn maintains_input_order(&self) -> Vec<bool> {
335 vec![true; self.children().len()]
336 }
337
338 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
339 vec![&self.input]
340 }
341
342 fn required_input_distribution(&self) -> Vec<Distribution> {
343 let input_requirement = self.input.required_input_distribution();
344 if input_requirement.is_empty() {
345 vec![Distribution::UnspecifiedDistribution]
348 } else {
349 input_requirement
350 }
351 }
352
353 fn with_new_children(
354 self: Arc<Self>,
355 children: Vec<Arc<dyn ExecutionPlan>>,
356 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
357 assert!(!children.is_empty());
358 let exec_input = children[0].clone();
359 let properties = exec_input.properties();
360 let properties = PlanProperties::new(
361 EquivalenceProperties::new(self.output_schema.clone()),
362 properties.partitioning.clone(),
363 properties.emission_type,
364 properties.boundedness,
365 );
366 Ok(Arc::new(Self {
367 start: self.start,
368 end: self.end,
369 interval: self.interval,
370 range: self.range,
371 time_index_column: self.time_index_column.clone(),
372 time_range_column: self.time_range_column.clone(),
373 field_columns: self.field_columns.clone(),
374 output_schema: self.output_schema.clone(),
375 input: children[0].clone(),
376 metric: self.metric.clone(),
377 properties,
378 }))
379 }
380
381 fn execute(
382 &self,
383 partition: usize,
384 context: Arc<TaskContext>,
385 ) -> DataFusionResult<SendableRecordBatchStream> {
386 let baseline_metric = BaselineMetrics::new(&self.metric, partition);
387 let metrics_builder = MetricBuilder::new(&self.metric);
388 let num_series = Count::new();
389 metrics_builder
390 .with_partition(partition)
391 .build(MetricValue::Count {
392 name: METRIC_NUM_SERIES.into(),
393 count: num_series.clone(),
394 });
395
396 let input = self.input.execute(partition, context)?;
397 let schema = input.schema();
398 let time_index = schema
399 .column_with_name(&self.time_index_column)
400 .unwrap_or_else(|| panic!("time index column {} not found", self.time_index_column))
401 .0;
402 let field_columns = self
403 .field_columns
404 .iter()
405 .map(|value_col| {
406 schema
407 .column_with_name(value_col)
408 .unwrap_or_else(|| panic!("value column {value_col} not found",))
409 .0
410 })
411 .collect();
412 let aligned_ts_array =
413 RangeManipulateStream::build_aligned_ts_array(self.start, self.end, self.interval);
414 Ok(Box::pin(RangeManipulateStream {
415 start: self.start,
416 end: self.end,
417 interval: self.interval,
418 range: self.range,
419 time_index,
420 field_columns,
421 aligned_ts_array,
422 output_schema: self.output_schema.clone(),
423 input,
424 metric: baseline_metric,
425 num_series,
426 }))
427 }
428
429 fn metrics(&self) -> Option<MetricsSet> {
430 Some(self.metric.clone_inner())
431 }
432
433 fn statistics(&self) -> DataFusionResult<Statistics> {
434 let input_stats = self.input.statistics()?;
435
436 let estimated_row_num = (self.end - self.start) as f64 / self.interval as f64;
437 let estimated_total_bytes = input_stats
438 .total_byte_size
439 .get_value()
440 .zip(input_stats.num_rows.get_value())
441 .map(|(size, rows)| {
442 Precision::Inexact(((*size as f64 / *rows as f64) * estimated_row_num).floor() as _)
443 })
444 .unwrap_or_default();
445
446 Ok(Statistics {
447 num_rows: Precision::Inexact(estimated_row_num as _),
448 total_byte_size: estimated_total_bytes,
449 column_statistics: Statistics::unknown_column(&self.schema()),
451 })
452 }
453
454 fn name(&self) -> &str {
455 "RangeManipulateExec"
456 }
457}
458
459impl DisplayAs for RangeManipulateExec {
460 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
461 match t {
462 DisplayFormatType::Default | DisplayFormatType::Verbose => {
463 write!(
464 f,
465 "PromRangeManipulateExec: req range=[{}..{}], interval=[{}], eval range=[{}], time index=[{}]",
466 self.start, self.end, self.interval, self.range, self.time_index_column
467 )
468 }
469 }
470 }
471}
472
/// Stream that runs [`RangeManipulateStream::manipulate`] on every batch of its input.
pub struct RangeManipulateStream {
    /// First evaluation timestamp.
    start: Millisecond,
    /// Inclusive upper bound for evaluation timestamps.
    end: Millisecond,
    /// Step between consecutive evaluation timestamps.
    interval: Millisecond,
    /// Length of the look-back window.
    range: Millisecond,
    /// Position of the time index column in the input schema.
    time_index: usize,
    /// Positions of the field (value) columns in the input schema.
    field_columns: Vec<usize>,
    /// Evaluation timestamps for the whole `[start, end]` range, built once per stream and
    /// reused when a batch's computed ranges have the same length.
    aligned_ts_array: ArrayRef,

    output_schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    /// Counter registered as `METRIC_NUM_SERIES` in `execute` and observed into
    /// `PROMQL_SERIES_COUNT` when the input is exhausted.
    num_series: Count,
}
488
489impl RecordBatchStream for RangeManipulateStream {
490 fn schema(&self) -> SchemaRef {
491 self.output_schema.clone()
492 }
493}
494
495impl Stream for RangeManipulateStream {
496 type Item = DataFusionResult<RecordBatch>;
497
498 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
499 let poll = loop {
500 match ready!(self.input.poll_next_unpin(cx)) {
501 Some(Ok(batch)) => {
502 let timer = std::time::Instant::now();
503 let result = self.manipulate(batch);
504 if let Ok(None) = result {
505 self.metric.elapsed_compute().add_elapsed(timer);
506 continue;
507 } else {
508 self.num_series.add(1);
509 self.metric.elapsed_compute().add_elapsed(timer);
510 break Poll::Ready(result.transpose());
511 }
512 }
513 None => {
514 PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
515 break Poll::Ready(None);
516 }
517 Some(Err(e)) => break Poll::Ready(Some(Err(e))),
518 }
519 };
520 self.metric.record_poll(poll)
521 }
522}
523
524impl RangeManipulateStream {
525 pub fn manipulate(&self, input: RecordBatch) -> DataFusionResult<Option<RecordBatch>> {
529 let mut other_columns = (0..input.columns().len()).collect::<HashSet<_>>();
530 let (ranges, (start, end)) = self.calculate_range(&input)?;
532 if ranges.iter().all(|(_, len)| *len == 0) {
534 return Ok(None);
535 }
536
537 let mut new_columns = input.columns().to_vec();
539 for index in self.field_columns.iter() {
540 let _ = other_columns.remove(index);
541 let column = input.column(*index);
542 let new_column = Arc::new(
543 RangeArray::from_ranges(column.clone(), ranges.clone())
544 .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
545 .into_dict(),
546 );
547 new_columns[*index] = new_column;
548 }
549
550 let ts_range_column =
552 RangeArray::from_ranges(input.column(self.time_index).clone(), ranges.clone())
553 .map_err(|e| ArrowError::InvalidArgumentError(e.to_string()))?
554 .into_dict();
555 new_columns.push(Arc::new(ts_range_column));
556
557 let take_indices = Int64Array::from(vec![0; ranges.len()]);
559 for index in other_columns.into_iter() {
560 new_columns[index] = compute::take(&input.column(index), &take_indices, None)?;
561 }
562 let new_time_index = if ranges.len() != self.aligned_ts_array.len() {
564 Self::build_aligned_ts_array(start, end, self.interval)
565 } else {
566 self.aligned_ts_array.clone()
567 };
568 new_columns[self.time_index] = new_time_index;
569
570 RecordBatch::try_new(self.output_schema.clone(), new_columns)
571 .map(Some)
572 .map_err(|e| DataFusionError::ArrowError(e, None))
573 }
574
575 fn build_aligned_ts_array(start: i64, end: i64, interval: i64) -> ArrayRef {
576 Arc::new(TimestampMillisecondArray::from_iter_values(
577 (start..=end).step_by(interval as _),
578 ))
579 }
580
581 #[allow(clippy::type_complexity)]
585 fn calculate_range(
586 &self,
587 input: &RecordBatch,
588 ) -> DataFusionResult<(Vec<(u32, u32)>, (i64, i64))> {
589 let ts_column = input
590 .column(self.time_index)
591 .as_any()
592 .downcast_ref::<TimestampMillisecondArray>()
593 .ok_or_else(|| {
594 DataFusionError::Execution(
595 "Time index Column downcast to TimestampMillisecondArray failed".into(),
596 )
597 })?;
598
599 let len = ts_column.len();
600 if len == 0 {
601 return Ok((vec![], (self.start, self.end)));
602 }
603
604 let first_ts = ts_column.value(0);
606 let first_ts_aligned = (first_ts / self.interval) * self.interval;
607 let last_ts = ts_column.value(ts_column.len() - 1);
608 let last_ts_aligned = ((last_ts + self.range) / self.interval) * self.interval;
609 let start = self.start.max(first_ts_aligned);
610 let end = self.end.min(last_ts_aligned);
611 if start > end {
612 return Ok((vec![], (start, end)));
613 }
614 let mut ranges = Vec::with_capacity(((self.end - self.start) / self.interval + 1) as usize);
615
616 let mut range_start_index = 0usize;
618 let mut last_range_start = 0;
619 let mut start_delta = 0;
620 for curr_ts in (start..=end).step_by(self.interval as _) {
621 let start_ts = curr_ts - self.range;
623
624 let mut range_start = ts_column.len();
626 let mut range_end = 0;
627 let mut cursor = range_start_index + start_delta;
628 while cursor < ts_column.len() && ts_column.value(cursor) > start_ts && cursor > 0 {
630 cursor -= 1;
631 }
632
633 while cursor < ts_column.len() {
634 let ts = ts_column.value(cursor);
635 if range_start > cursor && ts >= start_ts {
636 range_start = cursor;
637 range_start_index = range_start;
638 }
639 if ts <= curr_ts {
640 range_end = range_end.max(cursor);
641 } else {
642 range_start_index = range_start_index.checked_sub(1usize).unwrap_or_default();
643 break;
644 }
645 cursor += 1;
646 }
647 if range_start > range_end {
648 ranges.push((0, 0));
649 start_delta = 0;
650 } else {
651 ranges.push((range_start as _, (range_end + 1 - range_start) as _));
652 start_delta = range_start - last_range_start;
653 last_range_start = range_start;
654 }
655 }
656
657 Ok((ranges, (start, end)))
658 }
659}
660
#[cfg(test)]
mod test {
    use datafusion::arrow::array::{ArrayRef, DictionaryArray, Float64Array, StringArray};
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Int64Type, Schema, TimestampMillisecondType,
    };
    use datafusion::common::ToDFSchema;
    use datafusion::physical_expr::Partitioning;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
    use datafusion::physical_plan::memory::MemoryExec;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;

    use super::*;

    /// Name of the time index column in the test data.
    const TIME_INDEX_COLUMN: &str = "timestamp";

    /// Builds a single-partition, single-batch input of 10 rows: irregularly spaced
    /// timestamps from 0s to 291s, two constant `1.0` value columns and a constant `"foo"`
    /// string column.
    fn prepare_test_data() -> MemoryExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value_1", DataType::Float64, true),
            Field::new("value_2", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            0, 30_000, 60_000, 90_000, 120_000, // every 30s
            180_000, 240_000, // every 60s
            241_000, 271_000, 291_000, // irregular tail
        ])) as _;
        let field_column: ArrayRef = Arc::new(Float64Array::from(vec![1.0; 10])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo"; 10])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![
                timestamp_column,
                field_column.clone(),
                field_column,
                path_column,
            ],
        )
        .unwrap();

        MemoryExec::try_new(&[vec![data]], schema, None).unwrap()
    }

    /// Runs `RangeManipulateExec` over the test data with `value_1`/`value_2` as field
    /// columns and compares a debug rendering of the result with `expected`.
    ///
    /// Columns are joined with "\n" and batches with "\n\n"; dictionary columns are rendered
    /// through `RangeArray`'s `Debug`.
    async fn do_normalize_test(
        start: Millisecond,
        end: Millisecond,
        interval: Millisecond,
        range: Millisecond,
        expected: String,
    ) {
        let memory_exec = Arc::new(prepare_test_data());
        let time_index = TIME_INDEX_COLUMN.to_string();
        let field_columns = vec!["value_1".to_string(), "value_2".to_string()];
        let manipulate_output_schema = SchemaRef::new(
            RangeManipulate::calculate_output_schema(
                &memory_exec.schema().to_dfschema_ref().unwrap(),
                &time_index,
                &field_columns,
            )
            .unwrap()
            .as_ref()
            .into(),
        );
        let properties = PlanProperties::new(
            EquivalenceProperties::new(manipulate_output_schema.clone()),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let normalize_exec = Arc::new(RangeManipulateExec {
            start,
            end,
            interval,
            range,
            field_columns,
            output_schema: manipulate_output_schema,
            time_range_column: RangeManipulate::build_timestamp_range_name(&time_index),
            time_index_column: time_index,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            properties,
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal: String = result
            // Render every column of every batch.
            .into_iter()
            .filter_map(|batch| {
                batch
                    .columns()
                    .iter()
                    .map(|array| {
                        if matches!(array.data_type(), &DataType::Dictionary(..)) {
                            let dict_array = array
                                .as_any()
                                .downcast_ref::<DictionaryArray<Int64Type>>()
                                .unwrap()
                                .clone();
                            format!("{:?}", RangeArray::try_new(dict_array).unwrap())
                        } else {
                            format!("{array:?}")
                        }
                    })
                    .reduce(|lhs, rhs| lhs + "\n" + &rhs)
            })
            .reduce(|lhs, rhs| lhs + "\n\n" + &rhs)
            .unwrap();

        assert_eq!(result_literal, expected);
    }

    /// 30s step with a 90s look-back over the whole data span.
    ///
    /// The second call requests a start far before the first sample; the evaluation start is
    /// clamped up to the aligned first sample timestamp, so the output is identical.
    #[tokio::test]
    async fn interval_30s_range_90s() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  \
                1970-01-01T00:00:00,\n  \
                1970-01-01T00:00:30,\n  \
                1970-01-01T00:01:00,\n  \
                1970-01-01T00:01:30,\n  \
                1970-01-01T00:02:00,\n  \
                1970-01-01T00:02:30,\n  \
                1970-01-01T00:03:00,\n  \
                1970-01-01T00:03:30,\n  \
                1970-01-01T00:04:00,\n  \
                1970-01-01T00:04:30,\n  \
                1970-01-01T00:05:00,\n\
            ]\nRangeArray { \
                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nRangeArray { \
                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
            RangeArray { \
                base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
                ranges: [Some(0..1), Some(0..2), Some(0..3), Some(0..4), Some(1..5), Some(2..5), Some(3..6), Some(4..6), Some(5..7), Some(5..8), Some(6..10)] \
            }",
        );
        do_normalize_test(0, 310_000, 30_000, 90_000, expected.clone()).await;

        // Start requested long before the first sample.
        do_normalize_test(-300000, 310_000, 30_000, 90_000, expected).await;
    }

    /// Start not on a multiple of the step (1ms with a 3s step) and a 1s look-back.
    ///
    /// Evaluation timestamps stay on the `1 + k * 3000` grid. Only the first window
    /// `[-999, 1]` contains a sample, so the remaining windows are empty.
    #[tokio::test]
    async fn small_empty_range() {
        let expected = String::from(
            "PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  \
                1970-01-01T00:00:00.001,\n  \
                1970-01-01T00:00:03.001,\n  \
                1970-01-01T00:00:06.001,\n  \
                1970-01-01T00:00:09.001,\n\
            ]\nRangeArray { \
                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
                ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }\nRangeArray { \
                base array: PrimitiveArray<Float64>\n[\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n  1.0,\n], \
                ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }\nStringArray\n[\n  \"foo\",\n  \"foo\",\n  \"foo\",\n  \"foo\",\n]\n\
            RangeArray { \
                base array: PrimitiveArray<Timestamp(Millisecond, None)>\n[\n  1970-01-01T00:00:00,\n  1970-01-01T00:00:30,\n  1970-01-01T00:01:00,\n  1970-01-01T00:01:30,\n  1970-01-01T00:02:00,\n  1970-01-01T00:03:00,\n  1970-01-01T00:04:00,\n  1970-01-01T00:04:01,\n  1970-01-01T00:04:31,\n  1970-01-01T00:04:51,\n], \
                ranges: [Some(0..1), Some(0..0), Some(0..0), Some(0..0)] \
            }");
        do_normalize_test(1, 10_001, 3_000, 1_000, expected).await;
    }
}