use std::any::Any;
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::pin::Pin;
use std::slice::from_ref;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::array::{Array, ArrayRef};
use arrow::compute::SortColumn;
use arrow_schema::{DataType, SchemaRef, SortOptions};
use common_error::ext::{BoxedError, PlainError};
use common_error::status_code::StatusCode;
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
use common_telemetry::error;
use common_time::Timestamp;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
use datafusion::execution::{RecordBatchStream, TaskContext};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::sorts::streaming_merge::StreamingMergeBuilder;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
use datafusion_common::utils::bisect;
use datafusion_common::{DataFusionError, internal_err};
use datafusion_physical_expr::PhysicalSortExpr;
use datatypes::value::Value;
use futures::Stream;
use itertools::Itertools;
use snafu::ResultExt;
use store_api::region_engine::PartitionRange;

use crate::error::{QueryExecutionSnafu, Result};

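/// A sort execution plan specialized for input that arrives grouped into
/// `PartitionRange`s already ordered on the time index. Rather than buffering
/// and sorting the whole input, it only merge-sorts the ranges that actually
/// overlap, producing output one working time window at a time.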
#[derive(Debug, Clone)]
pub struct WindowedSortExec {
    expression: PhysicalSortExpr,
    fetch: Option<usize>,
    ranges: Vec<Vec<PartitionRange>>,
    all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
    input: Arc<dyn ExecutionPlan>,
    metrics: ExecutionPlanMetricsSet,
    properties: PlanProperties,
}

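/// Verifies that each partition's `PartitionRange`s are monotonic: sorted by
/// `(start ASC, end ASC)` for ascending sorts or `(end DESC, start DESC)` for
/// descending sorts. Returns an error naming the expected order otherwise.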
pub fn check_partition_range_monotonicity(
    ranges: &[Vec<PartitionRange>],
    descending: bool,
) -> Result<()> {
    let is_valid = ranges.iter().all(|r| {
        if descending {
            r.windows(2)
                .all(|w| w[0].end > w[1].end || (w[0].end == w[1].end && w[0].start >= w[1].start))
        } else {
            r.windows(2).all(|w| {
                w[0].start < w[1].start || (w[0].start == w[1].start && w[0].end <= w[1].end)
            })
        }
    });

    if !is_valid {
        let msg = if descending {
            "Input `PartitionRange`s are not sorted by (end DESC, start DESC)"
        } else {
            "Input `PartitionRange`s are not sorted by (start ASC, end ASC)"
        };
        let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
        Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
    } else {
        Ok(())
    }
}

impl WindowedSortExec {
    pub fn try_new(
        expression: PhysicalSortExpr,
        fetch: Option<usize>,
        ranges: Vec<Vec<PartitionRange>>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        check_partition_range_monotonicity(&ranges, expression.options.descending)?;

        let mut eq_properties = input.equivalence_properties().clone();
        eq_properties.reorder(vec![expression.clone()])?;

        let properties = input.properties();
        let properties = PlanProperties::new(
            eq_properties,
            input.output_partitioning().clone(),
            properties.emission_type,
            properties.boundedness,
        );

        let mut all_avail_working_range = Vec::with_capacity(ranges.len());
        for r in &ranges {
            let overlap_counts = split_overlapping_ranges(r);
            let working_ranges =
                compute_all_working_ranges(&overlap_counts, expression.options.descending);
            all_avail_working_range.push(working_ranges);
        }

        Ok(Self {
            expression,
            fetch,
            ranges,
            all_avail_working_range,
            input,
            metrics: ExecutionPlanMetricsSet::new(),
            properties,
        })
    }

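    /// Executes the given partition of the input plan and wraps it in a
    /// `WindowedSortStream` that performs the windowed merge sort.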
    pub fn to_stream(
        &self,
        context: Arc<TaskContext>,
        partition: usize,
    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
        let input_stream: DfSendableRecordBatchStream =
            self.input.execute(partition, context.clone())?;

        let df_stream = Box::pin(WindowedSortStream::new(
            context,
            self,
            input_stream,
            partition,
        )) as _;

        Ok(df_stream)
    }
}

impl DisplayAs for WindowedSortExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "WindowedSortExec: expr={} num_ranges={}",
            self.expression,
            self.ranges.len()
        )?;
        if let Some(fetch) = self.fetch {
            write!(f, " fetch={}", fetch)?;
        }
        Ok(())
    }
}

impl ExecutionPlan for WindowedSortExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        let new_input = if let Some(first) = children.first() {
            first
        } else {
            internal_err!("No children found")?
        };
        let new = Self::try_new(
            self.expression.clone(),
            self.fetch,
            self.ranges.clone(),
            new_input.clone(),
        )?;
        Ok(Arc::new(new))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
        self.to_stream(context, partition)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn name(&self) -> &str {
        "WindowedSortExec"
    }
}

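/// The stream that actually performs the windowed sort: it splits incoming
/// batches into already-sorted runs, groups the runs belonging to the current
/// working time range, and merge-sorts each group before advancing to the
/// next range.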
pub struct WindowedSortStream {
    memory_pool: Arc<dyn MemoryPool>,
    in_progress: Vec<DfRecordBatch>,
    last_value: Option<Timestamp>,
    sorted_input_runs: Vec<DfSendableRecordBatchStream>,
    merge_stream: VecDeque<DfSendableRecordBatchStream>,
    merge_count: usize,
    working_idx: usize,
    input: DfSendableRecordBatchStream,
    is_terminated: bool,
    schema: SchemaRef,
    expression: PhysicalSortExpr,
    fetch: Option<usize>,
    produced: usize,
    batch_size: usize,
    all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
    #[allow(dead_code)]
    ranges: Vec<PartitionRange>,
    metrics: BaselineMetrics,
}

impl WindowedSortStream {
    pub fn new(
        context: Arc<TaskContext>,
        exec: &WindowedSortExec,
        input: DfSendableRecordBatchStream,
        partition: usize,
    ) -> Self {
        Self {
            memory_pool: context.runtime_env().memory_pool.clone(),
            in_progress: Vec::new(),
            last_value: None,
            sorted_input_runs: Vec::new(),
            merge_stream: VecDeque::new(),
            merge_count: 0,
            working_idx: 0,
            schema: input.schema(),
            input,
            is_terminated: false,
            expression: exec.expression.clone(),
            fetch: exec.fetch,
            produced: 0,
            batch_size: context.session_config().batch_size(),
            all_avail_working_range: exec.all_avail_working_range[partition].clone(),
            ranges: exec.ranges[partition].clone(),
            metrics: BaselineMetrics::new(&exec.metrics, partition),
        }
    }
}

impl WindowedSortStream {
    #[cfg(debug_assertions)]
    fn check_subset_ranges(&self, cur_range: &TimeRange) {
        let cur_is_subset_to = self
            .ranges
            .iter()
            .filter(|r| cur_range.is_subset(&TimeRange::from(*r)))
            .collect_vec();
        if cur_is_subset_to.is_empty() {
            error!("Current range is not a subset of any PartitionRange");
            let subset_ranges = self
                .ranges
                .iter()
                .filter(|r| TimeRange::from(*r).is_subset(cur_range))
                .collect_vec();
            let only_overlap = self
                .ranges
                .iter()
                .filter(|r| {
                    let r = TimeRange::from(*r);
                    r.is_overlapping(cur_range) && !r.is_subset(cur_range)
                })
                .collect_vec();
            error!(
                "Bad input: found {} ranges that are subsets of the current range and {} ranges that only overlap it; subset ranges: {:?}; overlapping ranges: {:?}",
                subset_ranges.len(),
                only_overlap.len(),
                subset_ranges,
                only_overlap
            );
        } else {
            let only_overlap = self
                .ranges
                .iter()
                .filter(|r| {
                    let r = TimeRange::from(*r);
                    r.is_overlapping(cur_range) && !cur_range.is_subset(&r)
                })
                .collect_vec();
            error!(
                "Current range is a subset of {} ranges and only overlaps {} ranges; subset ranges: {:?}; overlapping ranges: {:?}",
                cur_is_subset_to.len(),
                only_overlap.len(),
                cur_is_subset_to,
                only_overlap
            );
        }
        let all_overlap_working_range = self
            .all_avail_working_range
            .iter()
            .filter(|(range, _)| range.is_overlapping(cur_range))
            .map(|(range, _)| range)
            .collect_vec();
        error!(
            "Found {} working ranges that overlap with current range: {:?}",
            all_overlap_working_range.len(),
            all_overlap_working_range
        );
    }

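    /// Polls the merge-sorted output streams in order, applying the `fetch`
    /// limit and truncating the final batch when the limit is reached.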
    fn poll_result_stream(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
        while let Some(merge_stream) = &mut self.merge_stream.front_mut() {
            match merge_stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(batch))) => {
                    let ret = if let Some(remaining) = self.remaining_fetch() {
                        if remaining == 0 {
                            self.is_terminated = true;
                            None
                        } else if remaining < batch.num_rows() {
                            self.produced += remaining;
                            Some(Ok(batch.slice(0, remaining)))
                        } else {
                            self.produced += batch.num_rows();
                            Some(Ok(batch))
                        }
                    } else {
                        self.produced += batch.num_rows();
                        Some(Ok(batch))
                    };
                    return Poll::Ready(ret);
                }
                Poll::Ready(Some(Err(e))) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    self.merge_stream.pop_front();
                    continue;
                }
                Poll::Pending => {
                    return Poll::Pending;
                }
            }
        }
        Poll::Ready(None)
    }

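    /// The core polling loop: drains ready output first, then pulls input
    /// batches, splits them into sorted runs, assigns each run to the current
    /// working range, and starts a new merge sort whenever a working range is
    /// complete.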
    pub fn poll_next_inner(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
        match self.poll_result_stream(cx) {
            Poll::Ready(None) => {
                if self.is_terminated {
                    return Poll::Ready(None);
                }
            }
            x => return x,
        };

        while !self.is_terminated {
            let SortedRunSet {
                runs_with_batch,
                sort_column,
            } = match self.input.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(batch))) => split_batch_to_sorted_run(batch, &self.expression)?,
                Poll::Ready(Some(Err(e))) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    self.is_terminated = true;
                    self.build_sorted_stream()?;
                    self.start_new_merge_sort()?;
                    break;
                }
                Poll::Pending => return Poll::Pending,
            };

            let mut last_remaining = None;
            let mut run_iter = runs_with_batch.into_iter();
            loop {
                let Some((sorted_rb, run_info)) =
                    last_remaining.take().or_else(|| run_iter.next())
                else {
                    break;
                };
                if sorted_rb.num_rows() == 0 {
                    continue;
                }
                let Some(cur_range) = run_info.get_time_range() else {
                    internal_err!("Found NULL in time index column")?
                };
                let Some(working_range) = self.get_working_range() else {
                    internal_err!("No working range found")?
                };

                if sort_column.options.unwrap_or_default().descending {
                    if cur_range.end > working_range.end {
                        error!("Invalid range: {:?} > {:?}", cur_range, working_range);
                        #[cfg(debug_assertions)]
                        self.check_subset_ranges(&cur_range);
                        internal_err!(
                            "Current batch has data on the right side of the working range, something is very wrong"
                        )?;
                    }
                } else if cur_range.start < working_range.start {
                    error!("Invalid range: {:?} < {:?}", cur_range, working_range);
                    #[cfg(debug_assertions)]
                    self.check_subset_ranges(&cur_range);
                    internal_err!(
                        "Current batch has data on the left side of the working range, something is very wrong"
                    )?;
                }

                if cur_range.is_subset(&working_range) {
                    self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?;
                } else if let Some(intersection) = cur_range.intersection(&working_range) {
                    let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len);
                    let (offset, len) = find_slice_from_range(
                        &SortColumn {
                            values: cur_sort_column.clone(),
                            options: sort_column.options,
                        },
                        &intersection,
                    )?;

                    if offset != 0 {
                        internal_err!(
                            "Current batch has data on the left side of the working range, something is very wrong"
                        )?;
                    }

                    let sliced_rb = sorted_rb.slice(offset, len);

                    self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?;
                    self.build_sorted_stream()?;

                    self.start_new_merge_sort()?;

                    let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len);
                    if r_len != 0 {
                        let remaining_rb = sorted_rb.slice(r_offset, r_len);
                        let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?;
                        let new_run_info = SucRun {
                            offset: run_info.offset + r_offset,
                            len: r_len,
                            first_val: new_first_val,
                            last_val: run_info.last_val,
                        };
                        last_remaining = Some((remaining_rb, new_run_info));
                    }
                } else {
                    self.build_sorted_stream()?;
                    self.start_new_merge_sort()?;

                    last_remaining = Some((sorted_rb, run_info));
                }
            }

            match self.poll_result_stream(cx) {
                Poll::Ready(None) => {
                    if self.is_terminated {
                        return Poll::Ready(None);
                    }
                }
                x => return x,
            };
        }
        self.poll_result_stream(cx)
    }

    fn push_batch(&mut self, batch: DfRecordBatch) {
        self.in_progress.push(batch);
    }

    fn try_concat_batch(
        &mut self,
        batch: DfRecordBatch,
        run_info: &SucRun<Timestamp>,
        opt: Option<SortOptions>,
    ) -> datafusion_common::Result<()> {
        let is_ok_to_concat =
            cmp_with_opts(&self.last_value, &run_info.first_val, &opt) <= std::cmp::Ordering::Equal;

        if is_ok_to_concat {
            self.push_batch(batch);
        } else {
            self.build_sorted_stream()?;
            self.push_batch(batch);
        }
        self.last_value = run_info.last_val;
        Ok(())
    }

    fn get_working_range(&self) -> Option<TimeRange> {
        self.all_avail_working_range
            .get(self.working_idx)
            .map(|(range, _)| *range)
    }

    fn set_next_working_range(&mut self) {
        self.working_idx += 1;
    }

    fn build_sorted_stream(&mut self) -> datafusion_common::Result<()> {
        if self.in_progress.is_empty() {
            return Ok(());
        }
        let data = std::mem::take(&mut self.in_progress);

        let new_stream = MemoryStream::try_new(data, self.schema(), None)?;
        self.sorted_input_runs.push(Box::pin(new_stream));
        Ok(())
    }

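    /// Finishes the current working range: advances to the next range and
    /// merge-sorts all sorted runs collected so far into one output stream
    /// (a single run is passed through as-is).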
    fn start_new_merge_sort(&mut self) -> datafusion_common::Result<()> {
        if !self.in_progress.is_empty() {
            return internal_err!("Starting a merge sort when in_progress is not empty")?;
        }

        self.set_next_working_range();

        let streams = std::mem::take(&mut self.sorted_input_runs);
        if streams.is_empty() {
            return Ok(());
        } else if streams.len() == 1 {
            self.merge_stream
                .push_back(streams.into_iter().next().unwrap());
            return Ok(());
        }

        let fetch = self.remaining_fetch();
        let reservation = MemoryConsumer::new(format!("WindowedSortStream[{}]", self.merge_count))
            .register(&self.memory_pool);
        self.merge_count += 1;

        let resulting_stream = StreamingMergeBuilder::new()
            .with_streams(streams)
            .with_schema(self.schema())
            .with_expressions(&[self.expression.clone()].into())
            .with_metrics(self.metrics.clone())
            .with_batch_size(self.batch_size)
            .with_fetch(fetch)
            .with_reservation(reservation)
            .build()?;
        self.merge_stream.push_back(resulting_stream);
        Ok(())
    }

    fn remaining_fetch(&self) -> Option<usize> {
        let total_now = self.produced;
        self.fetch.map(|p| p.saturating_sub(total_now))
    }
}

impl Stream for WindowedSortStream {
    type Item = datafusion_common::Result<DfRecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
        let result = self.as_mut().poll_next_inner(cx);
        self.metrics.record_poll(result)
    }
}

impl RecordBatchStream for WindowedSortStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

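/// Splits a record batch into successive sorted runs on the sort column,
/// returning one `(sliced batch, run info)` pair per run.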
fn split_batch_to_sorted_run(
    batch: DfRecordBatch,
    expression: &PhysicalSortExpr,
) -> datafusion_common::Result<SortedRunSet<Timestamp>> {
    let sort_column = expression.evaluate_to_sort_column(&batch)?;
    let sorted_runs_offset = get_sorted_runs(sort_column.clone())?;
    if let Some(run) = sorted_runs_offset.first()
        && sorted_runs_offset.len() == 1
    {
        if !(run.offset == 0 && run.len == batch.num_rows()) {
            internal_err!(
                "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
                run.offset,
                run.len,
                batch.num_rows()
            )?;
        }
        Ok(SortedRunSet {
            runs_with_batch: vec![(batch, run.clone())],
            sort_column,
        })
    } else {
        let mut ret = Vec::with_capacity(sorted_runs_offset.len());
        for run in sorted_runs_offset {
            if run.offset + run.len > batch.num_rows() {
                internal_err!(
                    "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
                    run.offset,
                    run.len,
                    batch.num_rows()
                )?;
            }
            let new_rb = batch.slice(run.offset, run.len);
            ret.push((new_rb, run));
        }
        Ok(SortedRunSet {
            runs_with_batch: ret,
            sort_column,
        })
    }
}

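/// Dispatches on a timestamp `DataType`, invoking the given macro with the
/// concrete arrow timestamp type and its `TimeUnit`; other types fall through
/// to the provided fallback arms.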
#[macro_export]
macro_rules! downcast_ts_array {
    ($data_type:expr => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) =>
    {
        match $data_type {
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
                $m!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
                $m!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
                $m!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
                $m!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $args)*)
            }
            $($p => $fallback,)*
        }
    };
}

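/// Returns the `(offset, length)` of the slice of `sort_column` whose values
/// fall inside `range`, honoring the column's sort direction.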
fn find_slice_from_range(
    sort_column: &SortColumn,
    range: &TimeRange,
) -> datafusion_common::Result<(usize, usize)> {
    let ty = sort_column.values.data_type();
    let time_unit = if let DataType::Timestamp(unit, _) = ty {
        unit
    } else {
        return Err(DataFusionError::Internal(format!(
            "Unsupported sort column type: {}",
            sort_column.values.data_type()
        )));
    };
    let array = &sort_column.values;
    let opt = &sort_column.options.unwrap_or_default();
    let descending = opt.descending;

    let typed_sorted_range = [range.start, range.end]
        .iter()
        .map(|t| {
            t.convert_to(time_unit.into())
                .ok_or_else(|| {
                    DataFusionError::Internal(format!(
                        "Failed to convert timestamp from {:?} to {:?}",
                        t.unit(),
                        time_unit
                    ))
                })
                .and_then(|typed_ts| {
                    let value = Value::Timestamp(typed_ts);
                    value
                        .try_to_scalar_value(&value.data_type())
                        .map_err(|e| DataFusionError::External(Box::new(e) as _))
                })
        })
        .try_collect::<_, Vec<_>, _>()?;

    let (min_val, max_val) = (typed_sorted_range[0].clone(), typed_sorted_range[1].clone());

    let (start, end) = if descending {
        let start = bisect::<false>(from_ref(array), from_ref(&max_val), &[*opt])?;
        let end = bisect::<false>(from_ref(array), from_ref(&min_val), &[*opt])?;
        (start, end)
    } else {
        let start = bisect::<true>(from_ref(array), from_ref(&min_val), &[*opt])?;
        let end = bisect::<true>(from_ref(array), from_ref(&max_val), &[*opt])?;
        (start, end)
    };

    Ok((start, end - start))
}

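/// Downcasts a timestamp array to its concrete `PrimitiveArray` type and
/// yields a boxed `(index, Option<i64>)` iterator over its values; intended
/// to be used together with `downcast_ts_array!`.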
#[macro_export]
macro_rules! array_iter_helper {
    ($t:ty, $unit:expr, $arr:expr) => {{
        let typed = $arr
            .as_any()
            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
            .unwrap();
        let iter = typed.iter().enumerate();
        Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
    }};
}

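/// Compares two optional values under the given `SortOptions`, matching the
/// ordering (direction and null placement) that the sort itself would use.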
fn cmp_with_opts<T: Ord>(
    a: &Option<T>,
    b: &Option<T>,
    opt: &Option<SortOptions>,
) -> std::cmp::Ordering {
    let opt = opt.unwrap_or_default();

    if let (Some(a), Some(b)) = (a, b) {
        if opt.descending { b.cmp(a) } else { a.cmp(b) }
    } else if opt.nulls_first {
        a.cmp(b)
    } else {
        match (a, b) {
            (Some(a), Some(b)) => a.cmp(b),
            (Some(_), None) => std::cmp::Ordering::Less,
            (None, Some(_)) => std::cmp::Ordering::Greater,
            (None, None) => std::cmp::Ordering::Equal,
        }
    }
}

#[derive(Debug, Clone)]
struct SortedRunSet<N: Ord> {
    runs_with_batch: Vec<(DfRecordBatch, SucRun<N>)>,
    sort_column: SortColumn,
}

#[derive(Debug, Clone, PartialEq)]
struct SucRun<N: Ord> {
    offset: usize,
    len: usize,
    first_val: Option<N>,
    last_val: Option<N>,
}

impl SucRun<Timestamp> {
    fn get_time_range(&self) -> Option<TimeRange> {
        let start = self.first_val.min(self.last_val);
        let end = self
            .first_val
            .max(self.last_val)
            .map(|i| Timestamp::new(i.value() + 1, i.unit()));
        start.zip(end).map(|(s, e)| TimeRange::new(s, e))
    }
}

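/// Scans `(index, value)` pairs and returns the maximal successive runs that
/// are already ordered under the given sort options, recording the first and
/// last non-null value of each run.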
fn find_successive_runs<T: Iterator<Item = (usize, Option<N>)>, N: Ord + Copy>(
    iter: T,
    sort_opts: &Option<SortOptions>,
) -> Vec<SucRun<N>> {
    let mut runs = Vec::new();
    let mut last_value = None;
    let mut iter_len = None;

    let mut last_offset = 0;
    let mut first_val: Option<N> = None;
    let mut last_val: Option<N> = None;

    for (idx, t) in iter {
        if let Some(last_value) = &last_value
            && cmp_with_opts(last_value, &t, sort_opts) == std::cmp::Ordering::Greater
        {
            let len = idx - last_offset;
            let run = SucRun {
                offset: last_offset,
                len,
                first_val,
                last_val,
            };
            runs.push(run);
            first_val = None;
            last_val = None;

            last_offset = idx;
        }
        last_value = Some(t);
        if let Some(t) = t {
            first_val = first_val.or(Some(t));
            last_val = Some(t).or(last_val);
        }
        iter_len = Some(idx);
    }
    let run = SucRun {
        offset: last_offset,
        len: iter_len.map(|l| l - last_offset + 1).unwrap_or(0),
        first_val,
        last_val,
    };
    runs.push(run);

    runs
}

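/// Computes the successive sorted runs of a timestamp sort column, converting
/// the raw `i64` values into `Timestamp`s of the column's time unit.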
fn get_sorted_runs(sort_column: SortColumn) -> datafusion_common::Result<Vec<SucRun<Timestamp>>> {
    let ty = sort_column.values.data_type();
    if let DataType::Timestamp(unit, _) = ty {
        let array = &sort_column.values;
        let iter = downcast_ts_array!(
            array.data_type() => (array_iter_helper, array),
            _ => internal_err!("Unsupported sort column type: {ty}")?
        );

        let raw = find_successive_runs(iter, &sort_column.options);
        let ts_runs = raw
            .into_iter()
            .map(|run| SucRun {
                offset: run.offset,
                len: run.len,
                first_val: run.first_val.map(|v| Timestamp::new(v, unit.into())),
                last_val: run.last_val.map(|v| Timestamp::new(v, unit.into())),
            })
            .collect_vec();
        Ok(ts_runs)
    } else {
        Err(DataFusionError::Internal(format!(
            "Unsupported sort column type: {ty}"
        )))
    }
}

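/// A half-open time range `[start, end)` used for both `PartitionRange`s and
/// the working ranges derived from them.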
#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TimeRange {
    start: Timestamp,
    end: Timestamp,
}

impl From<&PartitionRange> for TimeRange {
    fn from(range: &PartitionRange) -> Self {
        Self::new(range.start, range.end)
    }
}

impl From<(Timestamp, Timestamp)> for TimeRange {
    fn from(range: (Timestamp, Timestamp)) -> Self {
        Self::new(range.0, range.1)
    }
}

impl From<&(Timestamp, Timestamp)> for TimeRange {
    fn from(range: &(Timestamp, Timestamp)) -> Self {
        Self::new(range.0, range.1)
    }
}

impl TimeRange {
    fn new(start: Timestamp, end: Timestamp) -> Self {
        if start > end {
            Self {
                start: end,
                end: start,
            }
        } else {
            Self { start, end }
        }
    }

    fn is_subset(&self, other: &Self) -> bool {
        self.start >= other.start && self.end <= other.end
    }

    fn is_overlapping(&self, other: &Self) -> bool {
        !(self.start >= other.end || self.end <= other.start)
    }

    fn intersection(&self, other: &Self) -> Option<Self> {
        if self.is_overlapping(other) {
            Some(Self::new(
                self.start.max(other.start),
                self.end.min(other.end),
            ))
        } else {
            None
        }
    }

    fn difference(&self, other: &Self) -> Vec<Self> {
        if !self.is_overlapping(other) {
            vec![*self]
        } else {
            let mut ret = Vec::new();
            if self.start < other.start && self.end > other.end {
                ret.push(Self::new(self.start, other.start));
                ret.push(Self::new(other.end, self.end));
            } else if self.start < other.start {
                ret.push(Self::new(self.start, other.start));
            } else if self.end > other.end {
                ret.push(Self::new(other.end, self.end));
            }
            ret
        }
    }
}

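/// Splits an existing entry (`input_range`, owned by `input_parts`) of the
/// overlap map by the new range `split_by` (partition index `split_idx`),
/// returning the pop/push actions needed to keep the map consistent.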
fn split_range_by(
    input_range: &TimeRange,
    input_parts: &[usize],
    split_by: &TimeRange,
    split_idx: usize,
) -> Vec<Action> {
    let mut ret = Vec::new();
    if input_range.is_overlapping(split_by) {
        let input_parts = input_parts.to_vec();
        let new_parts = {
            let mut new_parts = input_parts.clone();
            new_parts.push(split_idx);
            new_parts
        };

        ret.push(Action::Pop(*input_range));
        if let Some(intersection) = input_range.intersection(split_by) {
            ret.push(Action::Push(intersection, new_parts.clone()));
        }
        for diff in input_range.difference(split_by) {
            ret.push(Action::Push(diff, input_parts.clone()));
        }
    }
    ret
}

#[derive(Debug, Clone, PartialEq, Eq)]
enum Action {
    Pop(TimeRange),
    Push(TimeRange, Vec<usize>),
}

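/// Merges the split, non-overlapping ranges into working ranges: maximal
/// ranges whose data can be sorted and emitted independently because the
/// partitions they touch do not spill into later working ranges. Iterates in
/// output order, i.e. reversed for descending sorts.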
fn compute_all_working_ranges(
    overlap_counts: &BTreeMap<TimeRange, Vec<usize>>,
    descending: bool,
) -> Vec<(TimeRange, BTreeSet<usize>)> {
    let mut ret = Vec::new();
    let mut cur_range_set: Option<(TimeRange, BTreeSet<usize>)> = None;
    let overlap_iter: Box<dyn Iterator<Item = (&TimeRange, &Vec<usize>)>> = if descending {
        Box::new(overlap_counts.iter().rev()) as _
    } else {
        Box::new(overlap_counts.iter()) as _
    };
    for (range, set) in overlap_iter {
        match &mut cur_range_set {
            None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))),
            Some((working_range, working_set)) => {
                let need_expand = {
                    let last_part = working_set.last();
                    let inter: BTreeSet<usize> = working_set
                        .intersection(&BTreeSet::from_iter(set.iter().cloned()))
                        .cloned()
                        .collect();
                    if let Some(one) = inter.first()
                        && inter.len() == 1
                        && Some(one) == last_part
                    {
                        // Only the newest partition continues into this range; only expand
                        // if an older partition reappears here.
                        !set.iter().all(|p| Some(p) >= last_part)
                    } else if inter.is_empty() {
                        false
                    } else {
                        true
                    }
                };

                if need_expand {
                    if descending {
                        working_range.start = range.start;
                    } else {
                        working_range.end = range.end;
                    }
                    working_set.extend(set.iter().cloned());
                } else {
                    ret.push((*working_range, std::mem::take(working_set)));
                    cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned())));
                }
            }
        }
    }

    if let Some(cur_range_set) = cur_range_set {
        ret.push(cur_range_set)
    }

    ret
}

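/// Splits possibly overlapping `PartitionRange`s into non-overlapping
/// `TimeRange`s, each mapped to the indices of the partitions covering it.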
fn split_overlapping_ranges(ranges: &[PartitionRange]) -> BTreeMap<TimeRange, Vec<usize>> {
    let mut ret: BTreeMap<TimeRange, Vec<usize>> = BTreeMap::new();
    for (idx, range) in ranges.iter().enumerate() {
        let key: TimeRange = (range.start, range.end).into();
        let mut actions = Vec::new();
        let mut untouched = vec![key];
        let forward_iter = ret
            .range(key..)
            .take_while(|(range, _)| range.is_overlapping(&key));
        let backward_iter = ret
            .range(..key)
            .rev()
            .take_while(|(range, _)| range.is_overlapping(&key));

        for (range, parts) in forward_iter.chain(backward_iter) {
            untouched = untouched.iter().flat_map(|r| r.difference(range)).collect();
            let act = split_range_by(range, parts, &key, idx);
            actions.extend(act.into_iter());
        }

        for action in actions {
            match action {
                Action::Pop(range) => {
                    ret.remove(&range);
                }
                Action::Push(range, parts) => {
                    ret.insert(range, parts);
                }
            }
        }

        for range in untouched {
            ret.insert(range, vec![idx]);
        }
    }
    ret
}

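/// Reads the timestamp at `offset` from a timestamp array, returning `None`
/// if the slot is null.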
fn get_timestamp_from_idx(
    array: &ArrayRef,
    offset: usize,
) -> datafusion_common::Result<Option<Timestamp>> {
    let time_unit = if let DataType::Timestamp(unit, _) = array.data_type() {
        unit
    } else {
        return Err(DataFusionError::Internal(format!(
            "Unsupported sort column type: {}",
            array.data_type()
        )));
    };
    let ty = array.data_type();
    let array = array.slice(offset, 1);
    let mut iter = downcast_ts_array!(
        array.data_type() => (array_iter_helper, array),
        _ => internal_err!("Unsupported sort column type: {ty}")?
    );
    let (_idx, val) = iter.next().ok_or_else(|| {
        DataFusionError::Internal("Empty array in get_timestamp_from_idx".to_string())
    })?;
    let val = if let Some(val) = val {
        val
    } else {
        return Ok(None);
    };
    let gt_timestamp = Timestamp::new(val, time_unit.into());
    Ok(Some(gt_timestamp))
}

#[cfg(test)]
mod test {
    use std::io::Write;
    use std::sync::Arc;

    use arrow::array::{ArrayRef, TimestampMillisecondArray};
    use arrow::compute::concat_batches;
    use arrow::json::ArrayWriter;
    use arrow_schema::{Field, Schema, TimeUnit};
    use futures::StreamExt;
    use pretty_assertions::assert_eq;
    use serde_json::json;

    use super::*;
    use crate::test_util::{MockInputExec, new_ts_array};

    mod helpers {
        use datafusion::physical_plan::expressions::Column;

        use super::*;

        pub fn default_sort_opts(descending: bool) -> SortOptions {
            SortOptions {
                descending,
                nulls_first: true,
            }
        }

        pub fn ts_field(unit: TimeUnit) -> Field {
            Field::new("ts", DataType::Timestamp(unit, None), false)
        }

        pub fn ts_column() -> Column {
            Column::new("ts", 0)
        }

        pub fn partition_range(start: i64, end: i64, num_rows: usize, id: usize) -> PartitionRange {
            PartitionRange {
                start: Timestamp::new_millisecond(start),
                end: Timestamp::new_millisecond(end),
                num_rows,
                identifier: id,
            }
        }

        pub fn ts_array(values: impl IntoIterator<Item = i64>) -> ArrayRef {
            Arc::new(TimestampMillisecondArray::from_iter_values(values))
        }
    }

    #[test]
    fn test_overlapping() {
        let testcases = [
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
                false,
            ),
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
                true,
            ),
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
                true,
            ),
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (
                    Timestamp::new_millisecond(1000),
                    Timestamp::new_millisecond(1002),
                ),
                true,
            ),
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (
                    Timestamp::new_millisecond(1001),
                    Timestamp::new_millisecond(1002),
                ),
                false,
            ),
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (
                    Timestamp::new_millisecond(1002),
                    Timestamp::new_millisecond(1003),
                ),
                false,
            ),
        ];

        for (range1, range2, expected) in testcases.iter() {
            assert_eq!(
                TimeRange::from(range1).is_overlapping(&range2.into()),
                *expected,
                "range1: {:?}, range2: {:?}",
                range1,
                range2
            );
        }
    }

1360 #[test]
1361 fn test_split() {
1362 let testcases = [
1363 (
1365 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1366 vec![0],
1367 (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1368 1,
1369 vec![],
1370 ),
1371 (
1373 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1374 vec![0],
1375 (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1376 1,
1377 vec![
1378 Action::Pop(
1379 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1380 ),
1381 Action::Push(
1382 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1383 vec![0, 1],
1384 ),
1385 ],
1386 ),
1387 (
1388 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1389 vec![0],
1390 (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1391 1,
1392 vec![
1393 Action::Pop(
1394 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1395 ),
1396 Action::Push(
1397 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1398 vec![0, 1],
1399 ),
1400 ],
1401 ),
1402 (
1403 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1404 vec![0],
1405 (
1406 Timestamp::new_millisecond(1000),
1407 Timestamp::new_millisecond(1002),
1408 ),
1409 1,
1410 vec![
1411 Action::Pop(
1412 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1413 ),
1414 Action::Push(
1415 (
1416 Timestamp::new_millisecond(1000),
1417 Timestamp::new_millisecond(1001),
1418 )
1419 .into(),
1420 vec![0, 1],
1421 ),
1422 ],
1423 ),
1424 (
1426 (Timestamp::new_second(1), Timestamp::new_millisecond(1002)),
1427 vec![0],
1428 (
1429 Timestamp::new_millisecond(1001),
1430 Timestamp::new_millisecond(1002),
1431 ),
1432 1,
1433 vec![
1434 Action::Pop(
1435 (Timestamp::new_second(1), Timestamp::new_millisecond(1002)).into(),
1436 ),
1437 Action::Push(
1438 (
1439 Timestamp::new_millisecond(1001),
1440 Timestamp::new_millisecond(1002),
1441 )
1442 .into(),
1443 vec![0, 1],
1444 ),
1445 Action::Push(
1446 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1447 vec![0],
1448 ),
1449 ],
1450 ),
1451 (
1453 (Timestamp::new_second(1), Timestamp::new_millisecond(1004)),
1454 vec![0],
1455 (
1456 Timestamp::new_millisecond(1001),
1457 Timestamp::new_millisecond(1002),
1458 ),
1459 1,
1460 vec![
1461 Action::Pop(
1462 (Timestamp::new_second(1), Timestamp::new_millisecond(1004)).into(),
1463 ),
1464 Action::Push(
1465 (
1466 Timestamp::new_millisecond(1001),
1467 Timestamp::new_millisecond(1002),
1468 )
1469 .into(),
1470 vec![0, 1],
1471 ),
1472 Action::Push(
1473 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1474 vec![0],
1475 ),
1476 Action::Push(
1477 (
1478 Timestamp::new_millisecond(1002),
1479 Timestamp::new_millisecond(1004),
1480 )
1481 .into(),
1482 vec![0],
1483 ),
1484 ],
1485 ),
1486 ];
1487
1488 for (range, parts, split_by, split_idx, expected) in testcases.iter() {
1489 assert_eq!(
1490 split_range_by(&(*range).into(), parts, &split_by.into(), *split_idx),
1491 *expected,
1492 "range: {:?}, parts: {:?}, split_by: {:?}, split_idx: {}",
1493 range,
1494 parts,
1495 split_by,
1496 split_idx
1497 );
1498 }
1499 }
1500
1501 #[allow(clippy::type_complexity)]
1502 fn run_compute_working_ranges_test(
1503 testcases: Vec<(
1504 BTreeMap<(Timestamp, Timestamp), Vec<usize>>,
1505 Vec<((Timestamp, Timestamp), BTreeSet<usize>)>,
1506 )>,
1507 descending: bool,
1508 ) {
1509 for (input, expected) in testcases {
1510 let expected = expected
1511 .into_iter()
1512 .map(|(r, s)| (r.into(), s))
1513 .collect_vec();
1514 let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1515 assert_eq!(
1516 compute_all_working_ranges(&input, descending),
1517 expected,
1518 "input: {:?}, descending: {}",
1519 input,
1520 descending
1521 );
1522 }
1523 }
1524
1525 #[test]
1526 fn test_compute_working_ranges_descending() {
1527 let testcases = vec![
1528 (
1529 BTreeMap::from([(
1530 (Timestamp::new_second(1), Timestamp::new_second(2)),
1531 vec![0],
1532 )]),
1533 vec![(
1534 (Timestamp::new_second(1), Timestamp::new_second(2)),
1535 BTreeSet::from([0]),
1536 )],
1537 ),
1538 (
1539 BTreeMap::from([(
1540 (Timestamp::new_second(1), Timestamp::new_second(2)),
1541 vec![0, 1],
1542 )]),
1543 vec![(
1544 (Timestamp::new_second(1), Timestamp::new_second(2)),
1545 BTreeSet::from([0, 1]),
1546 )],
1547 ),
1548 (
1549 BTreeMap::from([
1550 (
1551 (Timestamp::new_second(2), Timestamp::new_second(3)),
1552 vec![0],
1553 ),
1554 (
1555 (Timestamp::new_second(1), Timestamp::new_second(2)),
1556 vec![0, 1],
1557 ),
1558 ]),
1559 vec![
1560 (
1561 (Timestamp::new_second(2), Timestamp::new_second(3)),
1562 BTreeSet::from([0]),
1563 ),
1564 (
1565 (Timestamp::new_second(1), Timestamp::new_second(2)),
1566 BTreeSet::from([0, 1]),
1567 ),
1568 ],
1569 ),
1570 (
1571 BTreeMap::from([
1572 (
1573 (Timestamp::new_second(2), Timestamp::new_second(3)),
1574 vec![0, 1],
1575 ),
1576 (
1577 (Timestamp::new_second(1), Timestamp::new_second(2)),
1578 vec![1],
1579 ),
1580 ]),
1581 vec![
1582 (
1583 (Timestamp::new_second(2), Timestamp::new_second(3)),
1584 BTreeSet::from([0, 1]),
1585 ),
1586 (
1587 (Timestamp::new_second(1), Timestamp::new_second(2)),
1588 BTreeSet::from([1]),
1589 ),
1590 ],
1591 ),
1592 (
1593 BTreeMap::from([
1594 (
1595 (Timestamp::new_second(3), Timestamp::new_second(4)),
1596 vec![0],
1597 ),
1598 (
1599 (Timestamp::new_second(2), Timestamp::new_second(3)),
1600 vec![0, 1],
1601 ),
1602 (
1603 (Timestamp::new_second(1), Timestamp::new_second(2)),
1604 vec![1],
1605 ),
1606 ]),
1607 vec![
1608 (
1609 (Timestamp::new_second(3), Timestamp::new_second(4)),
1610 BTreeSet::from([0]),
1611 ),
1612 (
1613 (Timestamp::new_second(2), Timestamp::new_second(3)),
1614 BTreeSet::from([0, 1]),
1615 ),
1616 (
1617 (Timestamp::new_second(1), Timestamp::new_second(2)),
1618 BTreeSet::from([1]),
1619 ),
1620 ],
1621 ),
1622 (
1623 BTreeMap::from([
1624 (
1625 (Timestamp::new_second(3), Timestamp::new_second(4)),
1626 vec![0, 2],
1627 ),
1628 (
1629 (Timestamp::new_second(2), Timestamp::new_second(3)),
1630 vec![0, 1, 2],
1631 ),
1632 (
1633 (Timestamp::new_second(1), Timestamp::new_second(2)),
1634 vec![1, 2],
1635 ),
1636 ]),
1637 vec![(
1638 (Timestamp::new_second(1), Timestamp::new_second(4)),
1639 BTreeSet::from([0, 1, 2]),
1640 )],
1641 ),
1642 (
1643 BTreeMap::from([
1644 (
1645 (Timestamp::new_second(2), Timestamp::new_second(3)),
1646 vec![0, 2],
1647 ),
1648 (
1649 (Timestamp::new_second(1), Timestamp::new_second(2)),
1650 vec![1, 2],
1651 ),
1652 ]),
1653 vec![(
1654 (Timestamp::new_second(1), Timestamp::new_second(3)),
1655 BTreeSet::from([0, 1, 2]),
1656 )],
1657 ),
1658 (
1659 BTreeMap::from([
1660 (
1661 (Timestamp::new_second(3), Timestamp::new_second(4)),
1662 vec![0, 1],
1663 ),
1664 (
1665 (Timestamp::new_second(2), Timestamp::new_second(3)),
1666 vec![0, 1, 2],
1667 ),
1668 (
1669 (Timestamp::new_second(1), Timestamp::new_second(2)),
1670 vec![1, 2],
1671 ),
1672 ]),
1673 vec![(
1674 (Timestamp::new_second(1), Timestamp::new_second(4)),
1675 BTreeSet::from([0, 1, 2]),
1676 )],
1677 ),
1678 (
1679 BTreeMap::from([
1680 (
1681 (Timestamp::new_second(2), Timestamp::new_second(3)),
1682 vec![0, 1],
1683 ),
1684 (
1685 (Timestamp::new_second(1), Timestamp::new_second(2)),
1686 vec![1, 2],
1687 ),
1688 ]),
1689 vec![
1690 (
1691 (Timestamp::new_second(2), Timestamp::new_second(3)),
1692 BTreeSet::from([0, 1]),
1693 ),
1694 (
1695 (Timestamp::new_second(1), Timestamp::new_second(2)),
1696 BTreeSet::from([1, 2]),
1697 ),
1698 ],
1699 ),
1700 (
1702 BTreeMap::from([
1703 (
1704 (Timestamp::new_second(2), Timestamp::new_second(3)),
1705 vec![0],
1706 ),
1707 (
1708 (Timestamp::new_second(1), Timestamp::new_second(2)),
1709 vec![1, 2],
1710 ),
1711 ]),
1712 vec![
1713 (
1714 (Timestamp::new_second(2), Timestamp::new_second(3)),
1715 BTreeSet::from([0]),
1716 ),
1717 (
1718 (Timestamp::new_second(1), Timestamp::new_second(2)),
1719 BTreeSet::from([1, 2]),
1720 ),
1721 ],
1722 ),
1723 ];
1724
1725 run_compute_working_ranges_test(testcases, true);
1726 }
1727
1728 #[test]
1729 fn test_compute_working_ranges_ascending() {
1730 let testcases = vec![
1731 (
1732 BTreeMap::from([(
1733 (Timestamp::new_second(1), Timestamp::new_second(2)),
1734 vec![0],
1735 )]),
1736 vec![(
1737 (Timestamp::new_second(1), Timestamp::new_second(2)),
1738 BTreeSet::from([0]),
1739 )],
1740 ),
1741 (
1742 BTreeMap::from([(
1743 (Timestamp::new_second(1), Timestamp::new_second(2)),
1744 vec![0, 1],
1745 )]),
1746 vec![(
1747 (Timestamp::new_second(1), Timestamp::new_second(2)),
1748 BTreeSet::from([0, 1]),
1749 )],
1750 ),
1751 (
1752 BTreeMap::from([
1753 (
1754 (Timestamp::new_second(1), Timestamp::new_second(2)),
1755 vec![0, 1],
1756 ),
1757 (
1758 (Timestamp::new_second(2), Timestamp::new_second(3)),
1759 vec![1],
1760 ),
1761 ]),
1762 vec![
1763 (
1764 (Timestamp::new_second(1), Timestamp::new_second(2)),
1765 BTreeSet::from([0, 1]),
1766 ),
1767 (
1768 (Timestamp::new_second(2), Timestamp::new_second(3)),
1769 BTreeSet::from([1]),
1770 ),
1771 ],
1772 ),
1773 (
1774 BTreeMap::from([
1775 (
1776 (Timestamp::new_second(1), Timestamp::new_second(2)),
1777 vec![0],
1778 ),
1779 (
1780 (Timestamp::new_second(2), Timestamp::new_second(3)),
1781 vec![0, 1],
1782 ),
1783 ]),
1784 vec![
1785 (
1786 (Timestamp::new_second(1), Timestamp::new_second(2)),
1787 BTreeSet::from([0]),
1788 ),
1789 (
1790 (Timestamp::new_second(2), Timestamp::new_second(3)),
1791 BTreeSet::from([0, 1]),
1792 ),
1793 ],
1794 ),
1795 (
1797 BTreeMap::from([
1798 (
1799 (Timestamp::new_second(1), Timestamp::new_second(2)),
1800 vec![0],
1801 ),
1802 (
1803 (Timestamp::new_second(2), Timestamp::new_second(3)),
1804 vec![0, 1],
1805 ),
1806 (
1807 (Timestamp::new_second(3), Timestamp::new_second(4)),
1808 vec![1],
1809 ),
1810 ]),
1811 vec![
1812 (
1813 (Timestamp::new_second(1), Timestamp::new_second(2)),
1814 BTreeSet::from([0]),
1815 ),
1816 (
1817 (Timestamp::new_second(2), Timestamp::new_second(3)),
1818 BTreeSet::from([0, 1]),
1819 ),
1820 (
1821 (Timestamp::new_second(3), Timestamp::new_second(4)),
1822 BTreeSet::from([1]),
1823 ),
1824 ],
1825 ),
1826 (
1827 BTreeMap::from([
1828 (
1829 (Timestamp::new_second(1), Timestamp::new_second(2)),
1830 vec![0, 2],
1831 ),
1832 (
1833 (Timestamp::new_second(2), Timestamp::new_second(3)),
1834 vec![0, 1, 2],
1835 ),
1836 (
1837 (Timestamp::new_second(3), Timestamp::new_second(4)),
1838 vec![1, 2],
1839 ),
1840 ]),
1841 vec![(
1842 (Timestamp::new_second(1), Timestamp::new_second(4)),
1843 BTreeSet::from([0, 1, 2]),
1844 )],
1845 ),
1846 (
1847 BTreeMap::from([
1848 (
1849 (Timestamp::new_second(1), Timestamp::new_second(2)),
1850 vec![0, 2],
1851 ),
1852 (
1853 (Timestamp::new_second(2), Timestamp::new_second(3)),
1854 vec![1, 2],
1855 ),
1856 ]),
1857 vec![(
1858 (Timestamp::new_second(1), Timestamp::new_second(3)),
1859 BTreeSet::from([0, 1, 2]),
1860 )],
1861 ),
1862 (
1863 BTreeMap::from([
1864 (
1865 (Timestamp::new_second(1), Timestamp::new_second(2)),
1866 vec![0, 1],
1867 ),
1868 (
1869 (Timestamp::new_second(2), Timestamp::new_second(3)),
1870 vec![0, 1, 2],
1871 ),
1872 (
1873 (Timestamp::new_second(3), Timestamp::new_second(4)),
1874 vec![1, 2],
1875 ),
1876 ]),
1877 vec![(
1878 (Timestamp::new_second(1), Timestamp::new_second(4)),
1879 BTreeSet::from([0, 1, 2]),
1880 )],
1881 ),
1882 (
1883 BTreeMap::from([
1884 (
1885 (Timestamp::new_second(1), Timestamp::new_second(2)),
1886 vec![0, 1],
1887 ),
1888 (
1889 (Timestamp::new_second(2), Timestamp::new_second(3)),
1890 vec![1, 2],
1891 ),
1892 ]),
1893 vec![
1894 (
1895 (Timestamp::new_second(1), Timestamp::new_second(2)),
1896 BTreeSet::from([0, 1]),
1897 ),
1898 (
1899 (Timestamp::new_second(2), Timestamp::new_second(3)),
1900 BTreeSet::from([1, 2]),
1901 ),
1902 ],
1903 ),
1904 (
1906 BTreeMap::from([
1907 (
1908 (Timestamp::new_second(1), Timestamp::new_second(2)),
1909 vec![0, 1],
1910 ),
1911 (
1912 (Timestamp::new_second(2), Timestamp::new_second(3)),
1913 vec![2],
1914 ),
1915 ]),
1916 vec![
1917 (
1918 (Timestamp::new_second(1), Timestamp::new_second(2)),
1919 BTreeSet::from([0, 1]),
1920 ),
1921 (
1922 (Timestamp::new_second(2), Timestamp::new_second(3)),
1923 BTreeSet::from([2]),
1924 ),
1925 ],
1926 ),
1927 ];
1928
1929 run_compute_working_ranges_test(testcases, false);
1930 }
1931
1932 #[test]
1933 fn test_split_overlap_range() {
1934 let testcases = vec![
1935 (
1937 vec![PartitionRange {
1938 start: Timestamp::new_second(1),
1939 end: Timestamp::new_second(2),
1940 num_rows: 2,
1941 identifier: 0,
1942 }],
1943 BTreeMap::from_iter(
1944 vec![(
1945 (Timestamp::new_second(1), Timestamp::new_second(2)),
1946 vec![0],
1947 )]
1948 .into_iter(),
1949 ),
1950 ),
1951 (
1953 vec![
1954 PartitionRange {
1955 start: Timestamp::new_second(1),
1956 end: Timestamp::new_second(2),
1957 num_rows: 2,
1958 identifier: 0,
1959 },
1960 PartitionRange {
1961 start: Timestamp::new_second(1),
1962 end: Timestamp::new_second(2),
1963 num_rows: 2,
1964 identifier: 1,
1965 },
1966 ],
1967 BTreeMap::from_iter(
1968 vec![(
1969 (Timestamp::new_second(1), Timestamp::new_second(2)),
1970 vec![0, 1],
1971 )]
1972 .into_iter(),
1973 ),
1974 ),
1975 (
1976 vec![
1977 PartitionRange {
1978 start: Timestamp::new_second(1),
1979 end: Timestamp::new_second(3),
1980 num_rows: 2,
1981 identifier: 0,
1982 },
1983 PartitionRange {
1984 start: Timestamp::new_second(2),
1985 end: Timestamp::new_second(4),
1986 num_rows: 2,
1987 identifier: 1,
1988 },
1989 ],
1990 BTreeMap::from_iter(
1991 vec![
1992 (
1993 (Timestamp::new_second(1), Timestamp::new_second(2)),
1994 vec![0],
1995 ),
1996 (
1997 (Timestamp::new_second(2), Timestamp::new_second(3)),
1998 vec![0, 1],
1999 ),
2000 (
2001 (Timestamp::new_second(3), Timestamp::new_second(4)),
2002 vec![1],
2003 ),
2004 ]
2005 .into_iter(),
2006 ),
2007 ),
2008 (
2010 vec![
2011 PartitionRange {
2012 start: Timestamp::new_second(1),
2013 end: Timestamp::new_second(3),
2014 num_rows: 2,
2015 identifier: 0,
2016 },
2017 PartitionRange {
2018 start: Timestamp::new_second(2),
2019 end: Timestamp::new_second(4),
2020 num_rows: 2,
2021 identifier: 1,
2022 },
2023 PartitionRange {
2024 start: Timestamp::new_second(1),
2025 end: Timestamp::new_second(4),
2026 num_rows: 2,
2027 identifier: 2,
2028 },
2029 ],
2030 BTreeMap::from_iter(
2031 vec![
2032 (
2033 (Timestamp::new_second(1), Timestamp::new_second(2)),
2034 vec![0, 2],
2035 ),
2036 (
2037 (Timestamp::new_second(2), Timestamp::new_second(3)),
2038 vec![0, 1, 2],
2039 ),
2040 (
2041 (Timestamp::new_second(3), Timestamp::new_second(4)),
2042 vec![1, 2],
2043 ),
2044 ]
2045 .into_iter(),
2046 ),
2047 ),
2048 (
2049 vec![
2050 PartitionRange {
2051 start: Timestamp::new_second(1),
2052 end: Timestamp::new_second(3),
2053 num_rows: 2,
2054 identifier: 0,
2055 },
2056 PartitionRange {
2057 start: Timestamp::new_second(1),
2058 end: Timestamp::new_second(4),
2059 num_rows: 2,
2060 identifier: 1,
2061 },
2062 PartitionRange {
2063 start: Timestamp::new_second(2),
2064 end: Timestamp::new_second(4),
2065 num_rows: 2,
2066 identifier: 2,
2067 },
2068 ],
2069 BTreeMap::from_iter(
2070 vec![
2071 (
2072 (Timestamp::new_second(1), Timestamp::new_second(2)),
2073 vec![0, 1],
2074 ),
2075 (
2076 (Timestamp::new_second(2), Timestamp::new_second(3)),
2077 vec![0, 1, 2],
2078 ),
2079 (
2080 (Timestamp::new_second(3), Timestamp::new_second(4)),
2081 vec![1, 2],
2082 ),
2083 ]
2084 .into_iter(),
2085 ),
2086 ),
2087 ];
2088
2089 for (input, expected) in testcases {
2090 let expected = expected.into_iter().map(|(r, s)| (r.into(), s)).collect();
2091 assert_eq!(split_overlapping_ranges(&input), expected);
2092 }
2093 }

    impl From<(i32, i32, Option<i32>, Option<i32>)> for SucRun<i32> {
        fn from((offset, len, min_val, max_val): (i32, i32, Option<i32>, Option<i32>)) -> Self {
            Self {
                offset: offset as usize,
                len: len as usize,
                first_val: min_val,
                last_val: max_val,
            }
        }
    }

    #[test]
    fn test_find_successive_runs() {
        let testcases = vec![
            (
                vec![Some(1), Some(1), Some(2), Some(1), Some(3)],
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
            ),
            (
                vec![Some(1), Some(2), Some(2), Some(1), Some(3)],
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
            ),
            (
                vec![Some(1), Some(2), None, None, Some(1), Some(3)],
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                vec![(0, 4, Some(1), Some(2)), (4, 2, Some(1), Some(3))],
            ),
            (
                vec![Some(1), Some(2), Some(1), Some(3)],
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(1), Some(3))],
            ),
            (
                vec![Some(1), Some(2), Some(1), Some(3)],
                Some(SortOptions {
                    descending: true,
                    nulls_first: false,
                }),
                vec![
                    (0, 1, Some(1), Some(1)),
                    (1, 2, Some(2), Some(1)),
                    (3, 1, Some(3), Some(3)),
                ],
            ),
            (
                vec![Some(1), Some(2), None, Some(3)],
                Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(3), Some(3))],
            ),
            (
                vec![Some(1), Some(2), None, Some(3)],
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                vec![(0, 3, Some(1), Some(2)), (3, 1, Some(3), Some(3))],
            ),
            (
                vec![Some(2), Some(1), None, Some(3)],
                Some(SortOptions {
                    descending: true,
                    nulls_first: true,
                }),
                vec![(0, 2, Some(2), Some(1)), (2, 2, Some(3), Some(3))],
            ),
            (
                vec![],
                Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
                vec![(0, 0, None, None)],
            ),
            (
                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
                Some(SortOptions {
                    descending: true,
                    nulls_first: true,
                }),
                vec![(0, 5, Some(2), Some(1)), (5, 2, Some(5), Some(4))],
            ),
            (
                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
                Some(SortOptions {
                    descending: true,
                    nulls_first: false,
                }),
                vec![
                    (0, 2, None, None),
                    (2, 3, Some(2), Some(1)),
                    (5, 2, Some(5), Some(4)),
                ],
            ),
        ];
        for (input, sort_opts, expected) in testcases {
            let ret = find_successive_runs(input.clone().into_iter().enumerate(), &sort_opts);
            let expected = expected.into_iter().map(SucRun::<i32>::from).collect_vec();
            assert_eq!(
                ret, expected,
                "input: {:?}, opt: {:?}, expected: {:?}",
                input, sort_opts, expected
            );
        }
    }

    #[test]
    fn test_cmp_with_opts() {
        let testcases = vec![
            (
                Some(1),
                Some(2),
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                std::cmp::Ordering::Less,
            ),
            (
                Some(1),
                Some(2),
                Some(SortOptions {
                    descending: true,
                    nulls_first: false,
                }),
                std::cmp::Ordering::Greater,
            ),
            (
                Some(1),
                None,
                Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
                std::cmp::Ordering::Greater,
            ),
            (
                Some(1),
                None,
                Some(SortOptions {
                    descending: true,
                    nulls_first: true,
                }),
                std::cmp::Ordering::Greater,
            ),
            (
                Some(1),
                None,
                Some(SortOptions {
                    descending: true,
                    nulls_first: false,
                }),
                std::cmp::Ordering::Less,
            ),
            (
                Some(1),
                None,
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                std::cmp::Ordering::Less,
            ),
            (
                None,
                None,
                Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
                std::cmp::Ordering::Equal,
            ),
        ];
        for (a, b, opts, expected) in testcases {
            assert_eq!(
                cmp_with_opts(&a, &b, &opts),
                expected,
                "a: {:?}, b: {:?}, opts: {:?}",
                a,
                b,
                opts
            );
        }
    }

2300 #[test]
2301 fn test_find_slice_from_range() {
2302 let test_cases = vec![
2303 (
2305 Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2306 false,
2307 TimeRange {
2308 start: Timestamp::new_millisecond(2),
2309 end: Timestamp::new_millisecond(4),
2310 },
2311 Ok((1, 2)),
2312 ),
2313 (
2314 Arc::new(TimestampMillisecondArray::from_iter_values([
2315 -2, -1, 0, 1, 2, 3, 4, 5,
2316 ])) as ArrayRef,
2317 false,
2318 TimeRange {
2319 start: Timestamp::new_millisecond(-1),
2320 end: Timestamp::new_millisecond(4),
2321 },
2322 Ok((1, 5)),
2323 ),
2324 (
2325 Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 6])) as ArrayRef,
2326 false,
2327 TimeRange {
2328 start: Timestamp::new_millisecond(2),
2329 end: Timestamp::new_millisecond(5),
2330 },
2331 Ok((1, 2)),
2332 ),
2333 (
2334 Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 6])) as ArrayRef,
2335 false,
2336 TimeRange {
2337 start: Timestamp::new_millisecond(2),
2338 end: Timestamp::new_millisecond(5),
2339 },
2340 Ok((1, 3)),
2341 ),
2342 (
2343 Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 5, 6])) as ArrayRef,
2344 false,
2345 TimeRange {
2346 start: Timestamp::new_millisecond(2),
2347 end: Timestamp::new_millisecond(5),
2348 },
2349 Ok((1, 2)),
2350 ),
2351 (
2352 Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2353 false,
2354 TimeRange {
2355 start: Timestamp::new_millisecond(6),
2356 end: Timestamp::new_millisecond(7),
2357 },
2358 Ok((5, 0)),
2359 ),
2360 (
2362 Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 1])) as ArrayRef,
2363 true,
2364 TimeRange {
2365 end: Timestamp::new_millisecond(4),
2366 start: Timestamp::new_millisecond(1),
2367 },
2368 Ok((1, 3)),
2369 ),
2370 (
2371 Arc::new(TimestampMillisecondArray::from_iter_values([
2372 5, 4, 3, 2, 1, 0,
2373 ])) as ArrayRef,
2374 true,
2375 TimeRange {
2376 end: Timestamp::new_millisecond(4),
2377 start: Timestamp::new_millisecond(1),
2378 },
2379 Ok((2, 3)),
2380 ),
2381 (
2382 Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 0])) as ArrayRef,
2383 true,
2384 TimeRange {
2385 end: Timestamp::new_millisecond(4),
2386 start: Timestamp::new_millisecond(1),
2387 },
2388 Ok((1, 2)),
2389 ),
2390 (
2391 Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 0])) as ArrayRef,
2392 true,
2393 TimeRange {
2394 end: Timestamp::new_millisecond(4),
2395 start: Timestamp::new_millisecond(1),
2396 },
2397 Ok((2, 2)),
2398 ),
2399 (
2400 Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 1])) as ArrayRef,
2401 true,
2402 TimeRange {
2403 end: Timestamp::new_millisecond(5),
2404 start: Timestamp::new_millisecond(2),
2405 },
2406 Ok((1, 3)),
2407 ),
2408 (
2409 Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 1])) as ArrayRef,
2410 true,
2411 TimeRange {
2412 end: Timestamp::new_millisecond(5),
2413 start: Timestamp::new_millisecond(2),
2414 },
2415 Ok((1, 2)),
2416 ),
2417 (
2418 Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 2, 1])) as ArrayRef,
2419 true,
2420 TimeRange {
2421 end: Timestamp::new_millisecond(5),
2422 start: Timestamp::new_millisecond(2),
2423 },
2424 Ok((1, 3)),
2425 ),
2426 (
2427 Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 1])) as ArrayRef,
2428 true,
2429 TimeRange {
2430 end: Timestamp::new_millisecond(5),
2431 start: Timestamp::new_millisecond(2),
2432 },
2433 Ok((1, 2)),
2434 ),
2435 (
2436 Arc::new(TimestampMillisecondArray::from_iter_values([
2437 10, 9, 8, 7, 6,
2438 ])) as ArrayRef,
2439 true,
2440 TimeRange {
2441 end: Timestamp::new_millisecond(5),
2442 start: Timestamp::new_millisecond(2),
2443 },
2444 Ok((5, 0)),
2445 ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([3, 2, 1, 0])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(4),
                    start: Timestamp::new_millisecond(3),
                },
                Ok((0, 1)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(4),
                    start: Timestamp::new_millisecond(3),
                },
                Ok((1, 1)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(4),
                    start: Timestamp::new_millisecond(3),
                },
                Ok((2, 1)),
            ),
        ];

        for (sort_vals, descending, range, expected) in test_cases {
            let sort_column = SortColumn {
                values: sort_vals,
                options: Some(SortOptions {
                    descending,
                    ..Default::default()
                }),
            };
            let ret = find_slice_from_range(&sort_column, &range);
            match (ret, expected) {
                (Ok(ret), Ok(expected)) => {
                    assert_eq!(
                        ret, expected,
                        "sort_column: {:?}, range: {:?}",
                        sort_column, range
                    )
                }
                (Err(err), Err(expected)) => {
                    let expected: &str = expected;
                    assert!(
                        err.to_string().contains(expected),
                        "err: {:?}, expected: {:?}",
                        err,
                        expected
                    );
                }
                (r, e) => panic!("unexpected result: {:?}, expected: {:?}", r, e),
            }
        }
    }

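    /// Test harness: feeds `input` pairs of `PartitionRange` and record batch through a
    /// `WindowedSortExec` built over a mock input, and compares the collected stream
    /// against the expected `output` batches.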
    #[derive(Debug)]
    struct TestStream {
        expression: PhysicalSortExpr,
        fetch: Option<usize>,
        input: Vec<(PartitionRange, DfRecordBatch)>,
        output: Vec<DfRecordBatch>,
        schema: SchemaRef,
    }

    impl TestStream {
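        /// Builds a test case: every `Vec<ArrayRef>` in `input`/`expected` becomes one
        /// single-column `DfRecordBatch` over a timestamp field of the given `unit`.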
        fn new(
            opt: SortOptions,
            fetch: Option<usize>,
            unit: TimeUnit,
            input: Vec<(PartitionRange, Vec<ArrayRef>)>,
            expected: Vec<Vec<ArrayRef>>,
        ) -> Self {
            let expression = PhysicalSortExpr {
                expr: Arc::new(helpers::ts_column()),
                options: opt,
            };
            let schema = Schema::new(vec![helpers::ts_field(unit)]);
            let schema = Arc::new(schema);
            let input = input
                .into_iter()
                .map(|(k, v)| (k, DfRecordBatch::try_new(schema.clone(), v).unwrap()))
                .collect_vec();
            let output_batches = expected
                .into_iter()
                .map(|v| DfRecordBatch::try_new(schema.clone(), v).unwrap())
                .collect_vec();
            Self {
                expression,
                fetch,
                input,
                output: output_batches,
                schema,
            }
        }

        fn new_simple(
            descending: bool,
            fetch: Option<usize>,
            input: Vec<(PartitionRange, Vec<ArrayRef>)>,
            expected: Vec<Vec<ArrayRef>>,
        ) -> Self {
            Self::new(
                helpers::default_sort_opts(descending),
                fetch,
                TimeUnit::Millisecond,
                input,
                expected,
            )
        }

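        /// Wraps the input batches in a `MockInputExec`, runs `WindowedSortExec` over a
        /// single partition, and returns the collected output batches.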
        async fn run_test(&self) -> Vec<DfRecordBatch> {
            let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip();

            let mock_input = MockInputExec::new(vec![batches], self.schema.clone());

            let exec = WindowedSortExec::try_new(
                self.expression.clone(),
                self.fetch,
                vec![ranges],
                Arc::new(mock_input),
            )
            .unwrap();

            let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();

            let real_output = exec_stream.collect::<Vec<_>>().await;
            let real_output: Vec<_> = real_output.into_iter().try_collect().unwrap();
            real_output
        }
    }

    #[tokio::test]
    async fn test_window_sort_empty_and_minimal() {
        use helpers::*;
        let test_cases = [
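            // No input at all: the exec produces no output batches.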
            TestStream::new_simple(false, None, vec![], vec![]),
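            // An empty batch followed by a single-row batch: only the row comes out.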
            TestStream::new_simple(
                false,
                None,
                vec![
                    (partition_range(1, 2, 1, 0), vec![ts_array([])]),
                    (partition_range(1, 3, 1, 0), vec![ts_array([2])]),
                ],
                vec![vec![ts_array([2])]],
            ),
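            // All input batches are empty: no output batch is produced.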
            TestStream::new_simple(
                false,
                None,
                vec![
                    (partition_range(1, 2, 1, 0), vec![ts_array([])]),
                    (partition_range(1, 3, 1, 0), vec![ts_array([])]),
                ],
                vec![],
            ),
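            // Two single-row batches that are already in order pass through unchanged.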
            TestStream::new_simple(
                false,
                None,
                vec![
                    (partition_range(1, 2, 1, 0), vec![ts_array([1])]),
                    (partition_range(1, 3, 1, 0), vec![ts_array([2])]),
                ],
                vec![vec![ts_array([1])], vec![ts_array([2])]],
            ),
        ];

        for (idx, testcase) in test_cases.iter().enumerate() {
            let output = testcase.run_test().await;
            assert_eq!(output, testcase.output, "empty/minimal case {idx} failed");
        }
    }

    #[tokio::test]
    async fn test_window_sort_overlapping() {
        use helpers::*;
        let test_cases = [
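            // Two overlapping ranges: rows come out sorted, split per working range.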
            TestStream::new_simple(
                false,
                None,
                vec![
                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
                    (partition_range(1, 4, 1, 0), vec![ts_array([2, 3])]),
                ],
                vec![
                    vec![ts_array([1, 2])],
                    vec![ts_array([2])],
                    vec![ts_array([3])],
                ],
            ),
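            // Duplicate timestamps across overlapping ranges are kept and merged.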
            TestStream::new_simple(
                false,
                None,
                vec![
                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
                    (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
                ],
                vec![vec![ts_array([1, 1, 2, 2])], vec![ts_array([3])]],
            ),
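            // Same as above, plus a disjoint trailing range.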
            TestStream::new_simple(
                false,
                None,
                vec![
                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
                    (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
                    (partition_range(4, 6, 1, 1), vec![ts_array([4, 5])]),
                ],
                vec![
                    vec![ts_array([1, 1, 2, 2])],
                    vec![ts_array([3])],
                    vec![ts_array([4, 5])],
                ],
            ),
        ];

        for (idx, testcase) in test_cases.iter().enumerate() {
            let output = testcase.run_test().await;
            assert_eq!(output, testcase.output, "overlapping case {idx} failed");
        }
    }

    #[tokio::test]
    async fn test_window_sort_with_fetch() {
        use helpers::*;
        let test_cases = [
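            // fetch = 6 truncates the merged output to six rows.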
            TestStream::new_simple(
                false,
                Some(6),
                vec![
                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
                    (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
                    (partition_range(3, 6, 1, 1), vec![ts_array([4, 5])]),
                ],
                vec![
                    vec![ts_array([1, 1, 2, 2])],
                    vec![ts_array([3])],
                    vec![ts_array([4])],
                ],
            ),
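            // fetch = 3 truncates inside the first working range.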
            TestStream::new_simple(
                false,
                Some(3),
                vec![
                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
                    (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
                    (partition_range(3, 6, 1, 1), vec![ts_array([4, 5])]),
                ],
                vec![vec![ts_array([1, 1, 2])]],
            ),
        ];

        for (idx, testcase) in test_cases.iter().enumerate() {
            let output = testcase.run_test().await;
            assert_eq!(output, testcase.output, "fetch case {idx} failed");
        }
    }

    #[tokio::test]
    async fn test_window_sort_descending() {
        use helpers::*;
        let test_cases = [
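            // Descending sort: ranges and data are fed in reverse time order.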
            TestStream::new_simple(
                true,
                None,
                vec![
                    (partition_range(3, 6, 1, 1), vec![ts_array([5, 4])]),
                    (partition_range(1, 4, 1, 1), vec![ts_array([3, 2, 1])]),
                    (partition_range(1, 3, 1, 0), vec![ts_array([2, 1])]),
                ],
                vec![
                    vec![ts_array([5, 4])],
                    vec![ts_array([3])],
                    vec![ts_array([2, 2, 1, 1])],
                ],
            ),
        ];

        for (idx, testcase) in test_cases.iter().enumerate() {
            let output = testcase.run_test().await;
            assert_eq!(output, testcase.output, "descending case {idx} failed");
        }
    }

    #[tokio::test]
    async fn test_window_sort_complex() {
        use helpers::*;
        let test_cases = [
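            // A wide range fully containing a narrower one.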
            TestStream::new_simple(
                false,
                None,
                vec![
                    (partition_range(1, 10, 1, 0), vec![ts_array([1, 5, 9])]),
                    (partition_range(3, 7, 1, 1), vec![ts_array([3, 4, 5, 6])]),
                ],
                vec![vec![ts_array([1])], vec![ts_array([3, 4, 5, 5, 6, 9])]],
            ),
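            // Three ranges where the wide middle range overlaps both of the others.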
            TestStream::new_simple(
                false,
                None,
                vec![
                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
                    (
                        partition_range(1, 10, 1, 1),
                        vec![ts_array([1, 3, 4, 5, 6, 8])],
                    ),
                    (partition_range(7, 10, 1, 1), vec![ts_array([7, 8, 9])]),
                ],
                vec![
                    vec![ts_array([1, 1, 2])],
                    vec![ts_array([3, 4, 5, 6])],
                    vec![ts_array([7, 8, 8, 9])],
                ],
            ),
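            // A small range overlapping the middle of a large one.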
            TestStream::new_simple(
                false,
                None,
                vec![
                    (
                        partition_range(1, 11, 1, 0),
                        vec![ts_array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])],
                    ),
                    (partition_range(5, 7, 1, 1), vec![ts_array([5, 6])]),
                ],
                vec![
                    vec![ts_array([1, 2, 3, 4])],
                    vec![ts_array([5, 5, 6, 6, 7, 8, 9, 10])],
                ],
            ),
        ];

        for (idx, testcase) in test_cases.iter().enumerate() {
            let output = testcase.run_test().await;
            assert_eq!(output, testcase.output, "complex case {idx} failed");
        }
    }

    #[tokio::test]
    async fn fuzzy_ish_test_window_sort_stream() {
        let test_cnt = 100;
        let part_cnt_bound = 100;
        let range_size_bound = 100;
        let range_offset_bound = 100;
        let in_range_datapoint_cnt_bound = 100;
        let fetch_bound = 100;

        let mut rng = fastrand::Rng::new();
        let rng_seed = rng.u64(..);
        rng.seed(rng_seed);
        let mut bound_val = None;
        type CmpFn<T> = Box<dyn FnMut(&T, &T) -> std::cmp::Ordering>;
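        // Build `test_cnt` randomized cases: each case draws adjacent `PartitionRange`s
        // with random in-range data, so the expected output is simply the flattened,
        // sorted (and optionally truncated) concatenation of all generated values.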
        let mut full_testcase_list = Vec::new();
        for _case_id in 0..test_cnt {
            let descending = rng.bool();
            fn ret_cmp_fn<T: Ord>(descending: bool) -> CmpFn<T> {
                if descending {
                    return Box::new(|a: &T, b: &T| b.cmp(a));
                }
                Box::new(|a: &T, b: &T| a.cmp(b))
            }
            let unit = match rng.u8(0..4) {
                0 => TimeUnit::Second,
                1 => TimeUnit::Millisecond,
                2 => TimeUnit::Microsecond,
                _ => TimeUnit::Nanosecond,
            };
            let fetch = if rng.bool() {
                Some(rng.usize(0..fetch_bound))
            } else {
                None
            };

            let mut input_ranged_data = vec![];
            let mut output_data: Vec<i64> = vec![];
            for part_id in 0..rng.usize(0..part_cnt_bound) {
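                // Generate the next range adjacent to the previous one, so the ranges
                // stay ordered the way `check_partition_range_monotonicity` expects.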
                let (start, end) = if descending {
                    let end = bound_val
                        .map(|i| i - rng.i64(1..=range_offset_bound))
                        .unwrap_or_else(|| rng.i64(..));
                    bound_val = Some(end);
                    let start = end - rng.i64(1..range_size_bound);
                    let start = Timestamp::new(start, unit.into());
                    let end = Timestamp::new(end, unit.into());
                    (start, end)
                } else {
                    let start = bound_val
                        .map(|i| i + rng.i64(1..=range_offset_bound))
                        .unwrap_or_else(|| rng.i64(..));
                    bound_val = Some(start);
                    let end = start + rng.i64(1..range_size_bound);
                    let start = Timestamp::new(start, unit.into());
                    let end = Timestamp::new(end, unit.into());
                    (start, end)
                };

                let iter = 0..rng.usize(0..in_range_datapoint_cnt_bound);
                let data_gen = iter
                    .map(|_| rng.i64(start.value()..end.value()))
                    .sorted_by(ret_cmp_fn(descending))
                    .collect_vec();
                output_data.extend(data_gen.clone());
                let arr = new_ts_array(unit, data_gen);
                let range = PartitionRange {
                    start,
                    end,
                    num_rows: arr.len(),
                    identifier: part_id,
                };
                input_ranged_data.push((range, vec![arr]));
            }

            output_data.sort_by(ret_cmp_fn(descending));
            if let Some(fetch) = fetch {
                output_data.truncate(fetch);
            }
            let output_arr = new_ts_array(unit, output_data);

            let test_stream = TestStream::new(
                helpers::default_sort_opts(descending),
                fetch,
                unit,
                input_ranged_data.clone(),
                vec![vec![output_arr]],
            );
            full_testcase_list.push(test_stream);
        }

        for (case_id, test_stream) in full_testcase_list.into_iter().enumerate() {
            let res = test_stream.run_test().await;
            let res_concat = concat_batches(&test_stream.schema, &res).unwrap();
            let expected = test_stream.output;
            let expected_concat = concat_batches(&test_stream.schema, &expected).unwrap();

            if res_concat != expected_concat {
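                // Dump the failing case's input to stderr as JSON so it can be replayed.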
                {
                    let mut f_input = std::io::stderr();
                    f_input.write_all(b"[").unwrap();
                    for (input_range, input_arr) in test_stream.input {
                        let range_json = json!({
                            "start": input_range.start.to_chrono_datetime().unwrap().to_string(),
                            "end": input_range.end.to_chrono_datetime().unwrap().to_string(),
                            "num_rows": input_range.num_rows,
                            "identifier": input_range.identifier,
                        });
                        let buf = Vec::new();
                        let mut input_writer = ArrayWriter::new(buf);
                        input_writer.write(&input_arr).unwrap();
                        input_writer.finish().unwrap();
                        let res_str =
                            String::from_utf8_lossy(&input_writer.into_inner()).to_string();
                        let whole_json =
                            format!(r#"{{"range": {}, "data": {}}},"#, range_json, res_str);
                        f_input.write_all(whole_json.as_bytes()).unwrap();
                    }
                    f_input.write_all(b"]").unwrap();
                }
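                // Also dump the actual and expected output batches.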
                {
                    let mut f_res = std::io::stderr();
                    f_res.write_all(b"[").unwrap();
                    for batch in &res {
                        let mut res_writer = ArrayWriter::new(f_res);
                        res_writer.write(batch).unwrap();
                        res_writer.finish().unwrap();
                        f_res = res_writer.into_inner();
                        f_res.write_all(b",").unwrap();
                    }
                    f_res.write_all(b"]").unwrap();

                    let f_res_concat = std::io::stderr();
                    let mut res_writer = ArrayWriter::new(f_res_concat);
                    res_writer.write(&res_concat).unwrap();
                    res_writer.finish().unwrap();

                    let f_expected = std::io::stderr();
                    let mut expected_writer = ArrayWriter::new(f_expected);
                    expected_writer.write(&expected_concat).unwrap();
                    expected_writer.finish().unwrap();
                }
                panic!(
                    "case failed, case id: {}, rng seed: {}, input/output dumped to stderr",
                    case_id, rng_seed
                );
            }
            assert_eq!(
                res_concat, expected_concat,
                "case failed, case id: {}, rng seed: {}",
                case_id, rng_seed
            );
        }
    }
}