1use std::any::Any;
19use std::collections::{BTreeMap, BTreeSet, VecDeque};
20use std::pin::Pin;
21use std::slice::from_ref;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use arrow::array::{Array, ArrayRef};
26use arrow::compute::SortColumn;
27use arrow_schema::{DataType, SchemaRef, SortOptions};
28use common_error::ext::{BoxedError, PlainError};
29use common_error::status_code::StatusCode;
30use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
31use common_telemetry::error;
32use common_time::Timestamp;
33use common_time::timestamp::TimeUnit as TimestampUnit;
34use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
35use datafusion::execution::{RecordBatchStream, TaskContext};
36use datafusion::physical_plan::memory::MemoryStream;
37use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
38use datafusion::physical_plan::sorts::streaming_merge::StreamingMergeBuilder;
39use datafusion::physical_plan::{
40 DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
41};
42use datafusion_common::utils::bisect;
43use datafusion_common::{DataFusionError, internal_err};
44use datafusion_physical_expr::PhysicalSortExpr;
45use datatypes::value::Value;
46use futures::Stream;
47use itertools::Itertools;
48use snafu::ResultExt;
49use store_api::region_engine::PartitionRange;
50
51use crate::error::{QueryExecutionSnafu, Result};
52
/// Physical plan node that sorts input by exploiting the time-window layout
/// described by per-partition `PartitionRange`s, merge-sorting runs window by
/// window instead of buffering the whole input.
#[derive(Debug, Clone)]
pub struct WindowedSortExec {
    /// The sort expression; evaluated against the time index column.
    expression: PhysicalSortExpr,
    /// Optional row limit (`LIMIT`) applied to the sorted output.
    fetch: Option<usize>,
    /// Per input partition, the list of `PartitionRange`s describing the
    /// time layout of that partition's data.
    ranges: Vec<Vec<PartitionRange>>,
    /// Per input partition, the precomputed working ranges: each pairs a
    /// merged time range with the indices of `ranges` overlapping it
    /// (built in `try_new`).
    all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
    /// Child plan producing the input record batches.
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics shared with the produced streams.
    metrics: ExecutionPlanMetricsSet,
    /// Cached plan properties (ordering, partitioning, emission, boundedness).
    properties: Arc<PlanProperties>,
}
87
88pub fn check_partition_range_monotonicity(
92 ranges: &[Vec<PartitionRange>],
93 descending: bool,
94) -> Result<()> {
95 let is_valid = ranges.iter().all(|r| {
96 if descending {
97 r.windows(2)
99 .all(|w| w[0].end > w[1].end || (w[0].end == w[1].end && w[0].start >= w[1].start))
100 } else {
101 r.windows(2).all(|w| {
103 w[0].start < w[1].start || (w[0].start == w[1].start && w[0].end <= w[1].end)
104 })
105 }
106 });
107
108 if !is_valid {
109 let msg = if descending {
110 "Input `PartitionRange`s are not sorted by (end DESC, start DESC)"
111 } else {
112 "Input `PartitionRange`s are not sorted by (start ASC, end ASC)"
113 };
114 let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
115 Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
116 } else {
117 Ok(())
118 }
119}
120
121impl WindowedSortExec {
122 pub fn try_new(
123 expression: PhysicalSortExpr,
124 fetch: Option<usize>,
125 ranges: Vec<Vec<PartitionRange>>,
126 input: Arc<dyn ExecutionPlan>,
127 ) -> Result<Self> {
128 check_partition_range_monotonicity(&ranges, expression.options.descending)?;
129
130 let mut eq_properties = input.equivalence_properties().clone();
131 eq_properties.reorder(vec![expression.clone()])?;
132
133 let properties = input.properties();
134 let properties = Arc::new(PlanProperties::new(
135 eq_properties,
136 input.output_partitioning().clone(),
137 properties.emission_type,
138 properties.boundedness,
139 ));
140
141 let mut all_avail_working_range = Vec::with_capacity(ranges.len());
142 for r in &ranges {
143 let overlap_counts = split_overlapping_ranges(r);
144 let working_ranges =
145 compute_all_working_ranges(&overlap_counts, expression.options.descending);
146 all_avail_working_range.push(working_ranges);
147 }
148
149 Ok(Self {
150 expression,
151 fetch,
152 ranges,
153 all_avail_working_range,
154 input,
155 metrics: ExecutionPlanMetricsSet::new(),
156 properties,
157 })
158 }
159
160 pub fn to_stream(
164 &self,
165 context: Arc<TaskContext>,
166 partition: usize,
167 ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
168 let input_stream: DfSendableRecordBatchStream =
169 self.input.execute(partition, context.clone())?;
170
171 let df_stream = Box::pin(WindowedSortStream::new(
172 context,
173 self,
174 input_stream,
175 partition,
176 )) as _;
177
178 Ok(df_stream)
179 }
180}
181
182impl DisplayAs for WindowedSortExec {
183 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184 write!(
185 f,
186 "WindowedSortExec: expr={} num_ranges={}",
187 self.expression,
188 self.ranges.len()
189 )?;
190 if let Some(fetch) = self.fetch {
191 write!(f, " fetch={}", fetch)?;
192 }
193 Ok(())
194 }
195}
196
197impl ExecutionPlan for WindowedSortExec {
198 fn as_any(&self) -> &dyn Any {
199 self
200 }
201
202 fn schema(&self) -> SchemaRef {
203 self.input.schema()
204 }
205
206 fn properties(&self) -> &Arc<PlanProperties> {
207 &self.properties
208 }
209
210 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
211 vec![&self.input]
212 }
213
214 fn with_new_children(
215 self: Arc<Self>,
216 children: Vec<Arc<dyn ExecutionPlan>>,
217 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
218 let new_input = if let Some(first) = children.first() {
219 first
220 } else {
221 internal_err!("No children found")?
222 };
223 let new = Self::try_new(
224 self.expression.clone(),
225 self.fetch,
226 self.ranges.clone(),
227 new_input.clone(),
228 )?;
229 Ok(Arc::new(new))
230 }
231
232 fn execute(
233 &self,
234 partition: usize,
235 context: Arc<TaskContext>,
236 ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
237 self.to_stream(context, partition)
238 }
239
240 fn metrics(&self) -> Option<MetricsSet> {
241 Some(self.metrics.clone_inner())
242 }
243
244 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
250 vec![false]
251 }
252
253 fn name(&self) -> &str {
254 "WindowedSortExec"
255 }
256}
257
/// Stream implementation backing [`WindowedSortExec`] for one partition.
pub struct WindowedSortStream {
    /// Memory pool used to register merge-sort memory consumers.
    memory_pool: Arc<dyn MemoryPool>,
    /// Batches of the current, not-yet-sealed sorted run.
    in_progress: Vec<DfRecordBatch>,
    /// Last timestamp appended to `in_progress`; used to decide whether the
    /// next run still continues the current ordering.
    last_value: Option<Timestamp>,
    /// Sealed sorted runs awaiting a merge for the current working range.
    sorted_input_runs: Vec<DfSendableRecordBatchStream>,
    /// Merge-sorted output streams, drained front-to-back.
    merge_stream: VecDeque<DfSendableRecordBatchStream>,
    /// Number of merges started so far; labels the memory consumers.
    merge_count: usize,
    /// Index of the current entry in `all_avail_working_range`.
    working_idx: usize,
    /// Upstream input stream.
    input: DfSendableRecordBatchStream,
    /// Set once the input is exhausted or the fetch limit is reached.
    is_terminated: bool,
    /// Output schema (same as the input's).
    schema: SchemaRef,
    /// The sort expression over the time index column.
    expression: PhysicalSortExpr,
    /// Optional row limit on the output.
    fetch: Option<usize>,
    /// Rows emitted so far, for enforcing `fetch`.
    produced: usize,
    /// Target output batch size from the session configuration.
    batch_size: usize,
    /// Working ranges of this partition with the indices of the
    /// `PartitionRange`s overlapping each of them.
    all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
    /// This partition's original `PartitionRange`s; only read by the
    /// debug-build diagnostics, hence the `dead_code` allowance.
    #[allow(dead_code)] ranges: Vec<PartitionRange>,
    /// Baseline metrics recorded on every poll.
    metrics: BaselineMetrics,
}
303
304impl WindowedSortStream {
305 pub fn new(
306 context: Arc<TaskContext>,
307 exec: &WindowedSortExec,
308 input: DfSendableRecordBatchStream,
309 partition: usize,
310 ) -> Self {
311 Self {
312 memory_pool: context.runtime_env().memory_pool.clone(),
313 in_progress: Vec::new(),
314 last_value: None,
315 sorted_input_runs: Vec::new(),
316 merge_stream: VecDeque::new(),
317 merge_count: 0,
318 working_idx: 0,
319 schema: input.schema(),
320 input,
321 is_terminated: false,
322 expression: exec.expression.clone(),
323 fetch: exec.fetch,
324 produced: 0,
325 batch_size: context.session_config().batch_size(),
326 all_avail_working_range: exec.all_avail_working_range[partition].clone(),
327 ranges: exec.ranges[partition].clone(),
328 metrics: BaselineMetrics::new(&exec.metrics, partition),
329 }
330 }
331}
332
333impl WindowedSortStream {
    /// Debug-only diagnostic: when a batch's time range falls outside the
    /// current working range, log how `cur_range` relates to the declared
    /// `PartitionRange`s and the working ranges to help locate the bad input.
    #[cfg(debug_assertions)]
    fn check_subset_ranges(&self, cur_range: &TimeRange) {
        // Declared ranges that fully contain `cur_range`.
        let cur_is_subset_to = self
            .ranges
            .iter()
            .filter(|r| cur_range.is_subset(&TimeRange::from(*r)))
            .collect_vec();
        if cur_is_subset_to.is_empty() {
            error!("Current range is not a subset of any PartitionRange");
            // Declared ranges fully contained in `cur_range`.
            let subset_ranges = self
                .ranges
                .iter()
                .filter(|r| TimeRange::from(*r).is_subset(cur_range))
                .collect_vec();
            // Declared ranges that overlap `cur_range` without containment.
            let only_overlap = self
                .ranges
                .iter()
                .filter(|r| {
                    let r = TimeRange::from(*r);
                    r.is_overlapping(cur_range) && !r.is_subset(cur_range)
                })
                .collect_vec();
            error!(
                "Bad input, found {} ranges that are subset of current range, also found {} ranges that only overlap, subset ranges are: {:?}; overlap ranges are: {:?}",
                subset_ranges.len(),
                only_overlap.len(),
                subset_ranges,
                only_overlap
            );
        } else {
            // Declared ranges that overlap but do not contain `cur_range`.
            let only_overlap = self
                .ranges
                .iter()
                .filter(|r| {
                    let r = TimeRange::from(*r);
                    r.is_overlapping(cur_range) && !cur_range.is_subset(&r)
                })
                .collect_vec();
            error!(
                "Found current range to be subset of {} ranges, also found {} ranges that only overlap, of subset ranges are:{:?}; overlap ranges are: {:?}",
                cur_is_subset_to.len(),
                only_overlap.len(),
                cur_is_subset_to,
                only_overlap
            );
        }
        // Also report which precomputed working ranges touch `cur_range`.
        let all_overlap_working_range = self
            .all_avail_working_range
            .iter()
            .filter(|(range, _)| range.is_overlapping(cur_range))
            .map(|(range, _)| range)
            .collect_vec();
        error!(
            "Found {} working ranges that overlap with current range: {:?}",
            all_overlap_working_range.len(),
            all_overlap_working_range
        );
    }
393
    /// Drain the queued merge-sorted streams in FIFO order, enforcing the
    /// `fetch` limit and keeping `produced` up to date.
    ///
    /// Returns `Ready(None)` when every queued stream is exhausted; the
    /// caller decides whether more input may still produce output.
    fn poll_result_stream(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
        while let Some(merge_stream) = &mut self.merge_stream.front_mut() {
            match merge_stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(batch))) => {
                    let ret = if let Some(remaining) = self.remaining_fetch() {
                        if remaining == 0 {
                            // Fetch limit already met: stop producing entirely.
                            self.is_terminated = true;
                            None
                        } else if remaining < batch.num_rows() {
                            // Truncate the batch to the remaining limit.
                            self.produced += remaining;
                            Some(Ok(batch.slice(0, remaining)))
                        } else {
                            self.produced += batch.num_rows();
                            Some(Ok(batch))
                        }
                    } else {
                        // No limit configured: pass the batch through.
                        self.produced += batch.num_rows();
                        Some(Ok(batch))
                    };
                    return Poll::Ready(ret);
                }
                Poll::Ready(Some(Err(e))) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    // Current stream exhausted; advance to the next one.
                    self.merge_stream.pop_front();
                    continue;
                }
                Poll::Pending => {
                    return Poll::Pending;
                }
            }
        }
        Poll::Ready(None)
    }
436
437 pub fn poll_next_inner(
441 mut self: Pin<&mut Self>,
442 cx: &mut Context<'_>,
443 ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
444 match self.poll_result_stream(cx) {
446 Poll::Ready(None) => {
447 if self.is_terminated {
448 return Poll::Ready(None);
449 }
450 }
451 x => return x,
452 };
453
454 while !self.is_terminated {
456 let SortedRunSet {
458 runs_with_batch,
459 sort_column,
460 } = match self.input.as_mut().poll_next(cx) {
461 Poll::Ready(Some(Ok(batch))) => split_batch_to_sorted_run(batch, &self.expression)?,
462 Poll::Ready(Some(Err(e))) => {
463 return Poll::Ready(Some(Err(e)));
464 }
465 Poll::Ready(None) => {
466 self.is_terminated = true;
468 self.build_sorted_stream()?;
469 self.start_new_merge_sort()?;
470 break;
471 }
472 Poll::Pending => return Poll::Pending,
473 };
474
475 let mut last_remaining = None;
481 let mut run_iter = runs_with_batch.into_iter();
482 loop {
483 let Some((sorted_rb, run_info)) = last_remaining.take().or(run_iter.next()) else {
484 break;
485 };
486 if sorted_rb.num_rows() == 0 {
487 continue;
488 }
489 let Some(cur_range) = run_info.get_time_range() else {
491 internal_err!("Found NULL in time index column")?
492 };
493 let Some(working_range) = self.get_working_range() else {
494 internal_err!("No working range found")?
495 };
496
497 if sort_column.options.unwrap_or_default().descending {
499 if cur_range.end > working_range.end {
500 error!("Invalid range: {:?} > {:?}", cur_range, working_range);
501 #[cfg(debug_assertions)]
502 self.check_subset_ranges(&cur_range);
503 internal_err!(
504 "Current batch have data on the right side of working range, something is very wrong"
505 )?;
506 }
507 } else if cur_range.start < working_range.start {
508 error!("Invalid range: {:?} < {:?}", cur_range, working_range);
509 #[cfg(debug_assertions)]
510 self.check_subset_ranges(&cur_range);
511 internal_err!(
512 "Current batch have data on the left side of working range, something is very wrong"
513 )?;
514 }
515
516 if cur_range.is_subset(&working_range) {
517 self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?;
520 } else if let Some(intersection) = cur_range.intersection(&working_range) {
521 let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len);
523 let (offset, len) = find_slice_from_range(
524 &SortColumn {
525 values: cur_sort_column.clone(),
526 options: sort_column.options,
527 },
528 &intersection,
529 )?;
530
531 if offset != 0 {
532 internal_err!(
533 "Current batch have data on the left side of working range, something is very wrong"
534 )?;
535 }
536
537 let sliced_rb = sorted_rb.slice(offset, len);
538
539 self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?;
541 self.build_sorted_stream()?;
543
544 self.start_new_merge_sort()?;
546
547 let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len);
548 if r_len != 0 {
549 let remaining_rb = sorted_rb.slice(r_offset, r_len);
551 let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?;
552 let new_run_info = SucRun {
553 offset: run_info.offset + r_offset,
554 len: r_len,
555 first_val: new_first_val,
556 last_val: run_info.last_val,
557 };
558 last_remaining = Some((remaining_rb, new_run_info));
559 }
560 } else {
566 self.build_sorted_stream()?;
569 self.start_new_merge_sort()?;
570
571 last_remaining = Some((sorted_rb, run_info));
573 }
574 }
575
576 match self.poll_result_stream(cx) {
578 Poll::Ready(None) => {
579 if self.is_terminated {
580 return Poll::Ready(None);
581 }
582 }
583 x => return x,
584 };
585 }
586 self.poll_result_stream(cx)
588 }
589
    /// Append a batch to the current in-progress sorted run.
    fn push_batch(&mut self, batch: DfRecordBatch) {
        self.in_progress.push(batch);
    }
593
594 fn try_concat_batch(
598 &mut self,
599 batch: DfRecordBatch,
600 run_info: &SucRun<Timestamp>,
601 opt: Option<SortOptions>,
602 ) -> datafusion_common::Result<()> {
603 let is_ok_to_concat =
604 cmp_with_opts(&self.last_value, &run_info.first_val, &opt) <= std::cmp::Ordering::Equal;
605
606 if is_ok_to_concat {
607 self.push_batch(batch);
608 } else {
610 self.build_sorted_stream()?;
612 self.push_batch(batch);
613 }
614 self.last_value = run_info.last_val;
615 Ok(())
616 }
617
618 fn get_working_range(&self) -> Option<TimeRange> {
620 self.all_avail_working_range
621 .get(self.working_idx)
622 .map(|(range, _)| *range)
623 }
624
    /// Advance to the next working range.
    fn set_next_working_range(&mut self) {
        self.working_idx += 1;
    }
629
630 fn build_sorted_stream(&mut self) -> datafusion_common::Result<()> {
632 if self.in_progress.is_empty() {
633 return Ok(());
634 }
635 let data = std::mem::take(&mut self.in_progress);
636
637 let new_stream = MemoryStream::try_new(data, self.schema(), None)?;
638 self.sorted_input_runs.push(Box::pin(new_stream));
639 Ok(())
640 }
641
642 fn start_new_merge_sort(&mut self) -> datafusion_common::Result<()> {
644 if !self.in_progress.is_empty() {
645 return internal_err!("Starting a merge sort when in_progress is not empty")?;
646 }
647
648 self.set_next_working_range();
649
650 let streams = std::mem::take(&mut self.sorted_input_runs);
651 if streams.is_empty() {
652 return Ok(());
653 } else if streams.len() == 1 {
654 self.merge_stream
655 .push_back(streams.into_iter().next().unwrap());
656 return Ok(());
657 }
658
659 let fetch = self.remaining_fetch();
660 let reservation = MemoryConsumer::new(format!("WindowedSortStream[{}]", self.merge_count))
661 .register(&self.memory_pool);
662 self.merge_count += 1;
663
664 let resulting_stream = StreamingMergeBuilder::new()
665 .with_streams(streams)
666 .with_schema(self.schema())
667 .with_expressions(&[self.expression.clone()].into())
668 .with_metrics(self.metrics.clone())
669 .with_batch_size(self.batch_size)
670 .with_fetch(fetch)
671 .with_reservation(reservation)
672 .build()?;
673 self.merge_stream.push_back(resulting_stream);
674 Ok(())
676 }
677
678 fn remaining_fetch(&self) -> Option<usize> {
681 let total_now = self.produced;
682 self.fetch.map(|p| p.saturating_sub(total_now))
683 }
684}
685
686impl Stream for WindowedSortStream {
687 type Item = datafusion_common::Result<DfRecordBatch>;
688
689 fn poll_next(
690 mut self: Pin<&mut Self>,
691 cx: &mut Context<'_>,
692 ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
693 let result = self.as_mut().poll_next_inner(cx);
694 self.metrics.record_poll(result)
695 }
696}
697
698impl RecordBatchStream for WindowedSortStream {
699 fn schema(&self) -> SchemaRef {
700 self.schema.clone()
701 }
702}
703
704fn split_batch_to_sorted_run(
706 batch: DfRecordBatch,
707 expression: &PhysicalSortExpr,
708) -> datafusion_common::Result<SortedRunSet<Timestamp>> {
709 let sort_column = expression.evaluate_to_sort_column(&batch)?;
711 let sorted_runs_offset = get_sorted_runs(sort_column.clone())?;
712 if let Some(run) = sorted_runs_offset.first()
713 && sorted_runs_offset.len() == 1
714 {
715 if !(run.offset == 0 && run.len == batch.num_rows()) {
716 internal_err!(
717 "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
718 run.offset,
719 run.len,
720 batch.num_rows()
721 )?;
722 }
723 Ok(SortedRunSet {
725 runs_with_batch: vec![(batch, run.clone())],
726 sort_column,
727 })
728 } else {
729 let mut ret = Vec::with_capacity(sorted_runs_offset.len());
731 for run in sorted_runs_offset {
732 if run.offset + run.len > batch.num_rows() {
733 internal_err!(
734 "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
735 run.offset,
736 run.len,
737 batch.num_rows()
738 )?;
739 }
740 let new_rb = batch.slice(run.offset, run.len);
741 ret.push((new_rb, run));
742 }
743 Ok(SortedRunSet {
744 runs_with_batch: ret,
745 sort_column,
746 })
747 }
748}
749
/// Dispatch on a timestamp `DataType`, expanding `$m!` with the concrete
/// arrow timestamp type and its `TimeUnit` (plus any extra arguments).
/// Non-timestamp types fall through to the caller-provided patterns.
#[macro_export]
macro_rules! downcast_ts_array {
    ($data_type:expr => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) =>
    {
        match $data_type {
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
                $m!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
                $m!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
                $m!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
                $m!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $args)*)
            }
            $($p => $fallback,)*
        }
    };
}
774
/// Locate the contiguous `(offset, len)` slice of the sorted `sort_column`
/// whose timestamps fall inside `range`, by bisecting on the range bounds
/// with the column's own sort options.
fn find_slice_from_range(
    sort_column: &SortColumn,
    range: &TimeRange,
) -> datafusion_common::Result<(usize, usize)> {
    let time_unit = sort_timestamp_unit(sort_column.values.data_type())?;
    let array = &sort_column.values;
    let opt = &sort_column.options.unwrap_or_default();
    let descending = opt.descending;
    // Align the range bounds to the column's time unit before comparing.
    let range = convert_time_range_for_sort(range, time_unit)?;

    // Turn both bounds into scalar values of the column's type.
    let typed_sorted_range = [range.start, range.end]
        .iter()
        .map(|t| {
            t.convert_to(time_unit.into())
                .ok_or_else(|| {
                    DataFusionError::Internal(format!(
                        "Failed to convert timestamp from {:?} to {:?}",
                        t.unit(),
                        time_unit
                    ))
                })
                .and_then(|typed_ts| {
                    let value = Value::Timestamp(typed_ts);
                    value
                        .try_to_scalar_value(&value.data_type())
                        .map_err(|e| DataFusionError::External(Box::new(e) as _))
                })
        })
        .try_collect::<_, Vec<_>, _>()?;

    let (min_val, max_val) = (typed_sorted_range[0].clone(), typed_sorted_range[1].clone());

    let (start, end) = if descending {
        // Descending columns put larger values first, so the left edge is
        // found with the max bound and the right edge with the min bound.
        let start = bisect::<false>(from_ref(array), from_ref(&max_val), &[*opt])?;
        let end = bisect::<false>(from_ref(array), from_ref(&min_val), &[*opt])?;
        (start, end)
    } else {
        let start = bisect::<true>(from_ref(array), from_ref(&min_val), &[*opt])?;
        let end = bisect::<true>(from_ref(array), from_ref(&max_val), &[*opt])?;
        (start, end)
    };

    Ok((start, end - start))
}
832
833fn sort_timestamp_unit(data_type: &DataType) -> datafusion_common::Result<arrow_schema::TimeUnit> {
834 if let DataType::Timestamp(unit, _) = data_type {
835 Ok(*unit)
836 } else {
837 Err(DataFusionError::Internal(format!(
838 "Unsupported sort column type: {data_type}"
839 )))
840 }
841}
842
/// How a range bound should be rounded during time-unit conversion.
#[derive(Debug, Clone, Copy)]
enum RangeBoundKind {
    // Inclusive start: converted with `convert_to` (rounds toward zero/floor
    // per `Timestamp::convert_to` — see `convert_timestamp_range_bound`).
    InclusiveStart,
    // Exclusive end: converted with `convert_to_ceil` so no data at the edge
    // is lost.
    ExclusiveEnd,
}
848
849fn convert_time_range_for_sort(
850 range: &TimeRange,
851 time_unit: arrow_schema::TimeUnit,
852) -> datafusion_common::Result<TimeRange> {
853 let target_unit = time_unit.into();
854 Ok(TimeRange::new(
855 convert_timestamp_range_bound(range.start, target_unit, RangeBoundKind::InclusiveStart)?,
856 convert_timestamp_range_bound(range.end, target_unit, RangeBoundKind::ExclusiveEnd)?,
857 ))
858}
859
860fn convert_timestamp_range_bound(
861 timestamp: Timestamp,
862 target_unit: TimestampUnit,
863 bound_kind: RangeBoundKind,
864) -> datafusion_common::Result<Timestamp> {
865 let converted = match bound_kind {
866 RangeBoundKind::InclusiveStart => timestamp.convert_to(target_unit),
867 RangeBoundKind::ExclusiveEnd => timestamp.convert_to_ceil(target_unit),
868 };
869
870 converted.ok_or_else(|| {
871 DataFusionError::Internal(format!(
872 "Failed to convert timestamp from {:?} to {:?}",
873 timestamp.unit(),
874 target_unit
875 ))
876 })
877}
878
879pub(crate) fn project_partition_range_for_sort(
880 range: PartitionRange,
881 sort_data_type: &DataType,
882) -> datafusion_common::Result<PartitionRange> {
883 let target_unit = sort_timestamp_unit(sort_data_type)?.into();
884 Ok(PartitionRange {
885 start: convert_timestamp_range_bound(
886 range.start,
887 target_unit,
888 RangeBoundKind::InclusiveStart,
889 )?,
890 end: convert_timestamp_range_bound(range.end, target_unit, RangeBoundKind::ExclusiveEnd)?,
891 ..range
892 })
893}
894
/// Smallest timestamp strictly greater than `timestamp` in its own unit,
/// used to turn an inclusive last value into an exclusive range end.
fn discrete_exclusive_end(timestamp: Timestamp) -> Timestamp {
    Timestamp::new(timestamp.value() + 1, timestamp.unit())
}
898
/// Downcast `$arr` to `PrimitiveArray<$t>` and yield a boxed
/// `(index, Option<i64>)` iterator over its values; intended as the `$m`
/// argument of `downcast_ts_array!` (the `$unit` argument is unused here).
#[macro_export]
macro_rules! array_iter_helper {
    ($t:ty, $unit:expr, $arr:expr) => {{
        let typed = $arr
            .as_any()
            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
            .unwrap();
        let iter = typed.iter().enumerate();
        Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
    }};
}
913
914fn cmp_with_opts<T: Ord>(
918 a: &Option<T>,
919 b: &Option<T>,
920 opt: &Option<SortOptions>,
921) -> std::cmp::Ordering {
922 let opt = opt.unwrap_or_default();
923
924 if let (Some(a), Some(b)) = (a, b) {
925 if opt.descending { b.cmp(a) } else { a.cmp(b) }
926 } else if opt.nulls_first {
927 a.cmp(b)
930 } else {
931 match (a, b) {
932 (Some(a), Some(b)) => a.cmp(b),
933 (Some(_), None) => std::cmp::Ordering::Less,
934 (None, Some(_)) => std::cmp::Ordering::Greater,
935 (None, None) => std::cmp::Ordering::Equal,
936 }
937 }
938}
939
/// A record batch decomposed into its sorted runs.
#[derive(Debug, Clone)]
struct SortedRunSet<N: Ord> {
    /// Each run's batch slice paired with its run metadata.
    runs_with_batch: Vec<(DfRecordBatch, SucRun<N>)>,
    /// The evaluated sort column of the original (unsliced) batch.
    sort_column: SortColumn,
}
947
/// A successive (already sorted) run of rows inside a batch.
#[derive(Debug, Clone, PartialEq)]
struct SucRun<N: Ord> {
    /// Row offset of the run within the batch.
    offset: usize,
    /// Number of rows in the run.
    len: usize,
    /// First non-NULL value of the run, if any.
    first_val: Option<N>,
    /// Last non-NULL value of the run, if any.
    last_val: Option<N>,
}
960
961impl SucRun<Timestamp> {
962 fn get_time_range(&self) -> Option<TimeRange> {
964 let start = self.first_val.min(self.last_val);
965 let end = self
966 .first_val
967 .max(self.last_val)
968 .map(discrete_exclusive_end);
969 start.zip(end).map(|(s, e)| TimeRange::new(s, e))
970 }
971}
972
/// Scan `(index, value)` pairs and partition them into maximal successive
/// runs that are non-decreasing under `sort_opts`, recording each run's
/// offset, length, and first/last non-NULL values.
fn find_successive_runs<T: Iterator<Item = (usize, Option<N>)>, N: Ord + Copy>(
    iter: T,
    sort_opts: &Option<SortOptions>,
) -> Vec<SucRun<N>> {
    let mut runs = Vec::new();
    // Previously seen element, used to detect order violations.
    let mut last_value = None;
    // Index of the last element seen; `None` when the iterator was empty.
    let mut iter_len = None;

    // Start offset and first/last non-NULL values of the current run.
    let mut last_offset = 0;
    let mut first_val: Option<N> = None;
    let mut last_val: Option<N> = None;

    for (idx, t) in iter {
        if let Some(last_value) = &last_value
            && cmp_with_opts(last_value, &t, sort_opts) == std::cmp::Ordering::Greater
        {
            // Order broke at `idx`: close the run covering [last_offset, idx).
            let len = idx - last_offset;
            let run = SucRun {
                offset: last_offset,
                len,
                first_val,
                last_val,
            };
            runs.push(run);
            first_val = None;
            last_val = None;

            last_offset = idx;
        }
        last_value = Some(t);
        if let Some(t) = t {
            first_val = first_val.or(Some(t));
            last_val = Some(t).or(last_val);
        }
        iter_len = Some(idx);
    }
    // Close the trailing run; an empty input yields one zero-length run.
    let run = SucRun {
        offset: last_offset,
        len: iter_len.map(|l| l - last_offset + 1).unwrap_or(0),
        first_val,
        last_val,
    };
    runs.push(run);

    runs
}
1021
/// Compute the successive sorted runs of a timestamp `sort_column`,
/// converting the raw `i64` values into `Timestamp`s of the column's unit.
///
/// Errors when the column is not a timestamp array.
fn get_sorted_runs(sort_column: SortColumn) -> datafusion_common::Result<Vec<SucRun<Timestamp>>> {
    let ty = sort_column.values.data_type();
    if let DataType::Timestamp(unit, _) = ty {
        let array = &sort_column.values;
        // Dispatch on the concrete timestamp unit to get an i64 iterator.
        let iter = downcast_ts_array!(
            array.data_type() => (array_iter_helper, array),
            _ => internal_err!("Unsupported sort column type: {ty}")?
        );

        let raw = find_successive_runs(iter, &sort_column.options);
        let ts_runs = raw
            .into_iter()
            .map(|run| SucRun {
                offset: run.offset,
                len: run.len,
                first_val: run.first_val.map(|v| Timestamp::new(v, unit.into())),
                last_val: run.last_val.map(|v| Timestamp::new(v, unit.into())),
            })
            .collect_vec();
        Ok(ts_runs)
    } else {
        Err(DataFusionError::Internal(format!(
            "Unsupported sort column type: {ty}"
        )))
    }
}
1051
/// A time range with `start <= end` guaranteed by the constructor; the
/// overlap/subset checks treat it as half-open (`end` exclusive).
#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TimeRange {
    start: Timestamp,
    end: Timestamp,
}
1060
// Conversions into `TimeRange`; all go through `TimeRange::new`, which
// normalizes the bounds so `start <= end`.
impl From<&PartitionRange> for TimeRange {
    fn from(range: &PartitionRange) -> Self {
        Self::new(range.start, range.end)
    }
}

impl From<(Timestamp, Timestamp)> for TimeRange {
    fn from(range: (Timestamp, Timestamp)) -> Self {
        Self::new(range.0, range.1)
    }
}

impl From<&(Timestamp, Timestamp)> for TimeRange {
    fn from(range: &(Timestamp, Timestamp)) -> Self {
        Self::new(range.0, range.1)
    }
}
1078
1079impl TimeRange {
1080 fn new(start: Timestamp, end: Timestamp) -> Self {
1082 if start > end {
1083 Self {
1084 start: end,
1085 end: start,
1086 }
1087 } else {
1088 Self { start, end }
1089 }
1090 }
1091
1092 fn is_subset(&self, other: &Self) -> bool {
1093 self.start >= other.start && self.end <= other.end
1094 }
1095
1096 fn is_overlapping(&self, other: &Self) -> bool {
1098 !(self.start >= other.end || self.end <= other.start)
1099 }
1100
1101 fn intersection(&self, other: &Self) -> Option<Self> {
1102 if self.is_overlapping(other) {
1103 Some(Self::new(
1104 self.start.max(other.start),
1105 self.end.min(other.end),
1106 ))
1107 } else {
1108 None
1109 }
1110 }
1111
1112 fn difference(&self, other: &Self) -> Vec<Self> {
1113 if !self.is_overlapping(other) {
1114 vec![*self]
1115 } else {
1116 let mut ret = Vec::new();
1117 if self.start < other.start && self.end > other.end {
1118 ret.push(Self::new(self.start, other.start));
1119 ret.push(Self::new(other.end, self.end));
1120 } else if self.start < other.start {
1121 ret.push(Self::new(self.start, other.start));
1122 } else if self.end > other.end {
1123 ret.push(Self::new(other.end, self.end));
1124 }
1125 ret
1126 }
1127 }
1128}
1129
1130fn split_range_by(
1132 input_range: &TimeRange,
1133 input_parts: &[usize],
1134 split_by: &TimeRange,
1135 split_idx: usize,
1136) -> Vec<Action> {
1137 let mut ret = Vec::new();
1138 if input_range.is_overlapping(split_by) {
1139 let input_parts = input_parts.to_vec();
1140 let new_parts = {
1141 let mut new_parts = input_parts.clone();
1142 new_parts.push(split_idx);
1143 new_parts
1144 };
1145
1146 ret.push(Action::Pop(*input_range));
1147 if let Some(intersection) = input_range.intersection(split_by) {
1148 ret.push(Action::Push(intersection, new_parts.clone()));
1149 }
1150 for diff in input_range.difference(split_by) {
1151 ret.push(Action::Push(diff, input_parts.clone()));
1152 }
1153 }
1154 ret
1155}
1156
/// Deferred mutation of the overlap map, produced by `split_range_by` and
/// applied after iteration to avoid mutating the map while borrowed.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Action {
    // Remove this range's entry.
    Pop(TimeRange),
    // Insert/replace this range with the given part indices.
    Push(TimeRange, Vec<usize>),
}
1162
1163fn compute_all_working_ranges(
1171 overlap_counts: &BTreeMap<TimeRange, Vec<usize>>,
1172 descending: bool,
1173) -> Vec<(TimeRange, BTreeSet<usize>)> {
1174 let mut ret = Vec::new();
1175 let mut cur_range_set: Option<(TimeRange, BTreeSet<usize>)> = None;
1176 let overlap_iter: Box<dyn Iterator<Item = (&TimeRange, &Vec<usize>)>> = if descending {
1177 Box::new(overlap_counts.iter().rev()) as _
1178 } else {
1179 Box::new(overlap_counts.iter()) as _
1180 };
1181 for (range, set) in overlap_iter {
1182 match &mut cur_range_set {
1183 None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))),
1184 Some((working_range, working_set)) => {
1185 let need_expand = {
1190 let last_part = working_set.last();
1191 let inter: BTreeSet<usize> = working_set
1192 .intersection(&BTreeSet::from_iter(set.iter().cloned()))
1193 .cloned()
1194 .collect();
1195 if let Some(one) = inter.first()
1196 && inter.len() == 1
1197 && Some(one) == last_part
1198 {
1199 if set.iter().all(|p| Some(p) >= last_part) {
1201 false
1203 } else {
1204 true
1206 }
1207 } else if inter.is_empty() {
1208 false
1210 } else {
1211 true
1213 }
1214 };
1215
1216 if need_expand {
1217 if descending {
1218 working_range.start = range.start;
1219 } else {
1220 working_range.end = range.end;
1221 }
1222 working_set.extend(set.iter().cloned());
1223 } else {
1224 ret.push((*working_range, std::mem::take(working_set)));
1225 cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned())));
1226 }
1227 }
1228 }
1229 }
1230
1231 if let Some(cur_range_set) = cur_range_set {
1232 ret.push(cur_range_set)
1233 }
1234
1235 ret
1236}
1237
/// Split possibly-overlapping `PartitionRange`s into disjoint `TimeRange`s,
/// mapping each resulting range to the indices of all input ranges that
/// cover it.
fn split_overlapping_ranges(ranges: &[PartitionRange]) -> BTreeMap<TimeRange, Vec<usize>> {
    let mut ret: BTreeMap<TimeRange, Vec<usize>> = BTreeMap::new();
    for (idx, range) in ranges.iter().enumerate() {
        let key: TimeRange = (range.start, range.end).into();
        let mut actions = Vec::new();
        // Parts of `key` not yet covered by any existing entry.
        let mut untouched = vec![key];
        // Existing entries that may overlap `key`, scanned forward and
        // backward from `key`'s position in the ordered map.
        let forward_iter = ret
            .range(key..)
            .take_while(|(range, _)| range.is_overlapping(&key));
        let backward_iter = ret
            .range(..key)
            .rev()
            .take_while(|(range, _)| range.is_overlapping(&key));

        for (range, parts) in forward_iter.chain(backward_iter) {
            untouched = untouched.iter().flat_map(|r| r.difference(range)).collect();
            let act = split_range_by(range, parts, &key, idx);
            actions.extend(act);
        }

        // Apply the recorded splits after iteration to avoid mutating the
        // map while it is borrowed.
        for action in actions {
            match action {
                Action::Pop(range) => {
                    ret.remove(&range);
                }
                Action::Push(range, parts) => {
                    ret.insert(range, parts);
                }
            }
        }

        // Whatever remained uncovered belongs to `idx` alone.
        for range in untouched {
            ret.insert(range, vec![idx]);
        }
    }
    ret
}
1282
/// Read the timestamp at `offset` from a timestamp array.
///
/// Returns `Ok(None)` when the slot is NULL; errors when the array is not a
/// timestamp type.
fn get_timestamp_from_idx(
    array: &ArrayRef,
    offset: usize,
) -> datafusion_common::Result<Option<Timestamp>> {
    let time_unit = if let DataType::Timestamp(unit, _) = array.data_type() {
        unit
    } else {
        return Err(DataFusionError::Internal(format!(
            "Unsupported sort column type: {}",
            array.data_type()
        )));
    };
    let ty = array.data_type();
    // Single-element slice so the helper iterator yields exactly one value.
    let array = array.slice(offset, 1);
    let mut iter = downcast_ts_array!(
        array.data_type() => (array_iter_helper, array),
        _ => internal_err!("Unsupported sort column type: {ty}")?
    );
    let (_idx, val) = iter.next().ok_or_else(|| {
        DataFusionError::Internal("Empty array in get_timestamp_from".to_string())
    })?;
    let val = if let Some(val) = val {
        val
    } else {
        return Ok(None);
    };
    let gt_timestamp = Timestamp::new(val, time_unit.into());
    Ok(Some(gt_timestamp))
}
1313
1314#[cfg(test)]
1315mod test {
1316 use std::io::Write;
1317 use std::sync::Arc;
1318
1319 use arrow::array::{ArrayRef, TimestampMillisecondArray};
1320 use arrow::compute::concat_batches;
1321 use arrow::json::ArrayWriter;
1322 use arrow_schema::{Field, Schema, TimeUnit};
1323 use futures::StreamExt;
1324 use pretty_assertions::assert_eq;
1325 use serde_json::json;
1326
1327 use super::*;
1328 use crate::test_util::{MockInputExec, new_ts_array};
1329
1330 mod helpers {
1332 use datafusion::physical_plan::expressions::Column;
1333
1334 use super::*;
1335
1336 pub fn default_sort_opts(descending: bool) -> SortOptions {
1337 SortOptions {
1338 descending,
1339 nulls_first: true,
1340 }
1341 }
1342
1343 pub fn ts_field(unit: TimeUnit) -> Field {
1344 Field::new("ts", DataType::Timestamp(unit, None), false)
1345 }
1346
1347 pub fn ts_column() -> Column {
1348 Column::new("ts", 0)
1349 }
1350
1351 pub fn partition_range(start: i64, end: i64, num_rows: usize, id: usize) -> PartitionRange {
1352 PartitionRange {
1353 start: Timestamp::new_millisecond(start),
1354 end: Timestamp::new_millisecond(end),
1355 num_rows,
1356 identifier: id,
1357 }
1358 }
1359
1360 pub fn ts_array(values: impl IntoIterator<Item = i64>) -> ArrayRef {
1361 Arc::new(TimestampMillisecondArray::from_iter_values(values))
1362 }
1363 }
1364
1365 #[test]
1366 fn test_overlapping() {
1367 let testcases = [
1368 (
1369 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1370 (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1371 false,
1372 ),
1373 (
1374 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1375 (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1376 true,
1377 ),
1378 (
1379 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1380 (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1381 true,
1382 ),
1383 (
1384 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1385 (
1386 Timestamp::new_millisecond(1000),
1387 Timestamp::new_millisecond(1002),
1388 ),
1389 true,
1390 ),
1391 (
1392 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1393 (
1394 Timestamp::new_millisecond(1001),
1395 Timestamp::new_millisecond(1002),
1396 ),
1397 false,
1398 ),
1399 (
1400 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1401 (
1402 Timestamp::new_millisecond(1002),
1403 Timestamp::new_millisecond(1003),
1404 ),
1405 false,
1406 ),
1407 ];
1408
1409 for (range1, range2, expected) in testcases.iter() {
1410 assert_eq!(
1411 TimeRange::from(range1).is_overlapping(&range2.into()),
1412 *expected,
1413 "range1: {:?}, range2: {:?}",
1414 range1,
1415 range2
1416 );
1417 }
1418 }
1419
1420 #[test]
1421 fn test_split() {
1422 let testcases = [
1423 (
1425 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1426 vec![0],
1427 (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1428 1,
1429 vec![],
1430 ),
1431 (
1433 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1434 vec![0],
1435 (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1436 1,
1437 vec![
1438 Action::Pop(
1439 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1440 ),
1441 Action::Push(
1442 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1443 vec![0, 1],
1444 ),
1445 ],
1446 ),
1447 (
1448 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1449 vec![0],
1450 (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1451 1,
1452 vec![
1453 Action::Pop(
1454 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1455 ),
1456 Action::Push(
1457 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1458 vec![0, 1],
1459 ),
1460 ],
1461 ),
1462 (
1463 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1464 vec![0],
1465 (
1466 Timestamp::new_millisecond(1000),
1467 Timestamp::new_millisecond(1002),
1468 ),
1469 1,
1470 vec![
1471 Action::Pop(
1472 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1473 ),
1474 Action::Push(
1475 (
1476 Timestamp::new_millisecond(1000),
1477 Timestamp::new_millisecond(1001),
1478 )
1479 .into(),
1480 vec![0, 1],
1481 ),
1482 ],
1483 ),
1484 (
1486 (Timestamp::new_second(1), Timestamp::new_millisecond(1002)),
1487 vec![0],
1488 (
1489 Timestamp::new_millisecond(1001),
1490 Timestamp::new_millisecond(1002),
1491 ),
1492 1,
1493 vec![
1494 Action::Pop(
1495 (Timestamp::new_second(1), Timestamp::new_millisecond(1002)).into(),
1496 ),
1497 Action::Push(
1498 (
1499 Timestamp::new_millisecond(1001),
1500 Timestamp::new_millisecond(1002),
1501 )
1502 .into(),
1503 vec![0, 1],
1504 ),
1505 Action::Push(
1506 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1507 vec![0],
1508 ),
1509 ],
1510 ),
1511 (
1513 (Timestamp::new_second(1), Timestamp::new_millisecond(1004)),
1514 vec![0],
1515 (
1516 Timestamp::new_millisecond(1001),
1517 Timestamp::new_millisecond(1002),
1518 ),
1519 1,
1520 vec![
1521 Action::Pop(
1522 (Timestamp::new_second(1), Timestamp::new_millisecond(1004)).into(),
1523 ),
1524 Action::Push(
1525 (
1526 Timestamp::new_millisecond(1001),
1527 Timestamp::new_millisecond(1002),
1528 )
1529 .into(),
1530 vec![0, 1],
1531 ),
1532 Action::Push(
1533 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1534 vec![0],
1535 ),
1536 Action::Push(
1537 (
1538 Timestamp::new_millisecond(1002),
1539 Timestamp::new_millisecond(1004),
1540 )
1541 .into(),
1542 vec![0],
1543 ),
1544 ],
1545 ),
1546 ];
1547
1548 for (range, parts, split_by, split_idx, expected) in testcases.iter() {
1549 assert_eq!(
1550 split_range_by(&(*range).into(), parts, &split_by.into(), *split_idx),
1551 *expected,
1552 "range: {:?}, parts: {:?}, split_by: {:?}, split_idx: {}",
1553 range,
1554 parts,
1555 split_by,
1556 split_idx
1557 );
1558 }
1559 }
1560
1561 #[test]
1562 fn test_project_partition_range_for_sort_uses_ceil_on_exclusive_end() {
1563 let range = PartitionRange {
1564 start: Timestamp::new_nanosecond(1_000_000),
1565 end: Timestamp::new_nanosecond(1_000_001),
1566 num_rows: 1,
1567 identifier: 0,
1568 };
1569
1570 let projected = project_partition_range_for_sort(
1571 range,
1572 &DataType::Timestamp(TimeUnit::Millisecond, None),
1573 )
1574 .unwrap();
1575
1576 assert_eq!(Timestamp::new_millisecond(1), projected.start);
1577 assert_eq!(Timestamp::new_millisecond(2), projected.end);
1578 }
1579
1580 #[test]
1581 fn test_find_slice_from_range_preserves_last_row_after_precision_drop() {
1582 let sort_column = SortColumn {
1583 values: Arc::new(TimestampMillisecondArray::from_iter_values([1])) as ArrayRef,
1584 options: Some(SortOptions::default()),
1585 };
1586 let range = TimeRange::new(
1587 Timestamp::new_nanosecond(1_000_000),
1588 Timestamp::new_nanosecond(1_000_001),
1589 );
1590
1591 assert_eq!((0, 1), find_slice_from_range(&sort_column, &range).unwrap());
1592 }
1593
1594 #[test]
1595 fn test_discrete_exclusive_end_creates_half_open_upper_bound() {
1596 let timestamp = Timestamp::new_millisecond(42);
1597
1598 assert_eq!(
1599 Timestamp::new_millisecond(43),
1600 discrete_exclusive_end(timestamp)
1601 );
1602 }
1603
1604 #[allow(clippy::type_complexity)]
1605 fn run_compute_working_ranges_test(
1606 testcases: Vec<(
1607 BTreeMap<(Timestamp, Timestamp), Vec<usize>>,
1608 Vec<((Timestamp, Timestamp), BTreeSet<usize>)>,
1609 )>,
1610 descending: bool,
1611 ) {
1612 for (input, expected) in testcases {
1613 let expected = expected
1614 .into_iter()
1615 .map(|(r, s)| (r.into(), s))
1616 .collect_vec();
1617 let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1618 assert_eq!(
1619 compute_all_working_ranges(&input, descending),
1620 expected,
1621 "input: {:?}, descending: {}",
1622 input,
1623 descending
1624 );
1625 }
1626 }
1627
1628 #[test]
1629 fn test_compute_working_ranges_descending() {
1630 let testcases = vec![
1631 (
1632 BTreeMap::from([(
1633 (Timestamp::new_second(1), Timestamp::new_second(2)),
1634 vec![0],
1635 )]),
1636 vec![(
1637 (Timestamp::new_second(1), Timestamp::new_second(2)),
1638 BTreeSet::from([0]),
1639 )],
1640 ),
1641 (
1642 BTreeMap::from([(
1643 (Timestamp::new_second(1), Timestamp::new_second(2)),
1644 vec![0, 1],
1645 )]),
1646 vec![(
1647 (Timestamp::new_second(1), Timestamp::new_second(2)),
1648 BTreeSet::from([0, 1]),
1649 )],
1650 ),
1651 (
1652 BTreeMap::from([
1653 (
1654 (Timestamp::new_second(2), Timestamp::new_second(3)),
1655 vec![0],
1656 ),
1657 (
1658 (Timestamp::new_second(1), Timestamp::new_second(2)),
1659 vec![0, 1],
1660 ),
1661 ]),
1662 vec![
1663 (
1664 (Timestamp::new_second(2), Timestamp::new_second(3)),
1665 BTreeSet::from([0]),
1666 ),
1667 (
1668 (Timestamp::new_second(1), Timestamp::new_second(2)),
1669 BTreeSet::from([0, 1]),
1670 ),
1671 ],
1672 ),
1673 (
1674 BTreeMap::from([
1675 (
1676 (Timestamp::new_second(2), Timestamp::new_second(3)),
1677 vec![0, 1],
1678 ),
1679 (
1680 (Timestamp::new_second(1), Timestamp::new_second(2)),
1681 vec![1],
1682 ),
1683 ]),
1684 vec![
1685 (
1686 (Timestamp::new_second(2), Timestamp::new_second(3)),
1687 BTreeSet::from([0, 1]),
1688 ),
1689 (
1690 (Timestamp::new_second(1), Timestamp::new_second(2)),
1691 BTreeSet::from([1]),
1692 ),
1693 ],
1694 ),
1695 (
1696 BTreeMap::from([
1697 (
1698 (Timestamp::new_second(3), Timestamp::new_second(4)),
1699 vec![0],
1700 ),
1701 (
1702 (Timestamp::new_second(2), Timestamp::new_second(3)),
1703 vec![0, 1],
1704 ),
1705 (
1706 (Timestamp::new_second(1), Timestamp::new_second(2)),
1707 vec![1],
1708 ),
1709 ]),
1710 vec![
1711 (
1712 (Timestamp::new_second(3), Timestamp::new_second(4)),
1713 BTreeSet::from([0]),
1714 ),
1715 (
1716 (Timestamp::new_second(2), Timestamp::new_second(3)),
1717 BTreeSet::from([0, 1]),
1718 ),
1719 (
1720 (Timestamp::new_second(1), Timestamp::new_second(2)),
1721 BTreeSet::from([1]),
1722 ),
1723 ],
1724 ),
1725 (
1726 BTreeMap::from([
1727 (
1728 (Timestamp::new_second(3), Timestamp::new_second(4)),
1729 vec![0, 2],
1730 ),
1731 (
1732 (Timestamp::new_second(2), Timestamp::new_second(3)),
1733 vec![0, 1, 2],
1734 ),
1735 (
1736 (Timestamp::new_second(1), Timestamp::new_second(2)),
1737 vec![1, 2],
1738 ),
1739 ]),
1740 vec![(
1741 (Timestamp::new_second(1), Timestamp::new_second(4)),
1742 BTreeSet::from([0, 1, 2]),
1743 )],
1744 ),
1745 (
1746 BTreeMap::from([
1747 (
1748 (Timestamp::new_second(2), Timestamp::new_second(3)),
1749 vec![0, 2],
1750 ),
1751 (
1752 (Timestamp::new_second(1), Timestamp::new_second(2)),
1753 vec![1, 2],
1754 ),
1755 ]),
1756 vec![(
1757 (Timestamp::new_second(1), Timestamp::new_second(3)),
1758 BTreeSet::from([0, 1, 2]),
1759 )],
1760 ),
1761 (
1762 BTreeMap::from([
1763 (
1764 (Timestamp::new_second(3), Timestamp::new_second(4)),
1765 vec![0, 1],
1766 ),
1767 (
1768 (Timestamp::new_second(2), Timestamp::new_second(3)),
1769 vec![0, 1, 2],
1770 ),
1771 (
1772 (Timestamp::new_second(1), Timestamp::new_second(2)),
1773 vec![1, 2],
1774 ),
1775 ]),
1776 vec![(
1777 (Timestamp::new_second(1), Timestamp::new_second(4)),
1778 BTreeSet::from([0, 1, 2]),
1779 )],
1780 ),
1781 (
1782 BTreeMap::from([
1783 (
1784 (Timestamp::new_second(2), Timestamp::new_second(3)),
1785 vec![0, 1],
1786 ),
1787 (
1788 (Timestamp::new_second(1), Timestamp::new_second(2)),
1789 vec![1, 2],
1790 ),
1791 ]),
1792 vec![
1793 (
1794 (Timestamp::new_second(2), Timestamp::new_second(3)),
1795 BTreeSet::from([0, 1]),
1796 ),
1797 (
1798 (Timestamp::new_second(1), Timestamp::new_second(2)),
1799 BTreeSet::from([1, 2]),
1800 ),
1801 ],
1802 ),
1803 (
1805 BTreeMap::from([
1806 (
1807 (Timestamp::new_second(2), Timestamp::new_second(3)),
1808 vec![0],
1809 ),
1810 (
1811 (Timestamp::new_second(1), Timestamp::new_second(2)),
1812 vec![1, 2],
1813 ),
1814 ]),
1815 vec![
1816 (
1817 (Timestamp::new_second(2), Timestamp::new_second(3)),
1818 BTreeSet::from([0]),
1819 ),
1820 (
1821 (Timestamp::new_second(1), Timestamp::new_second(2)),
1822 BTreeSet::from([1, 2]),
1823 ),
1824 ],
1825 ),
1826 ];
1827
1828 run_compute_working_ranges_test(testcases, true);
1829 }
1830
1831 #[test]
1832 fn test_compute_working_ranges_ascending() {
1833 let testcases = vec![
1834 (
1835 BTreeMap::from([(
1836 (Timestamp::new_second(1), Timestamp::new_second(2)),
1837 vec![0],
1838 )]),
1839 vec![(
1840 (Timestamp::new_second(1), Timestamp::new_second(2)),
1841 BTreeSet::from([0]),
1842 )],
1843 ),
1844 (
1845 BTreeMap::from([(
1846 (Timestamp::new_second(1), Timestamp::new_second(2)),
1847 vec![0, 1],
1848 )]),
1849 vec![(
1850 (Timestamp::new_second(1), Timestamp::new_second(2)),
1851 BTreeSet::from([0, 1]),
1852 )],
1853 ),
1854 (
1855 BTreeMap::from([
1856 (
1857 (Timestamp::new_second(1), Timestamp::new_second(2)),
1858 vec![0, 1],
1859 ),
1860 (
1861 (Timestamp::new_second(2), Timestamp::new_second(3)),
1862 vec![1],
1863 ),
1864 ]),
1865 vec![
1866 (
1867 (Timestamp::new_second(1), Timestamp::new_second(2)),
1868 BTreeSet::from([0, 1]),
1869 ),
1870 (
1871 (Timestamp::new_second(2), Timestamp::new_second(3)),
1872 BTreeSet::from([1]),
1873 ),
1874 ],
1875 ),
1876 (
1877 BTreeMap::from([
1878 (
1879 (Timestamp::new_second(1), Timestamp::new_second(2)),
1880 vec![0],
1881 ),
1882 (
1883 (Timestamp::new_second(2), Timestamp::new_second(3)),
1884 vec![0, 1],
1885 ),
1886 ]),
1887 vec![
1888 (
1889 (Timestamp::new_second(1), Timestamp::new_second(2)),
1890 BTreeSet::from([0]),
1891 ),
1892 (
1893 (Timestamp::new_second(2), Timestamp::new_second(3)),
1894 BTreeSet::from([0, 1]),
1895 ),
1896 ],
1897 ),
1898 (
1900 BTreeMap::from([
1901 (
1902 (Timestamp::new_second(1), Timestamp::new_second(2)),
1903 vec![0],
1904 ),
1905 (
1906 (Timestamp::new_second(2), Timestamp::new_second(3)),
1907 vec![0, 1],
1908 ),
1909 (
1910 (Timestamp::new_second(3), Timestamp::new_second(4)),
1911 vec![1],
1912 ),
1913 ]),
1914 vec![
1915 (
1916 (Timestamp::new_second(1), Timestamp::new_second(2)),
1917 BTreeSet::from([0]),
1918 ),
1919 (
1920 (Timestamp::new_second(2), Timestamp::new_second(3)),
1921 BTreeSet::from([0, 1]),
1922 ),
1923 (
1924 (Timestamp::new_second(3), Timestamp::new_second(4)),
1925 BTreeSet::from([1]),
1926 ),
1927 ],
1928 ),
1929 (
1930 BTreeMap::from([
1931 (
1932 (Timestamp::new_second(1), Timestamp::new_second(2)),
1933 vec![0, 2],
1934 ),
1935 (
1936 (Timestamp::new_second(2), Timestamp::new_second(3)),
1937 vec![0, 1, 2],
1938 ),
1939 (
1940 (Timestamp::new_second(3), Timestamp::new_second(4)),
1941 vec![1, 2],
1942 ),
1943 ]),
1944 vec![(
1945 (Timestamp::new_second(1), Timestamp::new_second(4)),
1946 BTreeSet::from([0, 1, 2]),
1947 )],
1948 ),
1949 (
1950 BTreeMap::from([
1951 (
1952 (Timestamp::new_second(1), Timestamp::new_second(2)),
1953 vec![0, 2],
1954 ),
1955 (
1956 (Timestamp::new_second(2), Timestamp::new_second(3)),
1957 vec![1, 2],
1958 ),
1959 ]),
1960 vec![(
1961 (Timestamp::new_second(1), Timestamp::new_second(3)),
1962 BTreeSet::from([0, 1, 2]),
1963 )],
1964 ),
1965 (
1966 BTreeMap::from([
1967 (
1968 (Timestamp::new_second(1), Timestamp::new_second(2)),
1969 vec![0, 1],
1970 ),
1971 (
1972 (Timestamp::new_second(2), Timestamp::new_second(3)),
1973 vec![0, 1, 2],
1974 ),
1975 (
1976 (Timestamp::new_second(3), Timestamp::new_second(4)),
1977 vec![1, 2],
1978 ),
1979 ]),
1980 vec![(
1981 (Timestamp::new_second(1), Timestamp::new_second(4)),
1982 BTreeSet::from([0, 1, 2]),
1983 )],
1984 ),
1985 (
1986 BTreeMap::from([
1987 (
1988 (Timestamp::new_second(1), Timestamp::new_second(2)),
1989 vec![0, 1],
1990 ),
1991 (
1992 (Timestamp::new_second(2), Timestamp::new_second(3)),
1993 vec![1, 2],
1994 ),
1995 ]),
1996 vec![
1997 (
1998 (Timestamp::new_second(1), Timestamp::new_second(2)),
1999 BTreeSet::from([0, 1]),
2000 ),
2001 (
2002 (Timestamp::new_second(2), Timestamp::new_second(3)),
2003 BTreeSet::from([1, 2]),
2004 ),
2005 ],
2006 ),
2007 (
2009 BTreeMap::from([
2010 (
2011 (Timestamp::new_second(1), Timestamp::new_second(2)),
2012 vec![0, 1],
2013 ),
2014 (
2015 (Timestamp::new_second(2), Timestamp::new_second(3)),
2016 vec![2],
2017 ),
2018 ]),
2019 vec![
2020 (
2021 (Timestamp::new_second(1), Timestamp::new_second(2)),
2022 BTreeSet::from([0, 1]),
2023 ),
2024 (
2025 (Timestamp::new_second(2), Timestamp::new_second(3)),
2026 BTreeSet::from([2]),
2027 ),
2028 ],
2029 ),
2030 ];
2031
2032 run_compute_working_ranges_test(testcases, false);
2033 }
2034
2035 #[test]
2036 fn test_split_overlap_range() {
2037 let testcases = vec![
2038 (
2040 vec![PartitionRange {
2041 start: Timestamp::new_second(1),
2042 end: Timestamp::new_second(2),
2043 num_rows: 2,
2044 identifier: 0,
2045 }],
2046 BTreeMap::from_iter(
2047 vec![(
2048 (Timestamp::new_second(1), Timestamp::new_second(2)),
2049 vec![0],
2050 )]
2051 .into_iter(),
2052 ),
2053 ),
2054 (
2056 vec![
2057 PartitionRange {
2058 start: Timestamp::new_second(1),
2059 end: Timestamp::new_second(2),
2060 num_rows: 2,
2061 identifier: 0,
2062 },
2063 PartitionRange {
2064 start: Timestamp::new_second(1),
2065 end: Timestamp::new_second(2),
2066 num_rows: 2,
2067 identifier: 1,
2068 },
2069 ],
2070 BTreeMap::from_iter(
2071 vec![(
2072 (Timestamp::new_second(1), Timestamp::new_second(2)),
2073 vec![0, 1],
2074 )]
2075 .into_iter(),
2076 ),
2077 ),
2078 (
2079 vec![
2080 PartitionRange {
2081 start: Timestamp::new_second(1),
2082 end: Timestamp::new_second(3),
2083 num_rows: 2,
2084 identifier: 0,
2085 },
2086 PartitionRange {
2087 start: Timestamp::new_second(2),
2088 end: Timestamp::new_second(4),
2089 num_rows: 2,
2090 identifier: 1,
2091 },
2092 ],
2093 BTreeMap::from_iter(
2094 vec![
2095 (
2096 (Timestamp::new_second(1), Timestamp::new_second(2)),
2097 vec![0],
2098 ),
2099 (
2100 (Timestamp::new_second(2), Timestamp::new_second(3)),
2101 vec![0, 1],
2102 ),
2103 (
2104 (Timestamp::new_second(3), Timestamp::new_second(4)),
2105 vec![1],
2106 ),
2107 ]
2108 .into_iter(),
2109 ),
2110 ),
2111 (
2113 vec![
2114 PartitionRange {
2115 start: Timestamp::new_second(1),
2116 end: Timestamp::new_second(3),
2117 num_rows: 2,
2118 identifier: 0,
2119 },
2120 PartitionRange {
2121 start: Timestamp::new_second(2),
2122 end: Timestamp::new_second(4),
2123 num_rows: 2,
2124 identifier: 1,
2125 },
2126 PartitionRange {
2127 start: Timestamp::new_second(1),
2128 end: Timestamp::new_second(4),
2129 num_rows: 2,
2130 identifier: 2,
2131 },
2132 ],
2133 BTreeMap::from_iter(
2134 vec![
2135 (
2136 (Timestamp::new_second(1), Timestamp::new_second(2)),
2137 vec![0, 2],
2138 ),
2139 (
2140 (Timestamp::new_second(2), Timestamp::new_second(3)),
2141 vec![0, 1, 2],
2142 ),
2143 (
2144 (Timestamp::new_second(3), Timestamp::new_second(4)),
2145 vec![1, 2],
2146 ),
2147 ]
2148 .into_iter(),
2149 ),
2150 ),
2151 (
2152 vec![
2153 PartitionRange {
2154 start: Timestamp::new_second(1),
2155 end: Timestamp::new_second(3),
2156 num_rows: 2,
2157 identifier: 0,
2158 },
2159 PartitionRange {
2160 start: Timestamp::new_second(1),
2161 end: Timestamp::new_second(4),
2162 num_rows: 2,
2163 identifier: 1,
2164 },
2165 PartitionRange {
2166 start: Timestamp::new_second(2),
2167 end: Timestamp::new_second(4),
2168 num_rows: 2,
2169 identifier: 2,
2170 },
2171 ],
2172 BTreeMap::from_iter(
2173 vec![
2174 (
2175 (Timestamp::new_second(1), Timestamp::new_second(2)),
2176 vec![0, 1],
2177 ),
2178 (
2179 (Timestamp::new_second(2), Timestamp::new_second(3)),
2180 vec![0, 1, 2],
2181 ),
2182 (
2183 (Timestamp::new_second(3), Timestamp::new_second(4)),
2184 vec![1, 2],
2185 ),
2186 ]
2187 .into_iter(),
2188 ),
2189 ),
2190 ];
2191
2192 for (input, expected) in testcases {
2193 let expected = expected.into_iter().map(|(r, s)| (r.into(), s)).collect();
2194 assert_eq!(split_overlapping_ranges(&input), expected);
2195 }
2196 }
2197
    // Test-only shorthand: builds a `SucRun` from the
    // `(offset, len, first_val, last_val)` tuples used in the tables below.
    impl From<(i32, i32, Option<i32>, Option<i32>)> for SucRun<i32> {
        fn from((offset, len, min_val, max_val): (i32, i32, Option<i32>, Option<i32>)) -> Self {
            Self {
                // Test inputs are non-negative, so `as usize` is lossless here.
                offset: offset as usize,
                len: len as usize,
                first_val: min_val,
                last_val: max_val,
            }
        }
    }
2208
2209 #[test]
2210 fn test_find_successive_runs() {
2211 let testcases = vec![
2212 (
2213 vec![Some(1), Some(1), Some(2), Some(1), Some(3)],
2214 Some(SortOptions {
2215 descending: false,
2216 nulls_first: false,
2217 }),
2218 vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2219 ),
2220 (
2221 vec![Some(1), Some(2), Some(2), Some(1), Some(3)],
2222 Some(SortOptions {
2223 descending: false,
2224 nulls_first: false,
2225 }),
2226 vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2227 ),
2228 (
2229 vec![Some(1), Some(2), None, None, Some(1), Some(3)],
2230 Some(SortOptions {
2231 descending: false,
2232 nulls_first: false,
2233 }),
2234 vec![(0, 4, Some(1), Some(2)), (4, 2, Some(1), Some(3))],
2235 ),
2236 (
2237 vec![Some(1), Some(2), Some(1), Some(3)],
2238 Some(SortOptions {
2239 descending: false,
2240 nulls_first: false,
2241 }),
2242 vec![(0, 2, Some(1), Some(2)), (2, 2, Some(1), Some(3))],
2243 ),
2244 (
2245 vec![Some(1), Some(2), Some(1), Some(3)],
2246 Some(SortOptions {
2247 descending: true,
2248 nulls_first: false,
2249 }),
2250 vec![
2251 (0, 1, Some(1), Some(1)),
2252 (1, 2, Some(2), Some(1)),
2253 (3, 1, Some(3), Some(3)),
2254 ],
2255 ),
2256 (
2257 vec![Some(1), Some(2), None, Some(3)],
2258 Some(SortOptions {
2259 descending: false,
2260 nulls_first: true,
2261 }),
2262 vec![(0, 2, Some(1), Some(2)), (2, 2, Some(3), Some(3))],
2263 ),
2264 (
2265 vec![Some(1), Some(2), None, Some(3)],
2266 Some(SortOptions {
2267 descending: false,
2268 nulls_first: false,
2269 }),
2270 vec![(0, 3, Some(1), Some(2)), (3, 1, Some(3), Some(3))],
2271 ),
2272 (
2273 vec![Some(2), Some(1), None, Some(3)],
2274 Some(SortOptions {
2275 descending: true,
2276 nulls_first: true,
2277 }),
2278 vec![(0, 2, Some(2), Some(1)), (2, 2, Some(3), Some(3))],
2279 ),
2280 (
2281 vec![],
2282 Some(SortOptions {
2283 descending: false,
2284 nulls_first: true,
2285 }),
2286 vec![(0, 0, None, None)],
2287 ),
2288 (
2289 vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2290 Some(SortOptions {
2291 descending: true,
2292 nulls_first: true,
2293 }),
2294 vec![(0, 5, Some(2), Some(1)), (5, 2, Some(5), Some(4))],
2295 ),
2296 (
2297 vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2298 Some(SortOptions {
2299 descending: true,
2300 nulls_first: false,
2301 }),
2302 vec![
2303 (0, 2, None, None),
2304 (2, 3, Some(2), Some(1)),
2305 (5, 2, Some(5), Some(4)),
2306 ],
2307 ),
2308 ];
2309 for (input, sort_opts, expected) in testcases {
2310 let ret = find_successive_runs(input.clone().into_iter().enumerate(), &sort_opts);
2311 let expected = expected.into_iter().map(SucRun::<i32>::from).collect_vec();
2312 assert_eq!(
2313 ret, expected,
2314 "input: {:?}, opt: {:?},expected: {:?}",
2315 input, sort_opts, expected
2316 );
2317 }
2318 }
2319
2320 #[test]
2321 fn test_cmp_with_opts() {
2322 let testcases = vec![
2323 (
2325 Some(1),
2326 Some(2),
2327 Some(SortOptions {
2328 descending: false,
2329 nulls_first: false,
2330 }),
2331 std::cmp::Ordering::Less,
2332 ),
2333 (
2334 Some(1),
2335 Some(2),
2336 Some(SortOptions {
2337 descending: true,
2338 nulls_first: false,
2339 }),
2340 std::cmp::Ordering::Greater,
2341 ),
2342 (
2344 Some(1),
2345 None,
2346 Some(SortOptions {
2347 descending: false,
2348 nulls_first: true,
2349 }),
2350 std::cmp::Ordering::Greater,
2351 ),
2352 (
2353 Some(1),
2354 None,
2355 Some(SortOptions {
2356 descending: true,
2357 nulls_first: true,
2358 }),
2359 std::cmp::Ordering::Greater,
2360 ),
2361 (
2363 Some(1),
2364 None,
2365 Some(SortOptions {
2366 descending: true,
2367 nulls_first: false,
2368 }),
2369 std::cmp::Ordering::Less,
2370 ),
2371 (
2372 Some(1),
2373 None,
2374 Some(SortOptions {
2375 descending: false,
2376 nulls_first: false,
2377 }),
2378 std::cmp::Ordering::Less,
2379 ),
2380 (
2382 None,
2383 None,
2384 Some(SortOptions {
2385 descending: false,
2386 nulls_first: true,
2387 }),
2388 std::cmp::Ordering::Equal,
2389 ),
2390 ];
2391 for (a, b, opts, expected) in testcases {
2392 assert_eq!(
2393 cmp_with_opts(&a, &b, &opts),
2394 expected,
2395 "a: {:?}, b: {:?}, opts: {:?}",
2396 a,
2397 b,
2398 opts
2399 );
2400 }
2401 }
2402
2403 #[test]
2404 fn test_find_slice_from_range() {
2405 let test_cases = vec![
2406 (
2408 Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2409 false,
2410 TimeRange {
2411 start: Timestamp::new_millisecond(2),
2412 end: Timestamp::new_millisecond(4),
2413 },
2414 Ok((1, 2)),
2415 ),
2416 (
2417 Arc::new(TimestampMillisecondArray::from_iter_values([
2418 -2, -1, 0, 1, 2, 3, 4, 5,
2419 ])) as ArrayRef,
2420 false,
2421 TimeRange {
2422 start: Timestamp::new_millisecond(-1),
2423 end: Timestamp::new_millisecond(4),
2424 },
2425 Ok((1, 5)),
2426 ),
2427 (
2428 Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 6])) as ArrayRef,
2429 false,
2430 TimeRange {
2431 start: Timestamp::new_millisecond(2),
2432 end: Timestamp::new_millisecond(5),
2433 },
2434 Ok((1, 2)),
2435 ),
2436 (
2437 Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 6])) as ArrayRef,
2438 false,
2439 TimeRange {
2440 start: Timestamp::new_millisecond(2),
2441 end: Timestamp::new_millisecond(5),
2442 },
2443 Ok((1, 3)),
2444 ),
2445 (
2446 Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 5, 6])) as ArrayRef,
2447 false,
2448 TimeRange {
2449 start: Timestamp::new_millisecond(2),
2450 end: Timestamp::new_millisecond(5),
2451 },
2452 Ok((1, 2)),
2453 ),
2454 (
2455 Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2456 false,
2457 TimeRange {
2458 start: Timestamp::new_millisecond(6),
2459 end: Timestamp::new_millisecond(7),
2460 },
2461 Ok((5, 0)),
2462 ),
2463 (
2465 Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 1])) as ArrayRef,
2466 true,
2467 TimeRange {
2468 end: Timestamp::new_millisecond(4),
2469 start: Timestamp::new_millisecond(1),
2470 },
2471 Ok((1, 3)),
2472 ),
2473 (
2474 Arc::new(TimestampMillisecondArray::from_iter_values([
2475 5, 4, 3, 2, 1, 0,
2476 ])) as ArrayRef,
2477 true,
2478 TimeRange {
2479 end: Timestamp::new_millisecond(4),
2480 start: Timestamp::new_millisecond(1),
2481 },
2482 Ok((2, 3)),
2483 ),
2484 (
2485 Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 0])) as ArrayRef,
2486 true,
2487 TimeRange {
2488 end: Timestamp::new_millisecond(4),
2489 start: Timestamp::new_millisecond(1),
2490 },
2491 Ok((1, 2)),
2492 ),
2493 (
2494 Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 0])) as ArrayRef,
2495 true,
2496 TimeRange {
2497 end: Timestamp::new_millisecond(4),
2498 start: Timestamp::new_millisecond(1),
2499 },
2500 Ok((2, 2)),
2501 ),
2502 (
2503 Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 1])) as ArrayRef,
2504 true,
2505 TimeRange {
2506 end: Timestamp::new_millisecond(5),
2507 start: Timestamp::new_millisecond(2),
2508 },
2509 Ok((1, 3)),
2510 ),
2511 (
2512 Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 1])) as ArrayRef,
2513 true,
2514 TimeRange {
2515 end: Timestamp::new_millisecond(5),
2516 start: Timestamp::new_millisecond(2),
2517 },
2518 Ok((1, 2)),
2519 ),
2520 (
2521 Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 2, 1])) as ArrayRef,
2522 true,
2523 TimeRange {
2524 end: Timestamp::new_millisecond(5),
2525 start: Timestamp::new_millisecond(2),
2526 },
2527 Ok((1, 3)),
2528 ),
2529 (
2530 Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 1])) as ArrayRef,
2531 true,
2532 TimeRange {
2533 end: Timestamp::new_millisecond(5),
2534 start: Timestamp::new_millisecond(2),
2535 },
2536 Ok((1, 2)),
2537 ),
2538 (
2539 Arc::new(TimestampMillisecondArray::from_iter_values([
2540 10, 9, 8, 7, 6,
2541 ])) as ArrayRef,
2542 true,
2543 TimeRange {
2544 end: Timestamp::new_millisecond(5),
2545 start: Timestamp::new_millisecond(2),
2546 },
2547 Ok((5, 0)),
2548 ),
2549 (
2551 Arc::new(TimestampMillisecondArray::from_iter_values([3, 2, 1, 0])) as ArrayRef,
2552 true,
2553 TimeRange {
2554 end: Timestamp::new_millisecond(4),
2555 start: Timestamp::new_millisecond(3),
2556 },
2557 Ok((0, 1)),
2558 ),
2559 (
2560 Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2])) as ArrayRef,
2561 true,
2562 TimeRange {
2563 end: Timestamp::new_millisecond(4),
2564 start: Timestamp::new_millisecond(3),
2565 },
2566 Ok((1, 1)),
2567 ),
2568 (
2569 Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2])) as ArrayRef,
2570 true,
2571 TimeRange {
2572 end: Timestamp::new_millisecond(4),
2573 start: Timestamp::new_millisecond(3),
2574 },
2575 Ok((2, 1)),
2576 ),
2577 ];
2578
2579 for (sort_vals, descending, range, expected) in test_cases {
2580 let sort_column = SortColumn {
2581 values: sort_vals,
2582 options: Some(SortOptions {
2583 descending,
2584 ..Default::default()
2585 }),
2586 };
2587 let ret = find_slice_from_range(&sort_column, &range);
2588 match (ret, expected) {
2589 (Ok(ret), Ok(expected)) => {
2590 assert_eq!(
2591 ret, expected,
2592 "sort_vals: {:?}, range: {:?}",
2593 sort_column, range
2594 )
2595 }
2596 (Err(err), Err(expected)) => {
2597 let expected: &str = expected;
2598 assert!(
2599 err.to_string().contains(expected),
2600 "err: {:?}, expected: {:?}",
2601 err,
2602 expected
2603 );
2604 }
2605 (r, e) => panic!("unexpected result: {:?}, expected: {:?}", r, e),
2606 }
2607 }
2608 }
2609
    /// One end-to-end case for [`WindowedSortExec`]: feed `input` batches
    /// (each tagged with its [`PartitionRange`]) and expect `output` batches.
    #[derive(Debug)]
    struct TestStream {
        /// Sort expression over the single `ts` column.
        expression: PhysicalSortExpr,
        /// Optional row limit passed to the exec.
        fetch: Option<usize>,
        /// Input batches paired with their partition ranges.
        input: Vec<(PartitionRange, DfRecordBatch)>,
        /// Expected output batches, in order.
        output: Vec<DfRecordBatch>,
        schema: SchemaRef,
    }
2618
2619 impl TestStream {
2620 fn new(
2621 opt: SortOptions,
2622 fetch: Option<usize>,
2623 unit: TimeUnit,
2624 input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2625 expected: Vec<Vec<ArrayRef>>,
2626 ) -> Self {
2627 let expression = PhysicalSortExpr {
2628 expr: Arc::new(helpers::ts_column()),
2629 options: opt,
2630 };
2631 let schema = Schema::new(vec![helpers::ts_field(unit)]);
2632 let schema = Arc::new(schema);
2633 let input = input
2634 .into_iter()
2635 .map(|(k, v)| (k, DfRecordBatch::try_new(schema.clone(), v).unwrap()))
2636 .collect_vec();
2637 let output_batchs = expected
2638 .into_iter()
2639 .map(|v| DfRecordBatch::try_new(schema.clone(), v).unwrap())
2640 .collect_vec();
2641 Self {
2642 expression,
2643 fetch,
2644 input,
2645 output: output_batchs,
2646 schema,
2647 }
2648 }
2649
2650 fn new_simple(
2651 descending: bool,
2652 fetch: Option<usize>,
2653 input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2654 expected: Vec<Vec<ArrayRef>>,
2655 ) -> Self {
2656 Self::new(
2657 helpers::default_sort_opts(descending),
2658 fetch,
2659 TimeUnit::Millisecond,
2660 input,
2661 expected,
2662 )
2663 }
2664
2665 async fn run_test(&self) -> Vec<DfRecordBatch> {
2666 let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip();
2667
2668 let mock_input = MockInputExec::new(vec![batches], self.schema.clone());
2669
2670 let exec = WindowedSortExec::try_new(
2671 self.expression.clone(),
2672 self.fetch,
2673 vec![ranges],
2674 Arc::new(mock_input),
2675 )
2676 .unwrap();
2677
2678 let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
2679
2680 let real_output = exec_stream.collect::<Vec<_>>().await;
2681 let real_output: Vec<_> = real_output.into_iter().try_collect().unwrap();
2682 real_output
2683 }
2684 }
2685
    /// Edge cases: no input at all, empty batches, and single-row batches.
    #[tokio::test]
    async fn test_window_sort_empty_and_minimal() {
        use helpers::*;
        let test_cases = [
            // No input partitions: no output.
            TestStream::new_simple(false, None, vec![], vec![]),
            // One empty and one single-row batch: only the row is emitted.
            TestStream::new_simple(
                false,
                None,
                vec![
                    (partition_range(1, 2, 1, 0), vec![ts_array([])]),
                    (partition_range(1, 3, 1, 0), vec![ts_array([2])]),
                ],
                vec![vec![ts_array([2])]],
            ),
            // Only empty batches: no output batches at all.
            TestStream::new_simple(
                false,
                None,
                vec![
                    (partition_range(1, 2, 1, 0), vec![ts_array([])]),
                    (partition_range(1, 3, 1, 0), vec![ts_array([])]),
                ],
                vec![],
            ),
            // Two single-row batches are emitted as two sorted batches.
            TestStream::new_simple(
                false,
                None,
                vec![
                    (partition_range(1, 2, 1, 0), vec![ts_array([1])]),
                    (partition_range(1, 3, 1, 0), vec![ts_array([2])]),
                ],
                vec![vec![ts_array([1])], vec![ts_array([2])]],
            ),
        ];

        for (idx, testcase) in test_cases.iter().enumerate() {
            let output = testcase.run_test().await;
            assert_eq!(output, testcase.output, "empty/minimal case {idx} failed");
        }
    }
2729
2730 #[tokio::test]
2731 async fn test_window_sort_overlapping() {
2732 use helpers::*;
2733 let test_cases = [
2734 TestStream::new_simple(
2736 false,
2737 None,
2738 vec![
2739 (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2740 (partition_range(1, 4, 1, 0), vec![ts_array([2, 3])]),
2741 ],
2742 vec![
2743 vec![ts_array([1, 2])],
2744 vec![ts_array([2])],
2745 vec![ts_array([3])],
2746 ],
2747 ),
2748 TestStream::new_simple(
2750 false,
2751 None,
2752 vec![
2753 (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2754 (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
2755 ],
2756 vec![vec![ts_array([1, 1, 2, 2])], vec![ts_array([3])]],
2757 ),
2758 TestStream::new_simple(
2760 false,
2761 None,
2762 vec![
2763 (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2764 (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
2765 (partition_range(4, 6, 1, 1), vec![ts_array([4, 5])]),
2766 ],
2767 vec![
2768 vec![ts_array([1, 1, 2, 2])],
2769 vec![ts_array([3])],
2770 vec![ts_array([4, 5])],
2771 ],
2772 ),
2773 ];
2774
2775 for (idx, testcase) in test_cases.iter().enumerate() {
2776 let output = testcase.run_test().await;
2777 assert_eq!(output, testcase.output, "overlapping case {idx} failed");
2778 }
2779 }
2780
2781 #[tokio::test]
2782 async fn test_window_sort_with_fetch() {
2783 use helpers::*;
2784 let test_cases = [
2785 TestStream::new_simple(
2787 false,
2788 Some(6),
2789 vec![
2790 (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2791 (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
2792 (partition_range(3, 6, 1, 1), vec![ts_array([4, 5])]),
2793 ],
2794 vec![
2795 vec![ts_array([1, 1, 2, 2])],
2796 vec![ts_array([3])],
2797 vec![ts_array([4])],
2798 ],
2799 ),
2800 TestStream::new_simple(
2802 false,
2803 Some(3),
2804 vec![
2805 (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2806 (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
2807 (partition_range(3, 6, 1, 1), vec![ts_array([4, 5])]),
2808 ],
2809 vec![vec![ts_array([1, 1, 2])]],
2810 ),
2811 ];
2812
2813 for (idx, testcase) in test_cases.iter().enumerate() {
2814 let output = testcase.run_test().await;
2815 assert_eq!(output, testcase.output, "fetch case {idx} failed");
2816 }
2817 }
2818
2819 #[tokio::test]
2820 async fn test_window_sort_descending() {
2821 use helpers::*;
2822 let test_cases = [
2823 TestStream::new_simple(
2825 true,
2826 None,
2827 vec![
2828 (partition_range(3, 6, 1, 1), vec![ts_array([5, 4])]),
2829 (partition_range(1, 4, 1, 1), vec![ts_array([3, 2, 1])]),
2830 (partition_range(1, 3, 1, 0), vec![ts_array([2, 1])]),
2831 ],
2832 vec![
2833 vec![ts_array([5, 4])],
2834 vec![ts_array([3])],
2835 vec![ts_array([2, 2, 1, 1])],
2836 ],
2837 ),
2838 ];
2839
2840 for (idx, testcase) in test_cases.iter().enumerate() {
2841 let output = testcase.run_test().await;
2842 assert_eq!(output, testcase.output, "descending case {idx} failed");
2843 }
2844 }
2845
2846 #[tokio::test]
2847 async fn test_window_sort_complex() {
2848 use helpers::*;
2849 let test_cases = [
2850 TestStream::new_simple(
2852 false,
2853 None,
2854 vec![
2855 (partition_range(1, 10, 1, 0), vec![ts_array([1, 5, 9])]),
2856 (partition_range(3, 7, 1, 1), vec![ts_array([3, 4, 5, 6])]),
2857 ],
2858 vec![vec![ts_array([1])], vec![ts_array([3, 4, 5, 5, 6, 9])]],
2859 ),
2860 TestStream::new_simple(
2862 false,
2863 None,
2864 vec![
2865 (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2866 (
2867 partition_range(1, 10, 1, 1),
2868 vec![ts_array([1, 3, 4, 5, 6, 8])],
2869 ),
2870 (partition_range(7, 10, 1, 1), vec![ts_array([7, 8, 9])]),
2871 ],
2872 vec![
2873 vec![ts_array([1, 1, 2])],
2874 vec![ts_array([3, 4, 5, 6])],
2875 vec![ts_array([7, 8, 8, 9])],
2876 ],
2877 ),
2878 TestStream::new_simple(
2880 false,
2881 None,
2882 vec![
2883 (
2884 partition_range(1, 11, 1, 0),
2885 vec![ts_array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])],
2886 ),
2887 (partition_range(5, 7, 1, 1), vec![ts_array([5, 6])]),
2888 ],
2889 vec![
2890 vec![ts_array([1, 2, 3, 4])],
2891 vec![ts_array([5, 5, 6, 6, 7, 8, 9, 10])],
2892 ],
2893 ),
2894 ];
2895
2896 for (idx, testcase) in test_cases.iter().enumerate() {
2897 let output = testcase.run_test().await;
2898 assert_eq!(output, testcase.output, "complex case {idx} failed");
2899 }
2900 }
2901
2902 #[tokio::test]
2903 async fn fuzzy_ish_test_window_sort_stream() {
2904 let test_cnt = 100;
2905 let part_cnt_bound = 100;
2906 let range_size_bound = 100;
2907 let range_offset_bound = 100;
2908 let in_range_datapoint_cnt_bound = 100;
2909 let fetch_bound = 100;
2910
2911 let mut rng = fastrand::Rng::new();
2912 let rng_seed = rng.u64(..);
2913 rng.seed(rng_seed);
2914 let mut bound_val = None;
2915 type CmpFn<T> = Box<dyn FnMut(&T, &T) -> std::cmp::Ordering>;
2917 let mut full_testcase_list = Vec::new();
2918 for _case_id in 0..test_cnt {
2919 let descending = rng.bool();
2920 fn ret_cmp_fn<T: Ord>(descending: bool) -> CmpFn<T> {
2921 if descending {
2922 return Box::new(|a: &T, b: &T| b.cmp(a));
2923 }
2924 Box::new(|a: &T, b: &T| a.cmp(b))
2925 }
2926 let unit = match rng.u8(0..3) {
2927 0 => TimeUnit::Second,
2928 1 => TimeUnit::Millisecond,
2929 2 => TimeUnit::Microsecond,
2930 _ => TimeUnit::Nanosecond,
2931 };
2932 let fetch = if rng.bool() {
2933 Some(rng.usize(0..fetch_bound))
2934 } else {
2935 None
2936 };
2937
2938 let mut input_ranged_data = vec![];
2939 let mut output_data: Vec<i64> = vec![];
2940 for part_id in 0..rng.usize(0..part_cnt_bound) {
2942 let (start, end) = if descending {
2943 let end = bound_val
2945 .map(|i| i - rng.i64(1..=range_offset_bound))
2946 .unwrap_or_else(|| rng.i64(..));
2947 bound_val = Some(end);
2948 let start = end - rng.i64(1..range_size_bound);
2949 let start = Timestamp::new(start, unit.into());
2950 let end = Timestamp::new(end, unit.into());
2951 (start, end)
2952 } else {
2953 let start = bound_val
2955 .map(|i| i + rng.i64(1..=range_offset_bound))
2956 .unwrap_or_else(|| rng.i64(..));
2957 bound_val = Some(start);
2958 let end = start + rng.i64(1..range_size_bound);
2959 let start = Timestamp::new(start, unit.into());
2960 let end = Timestamp::new(end, unit.into());
2961 (start, end)
2962 };
2963
2964 let iter = 0..rng.usize(0..in_range_datapoint_cnt_bound);
2965 let data_gen = iter
2966 .map(|_| rng.i64(start.value()..end.value()))
2967 .sorted_by(ret_cmp_fn(descending))
2968 .collect_vec();
2969 output_data.extend(data_gen.clone());
2970 let arr = new_ts_array(unit, data_gen);
2971 let range = PartitionRange {
2972 start,
2973 end,
2974 num_rows: arr.len(),
2975 identifier: part_id,
2976 };
2977 input_ranged_data.push((range, vec![arr]));
2978 }
2979
2980 output_data.sort_by(ret_cmp_fn(descending));
2981 if let Some(fetch) = fetch {
2982 output_data.truncate(fetch);
2983 }
2984 let output_arr = new_ts_array(unit, output_data);
2985
2986 let test_stream = TestStream::new(
2987 helpers::default_sort_opts(descending),
2988 fetch,
2989 unit,
2990 input_ranged_data.clone(),
2991 vec![vec![output_arr]],
2992 );
2993 full_testcase_list.push(test_stream);
2994 }
2995
2996 for (case_id, test_stream) in full_testcase_list.into_iter().enumerate() {
2997 let res = test_stream.run_test().await;
2998 let res_concat = concat_batches(&test_stream.schema, &res).unwrap();
2999 let expected = test_stream.output;
3000 let expected_concat = concat_batches(&test_stream.schema, &expected).unwrap();
3001
3002 if res_concat != expected_concat {
3003 {
3004 let mut f_input = std::io::stderr();
3005 f_input.write_all(b"[").unwrap();
3006 for (input_range, input_arr) in test_stream.input {
3007 let range_json = json!({
3008 "start": input_range.start.to_chrono_datetime().unwrap().to_string(),
3009 "end": input_range.end.to_chrono_datetime().unwrap().to_string(),
3010 "num_rows": input_range.num_rows,
3011 "identifier": input_range.identifier,
3012 });
3013 let buf = Vec::new();
3014 let mut input_writer = ArrayWriter::new(buf);
3015 input_writer.write(&input_arr).unwrap();
3016 input_writer.finish().unwrap();
3017 let res_str =
3018 String::from_utf8_lossy(&input_writer.into_inner()).to_string();
3019 let whole_json =
3020 format!(r#"{{"range": {}, "data": {}}},"#, range_json, res_str);
3021 f_input.write_all(whole_json.as_bytes()).unwrap();
3022 }
3023 f_input.write_all(b"]").unwrap();
3024 }
3025 {
3026 let mut f_res = std::io::stderr();
3027 f_res.write_all(b"[").unwrap();
3028 for batch in &res {
3029 let mut res_writer = ArrayWriter::new(f_res);
3030 res_writer.write(batch).unwrap();
3031 res_writer.finish().unwrap();
3032 f_res = res_writer.into_inner();
3033 f_res.write_all(b",").unwrap();
3034 }
3035 f_res.write_all(b"]").unwrap();
3036
3037 let f_res_concat = std::io::stderr();
3038 let mut res_writer = ArrayWriter::new(f_res_concat);
3039 res_writer.write(&res_concat).unwrap();
3040 res_writer.finish().unwrap();
3041
3042 let f_expected = std::io::stderr();
3043 let mut expected_writer = ArrayWriter::new(f_expected);
3044 expected_writer.write(&expected_concat).unwrap();
3045 expected_writer.finish().unwrap();
3046 }
3047 panic!(
3048 "case failed, case id: {0}, output and expected output to stderr",
3049 case_id
3050 );
3051 }
3052 assert_eq!(
3053 res_concat, expected_concat,
3054 "case failed, case id: {}, rng seed: {}",
3055 case_id, rng_seed
3056 );
3057 }
3058 }
3059}