1use std::any::Any;
19use std::collections::{BTreeMap, BTreeSet, VecDeque};
20use std::pin::Pin;
21use std::slice::from_ref;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use arrow::array::{Array, ArrayRef};
26use arrow::compute::SortColumn;
27use arrow_schema::{DataType, SchemaRef, SortOptions};
28use common_error::ext::{BoxedError, PlainError};
29use common_error::status_code::StatusCode;
30use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
31use common_telemetry::error;
32use common_time::Timestamp;
33use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
34use datafusion::execution::{RecordBatchStream, TaskContext};
35use datafusion::physical_plan::memory::MemoryStream;
36use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
37use datafusion::physical_plan::sorts::streaming_merge::StreamingMergeBuilder;
38use datafusion::physical_plan::{
39 DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
40};
41use datafusion_common::utils::bisect;
42use datafusion_common::{internal_err, DataFusionError};
43use datafusion_physical_expr::PhysicalSortExpr;
44use datatypes::value::Value;
45use futures::Stream;
46use itertools::Itertools;
47use snafu::ResultExt;
48use store_api::region_engine::PartitionRange;
49
50use crate::error::{QueryExecutionSnafu, Result};
51
/// Physical plan node that sorts its input on a single sort expression,
/// using the `PartitionRange`s of every input partition to decide which
/// time window ("working range") can be merged and emitted at a time.
#[derive(Debug, Clone)]
pub struct WindowedSortExec {
    /// The single sort expression the output is ordered by.
    expression: PhysicalSortExpr,
    /// Optional limit on the number of rows each output stream produces.
    fetch: Option<usize>,
    /// The `PartitionRange`s of the input, indexed by partition number.
    ranges: Vec<Vec<PartitionRange>>,
    /// Working ranges per partition, precomputed in `try_new` from `ranges`.
    /// Each entry pairs a time range with the indices of the partition ranges
    /// that contribute to it.
    all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
    /// The child plan whose output is sorted.
    input: Arc<dyn ExecutionPlan>,
    /// Metrics shared by all streams created from this node.
    metrics: ExecutionPlanMetricsSet,
    /// Plan properties derived from the input in `try_new`.
    properties: PlanProperties,
}
86
87fn check_partition_range_monotonicity(
88 ranges: &[Vec<PartitionRange>],
89 descending: bool,
90) -> Result<()> {
91 let is_valid = ranges.iter().all(|r| {
92 if descending {
93 r.windows(2).all(|w| w[0].end >= w[1].end)
94 } else {
95 r.windows(2).all(|w| w[0].start <= w[1].start)
96 }
97 });
98
99 if !is_valid {
100 let msg = if descending {
101 "Input `PartitionRange`s's upper bound is not monotonic non-increase"
102 } else {
103 "Input `PartitionRange`s's lower bound is not monotonic non-decrease"
104 };
105 let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
106 Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
107 } else {
108 Ok(())
109 }
110}
111
112impl WindowedSortExec {
113 pub fn try_new(
114 expression: PhysicalSortExpr,
115 fetch: Option<usize>,
116 ranges: Vec<Vec<PartitionRange>>,
117 input: Arc<dyn ExecutionPlan>,
118 ) -> Result<Self> {
119 check_partition_range_monotonicity(&ranges, expression.options.descending)?;
120
121 let mut eq_properties = input.equivalence_properties().clone();
122 eq_properties.reorder(vec![expression.clone()])?;
123
124 let properties = input.properties();
125 let properties = PlanProperties::new(
126 eq_properties,
127 input.output_partitioning().clone(),
128 properties.emission_type,
129 properties.boundedness,
130 );
131
132 let mut all_avail_working_range = Vec::with_capacity(ranges.len());
133 for r in &ranges {
134 let overlap_counts = split_overlapping_ranges(r);
135 let working_ranges =
136 compute_all_working_ranges(&overlap_counts, expression.options.descending);
137 all_avail_working_range.push(working_ranges);
138 }
139
140 Ok(Self {
141 expression,
142 fetch,
143 ranges,
144 all_avail_working_range,
145 input,
146 metrics: ExecutionPlanMetricsSet::new(),
147 properties,
148 })
149 }
150
151 pub fn to_stream(
155 &self,
156 context: Arc<TaskContext>,
157 partition: usize,
158 ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
159 let input_stream: DfSendableRecordBatchStream =
160 self.input.execute(partition, context.clone())?;
161
162 let df_stream = Box::pin(WindowedSortStream::new(
163 context,
164 self,
165 input_stream,
166 partition,
167 )) as _;
168
169 Ok(df_stream)
170 }
171}
172
173impl DisplayAs for WindowedSortExec {
174 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 write!(
176 f,
177 "WindowedSortExec: expr={} num_ranges={}",
178 self.expression,
179 self.ranges.len()
180 )?;
181 if let Some(fetch) = self.fetch {
182 write!(f, " fetch={}", fetch)?;
183 }
184 Ok(())
185 }
186}
187
188impl ExecutionPlan for WindowedSortExec {
189 fn as_any(&self) -> &dyn Any {
190 self
191 }
192
193 fn schema(&self) -> SchemaRef {
194 self.input.schema()
195 }
196
197 fn properties(&self) -> &PlanProperties {
198 &self.properties
199 }
200
201 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
202 vec![&self.input]
203 }
204
205 fn with_new_children(
206 self: Arc<Self>,
207 children: Vec<Arc<dyn ExecutionPlan>>,
208 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
209 let new_input = if let Some(first) = children.first() {
210 first
211 } else {
212 internal_err!("No children found")?
213 };
214 let new = Self::try_new(
215 self.expression.clone(),
216 self.fetch,
217 self.ranges.clone(),
218 new_input.clone(),
219 )?;
220 Ok(Arc::new(new))
221 }
222
223 fn execute(
224 &self,
225 partition: usize,
226 context: Arc<TaskContext>,
227 ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
228 self.to_stream(context, partition)
229 }
230
231 fn metrics(&self) -> Option<MetricsSet> {
232 Some(self.metrics.clone_inner())
233 }
234
235 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
241 vec![false]
242 }
243
244 fn name(&self) -> &str {
245 "WindowedSortExec"
246 }
247}
248
/// Stream that performs the windowed sort for one partition of a
/// `WindowedSortExec`.
pub struct WindowedSortStream {
    /// Memory pool used to register a consumer for every merge that is started.
    memory_pool: Arc<dyn MemoryPool>,
    /// Batches collected for the sorted run currently being built; turned into
    /// a stream by `build_sorted_stream`.
    in_progress: Vec<DfRecordBatch>,
    /// Last sort-key value of the most recently accepted run, used to decide
    /// whether the next run can be appended to `in_progress`.
    last_value: Option<Timestamp>,
    /// Finished sorted runs waiting to be merged by `start_new_merge_sort`.
    sorted_input_runs: Vec<DfSendableRecordBatchStream>,
    /// Result streams, drained front to back by `poll_result_stream`.
    merge_stream: VecDeque<DfSendableRecordBatchStream>,
    /// Number of merges started so far; used to name memory consumers.
    merge_count: usize,
    /// Index of the current working range in `all_avail_working_range`.
    working_idx: usize,
    /// The input stream being sorted.
    input: DfSendableRecordBatchStream,
    /// Set once the input is exhausted or the fetch limit has been reached.
    is_terminated: bool,
    /// Schema of the input, which is also the output schema.
    schema: SchemaRef,
    /// The sort expression.
    expression: PhysicalSortExpr,
    /// Optional limit on the number of rows to produce.
    fetch: Option<usize>,
    /// Number of rows produced so far.
    produced: usize,
    /// Batch size taken from the session config and passed to the merge.
    batch_size: usize,
    /// Working ranges of this partition, visited in order via `working_idx`.
    all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
    /// Partition ranges of this partition. Only read by the debug-only
    /// `check_subset_ranges`, hence the `dead_code` allowance.
    #[allow(dead_code)]
    ranges: Vec<PartitionRange>,
    /// Baseline metrics recorded on every poll.
    metrics: BaselineMetrics,
}
294
295impl WindowedSortStream {
296 pub fn new(
297 context: Arc<TaskContext>,
298 exec: &WindowedSortExec,
299 input: DfSendableRecordBatchStream,
300 partition: usize,
301 ) -> Self {
302 Self {
303 memory_pool: context.runtime_env().memory_pool.clone(),
304 in_progress: Vec::new(),
305 last_value: None,
306 sorted_input_runs: Vec::new(),
307 merge_stream: VecDeque::new(),
308 merge_count: 0,
309 working_idx: 0,
310 schema: input.schema(),
311 input,
312 is_terminated: false,
313 expression: exec.expression.clone(),
314 fetch: exec.fetch,
315 produced: 0,
316 batch_size: context.session_config().batch_size(),
317 all_avail_working_range: exec.all_avail_working_range[partition].clone(),
318 ranges: exec.ranges[partition].clone(),
319 metrics: BaselineMetrics::new(&exec.metrics, partition),
320 }
321 }
322}
323
324impl WindowedSortStream {
325 #[cfg(debug_assertions)]
326 fn check_subset_ranges(&self, cur_range: &TimeRange) {
327 let cur_is_subset_to = self
328 .ranges
329 .iter()
330 .filter(|r| cur_range.is_subset(&TimeRange::from(*r)))
331 .collect_vec();
332 if cur_is_subset_to.is_empty() {
333 error!("Current range is not a subset of any PartitionRange");
334 let subset_ranges = self
336 .ranges
337 .iter()
338 .filter(|r| TimeRange::from(*r).is_subset(cur_range))
339 .collect_vec();
340 let only_overlap = self
341 .ranges
342 .iter()
343 .filter(|r| {
344 let r = TimeRange::from(*r);
345 r.is_overlapping(cur_range) && !r.is_subset(cur_range)
346 })
347 .collect_vec();
348 error!(
349 "Bad input, found {} ranges that are subset of current range, also found {} ranges that only overlap, subset ranges are: {:?}; overlap ranges are: {:?}",
350 subset_ranges.len(),
351 only_overlap.len(),
352 subset_ranges,
353 only_overlap
354 );
355 } else {
356 let only_overlap = self
357 .ranges
358 .iter()
359 .filter(|r| {
360 let r = TimeRange::from(*r);
361 r.is_overlapping(cur_range) && !cur_range.is_subset(&r)
362 })
363 .collect_vec();
364 error!(
365 "Found current range to be subset of {} ranges, also found {} ranges that only overlap, of subset ranges are:{:?}; overlap ranges are: {:?}",
366 cur_is_subset_to.len(),
367 only_overlap.len(),
368 cur_is_subset_to,
369 only_overlap
370 );
371 }
372 let all_overlap_working_range = self
373 .all_avail_working_range
374 .iter()
375 .filter(|(range, _)| range.is_overlapping(cur_range))
376 .map(|(range, _)| range)
377 .collect_vec();
378 error!(
379 "Found {} working ranges that overlap with current range: {:?}",
380 all_overlap_working_range.len(),
381 all_overlap_working_range
382 );
383 }
384
385 fn poll_result_stream(
387 &mut self,
388 cx: &mut Context<'_>,
389 ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
390 while let Some(merge_stream) = &mut self.merge_stream.front_mut() {
391 match merge_stream.as_mut().poll_next(cx) {
392 Poll::Ready(Some(Ok(batch))) => {
393 let ret = if let Some(remaining) = self.remaining_fetch() {
394 if remaining == 0 {
395 self.is_terminated = true;
396 None
397 } else if remaining < batch.num_rows() {
398 self.produced += remaining;
399 Some(Ok(batch.slice(0, remaining)))
400 } else {
401 self.produced += batch.num_rows();
402 Some(Ok(batch))
403 }
404 } else {
405 self.produced += batch.num_rows();
406 Some(Ok(batch))
407 };
408 return Poll::Ready(ret);
409 }
410 Poll::Ready(Some(Err(e))) => {
411 return Poll::Ready(Some(Err(e)));
412 }
413 Poll::Ready(None) => {
414 self.merge_stream.pop_front();
417 continue;
418 }
419 Poll::Pending => {
420 return Poll::Pending;
421 }
422 }
423 }
424 Poll::Ready(None)
426 }
427
    /// Core polling logic of the stream.
    ///
    /// First emits whatever the already started merges can produce. Otherwise
    /// it pulls batches from the input, splits each batch into sorted runs and
    /// assigns every run (or the part of it inside the current working range)
    /// to the run being built. Whenever a run reaches past the current working
    /// range, the collected runs are handed to a new merge and the next
    /// working range becomes current.
    ///
    /// Returns an internal error when a run contains no non-null timestamp,
    /// when no working range is left, or when data shows up on the wrong side
    /// of the current working range.
    pub fn poll_next_inner(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
        // Drain ready output first; only fall through when the merge queue is
        // empty and the stream has not terminated yet.
        match self.poll_result_stream(cx) {
            Poll::Ready(None) => {
                if self.is_terminated {
                    return Poll::Ready(None);
                }
            }
            x => return x,
        };

        while !self.is_terminated {
            let SortedRunSet {
                runs_with_batch,
                sort_column,
            } = match self.input.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(batch))) => split_batch_to_sorted_run(batch, &self.expression)?,
                Poll::Ready(Some(Err(e))) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    // Input exhausted: flush what was collected and start the
                    // final merge.
                    self.is_terminated = true;
                    self.build_sorted_stream()?;
                    self.start_new_merge_sort()?;
                    break;
                }
                Poll::Pending => return Poll::Pending,
            };

            // Holds the part of a run that did not fit into the working range
            // it was checked against; it is re-examined against the next
            // working range before a new run is taken from `run_iter`.
            let mut last_remaining = None;
            let mut run_iter = runs_with_batch.into_iter();
            loop {
                let Some((sorted_rb, run_info)) = last_remaining.take().or(run_iter.next()) else {
                    break;
                };
                if sorted_rb.num_rows() == 0 {
                    continue;
                }
                // `get_time_range` yields `None` when the run has no first or
                // last non-null value.
                let Some(cur_range) = run_info.get_time_range() else {
                    internal_err!("Found NULL in time index column")?
                };
                let Some(working_range) = self.get_working_range() else {
                    internal_err!("No working range found")?
                };

                // Sanity check: data must not lie before the working range in
                // sort order (above its end when descending, below its start
                // when ascending).
                if sort_column.options.unwrap_or_default().descending {
                    if cur_range.end > working_range.end {
                        error!("Invalid range: {:?} > {:?}", cur_range, working_range);
                        #[cfg(debug_assertions)]
                        self.check_subset_ranges(&cur_range);
                        internal_err!("Current batch have data on the right side of working range, something is very wrong")?;
                    }
                } else if cur_range.start < working_range.start {
                    error!("Invalid range: {:?} < {:?}", cur_range, working_range);
                    #[cfg(debug_assertions)]
                    self.check_subset_ranges(&cur_range);
                    internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
                }

                if cur_range.is_subset(&working_range) {
                    // The whole run belongs to the current working range.
                    self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?;
                } else if let Some(intersection) = cur_range.intersection(&working_range) {
                    // Only a prefix of the run belongs to the current working
                    // range: take that prefix, close the working range, and
                    // keep the rest for the next one.
                    let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len);
                    let (offset, len) = find_slice_from_range(
                        &SortColumn {
                            values: cur_sort_column.clone(),
                            options: sort_column.options,
                        },
                        &intersection,
                    )?;

                    // The part inside the working range must start at the
                    // beginning of the run.
                    if offset != 0 {
                        internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
                    }

                    let sliced_rb = sorted_rb.slice(offset, len);

                    self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?;
                    self.build_sorted_stream()?;

                    self.start_new_merge_sort()?;

                    let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len);
                    if r_len != 0 {
                        let remaining_rb = sorted_rb.slice(r_offset, r_len);
                        let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?;
                        let new_run_info = SucRun {
                            offset: run_info.offset + r_offset,
                            len: r_len,
                            first_val: new_first_val,
                            last_val: run_info.last_val,
                        };
                        last_remaining = Some((remaining_rb, new_run_info));
                    }
                } else {
                    // The run lies entirely outside the current working
                    // range: close it and retry the run against the next one.
                    self.build_sorted_stream()?;
                    self.start_new_merge_sort()?;

                    last_remaining = Some((sorted_rb, run_info));
                }
            }

            // A merge may have been started above; emit its output if ready.
            match self.poll_result_stream(cx) {
                Poll::Ready(None) => {
                    if self.is_terminated {
                        return Poll::Ready(None);
                    }
                }
                x => return x,
            };
        }
        self.poll_result_stream(cx)
    }
574
    /// Appends `batch` to the sorted run currently being collected.
    fn push_batch(&mut self, batch: DfRecordBatch) {
        self.in_progress.push(batch);
    }
578
579 fn try_concat_batch(
583 &mut self,
584 batch: DfRecordBatch,
585 run_info: &SucRun<Timestamp>,
586 opt: Option<SortOptions>,
587 ) -> datafusion_common::Result<()> {
588 let is_ok_to_concat =
589 cmp_with_opts(&self.last_value, &run_info.first_val, &opt) <= std::cmp::Ordering::Equal;
590
591 if is_ok_to_concat {
592 self.push_batch(batch);
593 } else {
595 self.build_sorted_stream()?;
597 self.push_batch(batch);
598 }
599 self.last_value = run_info.last_val;
600 Ok(())
601 }
602
603 fn get_working_range(&self) -> Option<TimeRange> {
605 self.all_avail_working_range
606 .get(self.working_idx)
607 .map(|(range, _)| *range)
608 }
609
    /// Advances to the next working range.
    fn set_next_working_range(&mut self) {
        self.working_idx += 1;
    }
614
615 fn build_sorted_stream(&mut self) -> datafusion_common::Result<()> {
617 if self.in_progress.is_empty() {
618 return Ok(());
619 }
620 let data = std::mem::take(&mut self.in_progress);
621
622 let new_stream = MemoryStream::try_new(data, self.schema(), None)?;
623 self.sorted_input_runs.push(Box::pin(new_stream));
624 Ok(())
625 }
626
627 fn start_new_merge_sort(&mut self) -> datafusion_common::Result<()> {
629 if !self.in_progress.is_empty() {
630 return internal_err!("Starting a merge sort when in_progress is not empty")?;
631 }
632
633 self.set_next_working_range();
634
635 let streams = std::mem::take(&mut self.sorted_input_runs);
636 if streams.is_empty() {
637 return Ok(());
638 } else if streams.len() == 1 {
639 self.merge_stream
640 .push_back(streams.into_iter().next().unwrap());
641 return Ok(());
642 }
643
644 let fetch = self.remaining_fetch();
645 let reservation = MemoryConsumer::new(format!("WindowedSortStream[{}]", self.merge_count))
646 .register(&self.memory_pool);
647 self.merge_count += 1;
648
649 let resulting_stream = StreamingMergeBuilder::new()
650 .with_streams(streams)
651 .with_schema(self.schema())
652 .with_expressions(&[self.expression.clone()].into())
653 .with_metrics(self.metrics.clone())
654 .with_batch_size(self.batch_size)
655 .with_fetch(fetch)
656 .with_reservation(reservation)
657 .build()?;
658 self.merge_stream.push_back(resulting_stream);
659 Ok(())
661 }
662
663 fn remaining_fetch(&self) -> Option<usize> {
666 let total_now = self.produced;
667 self.fetch.map(|p| p.saturating_sub(total_now))
668 }
669}
670
impl Stream for WindowedSortStream {
    type Item = datafusion_common::Result<DfRecordBatch>;

    /// Delegates to `poll_next_inner` and records the poll result in the
    /// baseline metrics.
    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
        // The result is computed first so the mutable borrow of `self` ends
        // before `metrics` is accessed.
        let result = self.as_mut().poll_next_inner(cx);
        self.metrics.record_poll(result)
    }
}
682
683impl RecordBatchStream for WindowedSortStream {
684 fn schema(&self) -> SchemaRef {
685 self.schema.clone()
686 }
687}
688
689fn split_batch_to_sorted_run(
691 batch: DfRecordBatch,
692 expression: &PhysicalSortExpr,
693) -> datafusion_common::Result<SortedRunSet<Timestamp>> {
694 let sort_column = expression.evaluate_to_sort_column(&batch)?;
696 let sorted_runs_offset = get_sorted_runs(sort_column.clone())?;
697 if let Some(run) = sorted_runs_offset.first()
698 && sorted_runs_offset.len() == 1
699 {
700 if !(run.offset == 0 && run.len == batch.num_rows()) {
701 internal_err!(
702 "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
703 run.offset,
704 run.len,
705 batch.num_rows()
706 )?;
707 }
708 Ok(SortedRunSet {
710 runs_with_batch: vec![(batch, run.clone())],
711 sort_column,
712 })
713 } else {
714 let mut ret = Vec::with_capacity(sorted_runs_offset.len());
716 for run in sorted_runs_offset {
717 if run.offset + run.len > batch.num_rows() {
718 internal_err!(
719 "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
720 run.offset,
721 run.len,
722 batch.num_rows()
723 )?;
724 }
725 let new_rb = batch.slice(run.offset, run.len);
726 ret.push((new_rb, run));
727 }
728 Ok(SortedRunSet {
729 runs_with_batch: ret,
730 sort_column,
731 })
732 }
733}
734
/// Dispatches on an arrow timestamp `DataType`.
///
/// For each of the four timestamp units the macro `$m` is invoked with the
/// matching arrow timestamp primitive type, the matching `TimeUnit`, and any
/// extra `$args`. Every other data type is handled by the caller-supplied
/// `pattern => fallback` arms.
#[macro_export]
macro_rules! downcast_ts_array {
    ($data_type:expr => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) =>
    {
        match $data_type {
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
                $m!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
                $m!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
                $m!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
                $m!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $args)*)
            }
            // Caller-provided arms for all remaining data types.
            $($p => $fallback,)*
        }
    };
}
759
760fn find_slice_from_range(
764 sort_column: &SortColumn,
765 range: &TimeRange,
766) -> datafusion_common::Result<(usize, usize)> {
767 let ty = sort_column.values.data_type();
768 let time_unit = if let DataType::Timestamp(unit, _) = ty {
769 unit
770 } else {
771 return Err(DataFusionError::Internal(format!(
772 "Unsupported sort column type: {}",
773 sort_column.values.data_type()
774 )));
775 };
776 let array = &sort_column.values;
777 let opt = &sort_column.options.unwrap_or_default();
778 let descending = opt.descending;
779
780 let typed_sorted_range = [range.start, range.end]
781 .iter()
782 .map(|t| {
783 t.convert_to(time_unit.into())
784 .ok_or_else(|| {
785 DataFusionError::Internal(format!(
786 "Failed to convert timestamp from {:?} to {:?}",
787 t.unit(),
788 time_unit
789 ))
790 })
791 .and_then(|typed_ts| {
792 let value = Value::Timestamp(typed_ts);
793 value
794 .try_to_scalar_value(&value.data_type())
795 .map_err(|e| DataFusionError::External(Box::new(e) as _))
796 })
797 })
798 .try_collect::<_, Vec<_>, _>()?;
799
800 let (min_val, max_val) = (typed_sorted_range[0].clone(), typed_sorted_range[1].clone());
801
802 let (start, end) = if descending {
804 let start = bisect::<false>(from_ref(array), from_ref(&max_val), &[*opt])?;
808 let end = bisect::<false>(from_ref(array), from_ref(&min_val), &[*opt])?;
811 (start, end)
812 } else {
813 let start = bisect::<true>(from_ref(array), from_ref(&min_val), &[*opt])?;
816 let end = bisect::<true>(from_ref(array), from_ref(&max_val), &[*opt])?;
819 (start, end)
820 };
821
822 Ok((start, end - start))
823}
824
/// Helper for `downcast_ts_array!`: downcasts `$arr` to a
/// `PrimitiveArray<$t>` and returns a boxed iterator over
/// `(index, Option<i64>)` pairs. `$unit` is accepted but not used.
///
/// Panics when `$arr` is not a `PrimitiveArray<$t>`.
#[macro_export]
macro_rules! array_iter_helper {
    ($t:ty, $unit:expr, $arr:expr) => {{
        let typed = $arr
            .as_any()
            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
            .unwrap();
        let iter = typed.iter().enumerate();
        Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
    }};
}
839
840fn cmp_with_opts<T: Ord>(
844 a: &Option<T>,
845 b: &Option<T>,
846 opt: &Option<SortOptions>,
847) -> std::cmp::Ordering {
848 let opt = opt.unwrap_or_default();
849
850 if let (Some(a), Some(b)) = (a, b) {
851 if opt.descending {
852 b.cmp(a)
853 } else {
854 a.cmp(b)
855 }
856 } else if opt.nulls_first {
857 a.cmp(b)
860 } else {
861 match (a, b) {
862 (Some(a), Some(b)) => a.cmp(b),
863 (Some(_), None) => std::cmp::Ordering::Less,
864 (None, Some(_)) => std::cmp::Ordering::Greater,
865 (None, None) => std::cmp::Ordering::Equal,
866 }
867 }
868}
869
/// The sorted runs found in one input batch.
#[derive(Debug, Clone)]
struct SortedRunSet<N: Ord> {
    /// Each run paired with the slice of the batch it covers.
    runs_with_batch: Vec<(DfRecordBatch, SucRun<N>)>,
    /// The sort column evaluated on the whole (unsliced) batch.
    sort_column: SortColumn,
}
877
/// A run of successive rows that are already in sort order.
#[derive(Debug, Clone, PartialEq)]
struct SucRun<N: Ord> {
    /// Index of the first row of the run.
    offset: usize,
    /// Number of rows in the run.
    len: usize,
    /// First non-null value of the run, `None` when the run has none.
    first_val: Option<N>,
    /// Last non-null value of the run, `None` when the run has none.
    last_val: Option<N>,
}
890
891impl SucRun<Timestamp> {
892 fn get_time_range(&self) -> Option<TimeRange> {
894 let start = self.first_val.min(self.last_val);
895 let end = self
896 .first_val
897 .max(self.last_val)
898 .map(|i| Timestamp::new(i.value() + 1, i.unit()));
899 start.zip(end).map(|(s, e)| TimeRange::new(s, e))
900 }
901}
902
903fn find_successive_runs<T: Iterator<Item = (usize, Option<N>)>, N: Ord + Copy>(
905 iter: T,
906 sort_opts: &Option<SortOptions>,
907) -> Vec<SucRun<N>> {
908 let mut runs = Vec::new();
909 let mut last_value = None;
910 let mut iter_len = None;
911
912 let mut last_offset = 0;
913 let mut first_val: Option<N> = None;
914 let mut last_val: Option<N> = None;
915
916 for (idx, t) in iter {
917 if let Some(last_value) = &last_value {
918 if cmp_with_opts(last_value, &t, sort_opts) == std::cmp::Ordering::Greater {
919 let len = idx - last_offset;
921 let run = SucRun {
922 offset: last_offset,
923 len,
924 first_val,
925 last_val,
926 };
927 runs.push(run);
928 first_val = None;
929 last_val = None;
930
931 last_offset = idx;
932 }
933 }
934 last_value = Some(t);
935 if let Some(t) = t {
936 first_val = first_val.or(Some(t));
937 last_val = Some(t).or(last_val);
938 }
939 iter_len = Some(idx);
940 }
941 let run = SucRun {
942 offset: last_offset,
943 len: iter_len.map(|l| l - last_offset + 1).unwrap_or(0),
944 first_val,
945 last_val,
946 };
947 runs.push(run);
948
949 runs
950}
951
952fn get_sorted_runs(sort_column: SortColumn) -> datafusion_common::Result<Vec<SucRun<Timestamp>>> {
956 let ty = sort_column.values.data_type();
957 if let DataType::Timestamp(unit, _) = ty {
958 let array = &sort_column.values;
959 let iter = downcast_ts_array!(
960 array.data_type() => (array_iter_helper, array),
961 _ => internal_err!("Unsupported sort column type: {ty}")?
962 );
963
964 let raw = find_successive_runs(iter, &sort_column.options);
965 let ts_runs = raw
966 .into_iter()
967 .map(|run| SucRun {
968 offset: run.offset,
969 len: run.len,
970 first_val: run.first_val.map(|v| Timestamp::new(v, unit.into())),
971 last_val: run.last_val.map(|v| Timestamp::new(v, unit.into())),
972 })
973 .collect_vec();
974 Ok(ts_runs)
975 } else {
976 Err(DataFusionError::Internal(format!(
977 "Unsupported sort column type: {ty}"
978 )))
979 }
980}
981
/// A time range with `start <= end` (enforced by `TimeRange::new`).
///
/// The overlap check treats ranges that merely touch as non-overlapping,
/// i.e. `end` behaves as an exclusive bound.
#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TimeRange {
    /// Lower bound of the range.
    start: Timestamp,
    /// Upper bound of the range.
    end: Timestamp,
}
990
991impl From<&PartitionRange> for TimeRange {
992 fn from(range: &PartitionRange) -> Self {
993 Self::new(range.start, range.end)
994 }
995}
996
997impl From<(Timestamp, Timestamp)> for TimeRange {
998 fn from(range: (Timestamp, Timestamp)) -> Self {
999 Self::new(range.0, range.1)
1000 }
1001}
1002
1003impl From<&(Timestamp, Timestamp)> for TimeRange {
1004 fn from(range: &(Timestamp, Timestamp)) -> Self {
1005 Self::new(range.0, range.1)
1006 }
1007}
1008
1009impl TimeRange {
1010 fn new(start: Timestamp, end: Timestamp) -> Self {
1012 if start > end {
1013 Self {
1014 start: end,
1015 end: start,
1016 }
1017 } else {
1018 Self { start, end }
1019 }
1020 }
1021
1022 fn is_subset(&self, other: &Self) -> bool {
1023 self.start >= other.start && self.end <= other.end
1024 }
1025
1026 fn is_overlapping(&self, other: &Self) -> bool {
1028 !(self.start >= other.end || self.end <= other.start)
1029 }
1030
1031 fn intersection(&self, other: &Self) -> Option<Self> {
1032 if self.is_overlapping(other) {
1033 Some(Self::new(
1034 self.start.max(other.start),
1035 self.end.min(other.end),
1036 ))
1037 } else {
1038 None
1039 }
1040 }
1041
1042 fn difference(&self, other: &Self) -> Vec<Self> {
1043 if !self.is_overlapping(other) {
1044 vec![*self]
1045 } else {
1046 let mut ret = Vec::new();
1047 if self.start < other.start && self.end > other.end {
1048 ret.push(Self::new(self.start, other.start));
1049 ret.push(Self::new(other.end, self.end));
1050 } else if self.start < other.start {
1051 ret.push(Self::new(self.start, other.start));
1052 } else if self.end > other.end {
1053 ret.push(Self::new(other.end, self.end));
1054 }
1055 ret
1056 }
1057 }
1058}
1059
1060fn split_range_by(
1062 input_range: &TimeRange,
1063 input_parts: &[usize],
1064 split_by: &TimeRange,
1065 split_idx: usize,
1066) -> Vec<Action> {
1067 let mut ret = Vec::new();
1068 if input_range.is_overlapping(split_by) {
1069 let input_parts = input_parts.to_vec();
1070 let new_parts = {
1071 let mut new_parts = input_parts.clone();
1072 new_parts.push(split_idx);
1073 new_parts
1074 };
1075
1076 ret.push(Action::Pop(*input_range));
1077 if let Some(intersection) = input_range.intersection(split_by) {
1078 ret.push(Action::Push(intersection, new_parts.clone()));
1079 }
1080 for diff in input_range.difference(split_by) {
1081 ret.push(Action::Push(diff, input_parts.clone()));
1082 }
1083 }
1084 ret
1085}
1086
/// An update to apply to the map of split ranges built by
/// `split_overlapping_ranges`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Action {
    /// Remove the entry for this range.
    Pop(TimeRange),
    /// Insert this range together with the indices of the partition ranges
    /// covering it.
    Push(TimeRange, Vec<usize>),
}
1092
1093fn compute_all_working_ranges(
1101 overlap_counts: &BTreeMap<TimeRange, Vec<usize>>,
1102 descending: bool,
1103) -> Vec<(TimeRange, BTreeSet<usize>)> {
1104 let mut ret = Vec::new();
1105 let mut cur_range_set: Option<(TimeRange, BTreeSet<usize>)> = None;
1106 let overlap_iter: Box<dyn Iterator<Item = (&TimeRange, &Vec<usize>)>> = if descending {
1107 Box::new(overlap_counts.iter().rev()) as _
1108 } else {
1109 Box::new(overlap_counts.iter()) as _
1110 };
1111 for (range, set) in overlap_iter {
1112 match &mut cur_range_set {
1113 None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))),
1114 Some((working_range, working_set)) => {
1115 let need_expand = {
1120 let last_part = working_set.last();
1121 let inter: BTreeSet<usize> = working_set
1122 .intersection(&BTreeSet::from_iter(set.iter().cloned()))
1123 .cloned()
1124 .collect();
1125 if let Some(one) = inter.first()
1126 && inter.len() == 1
1127 && Some(one) == last_part
1128 {
1129 if set.iter().all(|p| Some(p) >= last_part) {
1131 false
1133 } else {
1134 true
1136 }
1137 } else if inter.is_empty() {
1138 false
1140 } else {
1141 true
1143 }
1144 };
1145
1146 if need_expand {
1147 if descending {
1148 working_range.start = range.start;
1149 } else {
1150 working_range.end = range.end;
1151 }
1152 working_set.extend(set.iter().cloned());
1153 } else {
1154 ret.push((*working_range, std::mem::take(working_set)));
1155 cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned())));
1156 }
1157 }
1158 }
1159 }
1160
1161 if let Some(cur_range_set) = cur_range_set {
1162 ret.push(cur_range_set)
1163 }
1164
1165 ret
1166}
1167
1168fn split_overlapping_ranges(ranges: &[PartitionRange]) -> BTreeMap<TimeRange, Vec<usize>> {
1171 let mut ret: BTreeMap<TimeRange, Vec<usize>> = BTreeMap::new();
1173 for (idx, range) in ranges.iter().enumerate() {
1174 let key: TimeRange = (range.start, range.end).into();
1175 let mut actions = Vec::new();
1176 let mut untouched = vec![key];
1177 let forward_iter = ret
1181 .range(key..)
1182 .take_while(|(range, _)| range.is_overlapping(&key));
1183 let backward_iter = ret
1184 .range(..key)
1185 .rev()
1186 .take_while(|(range, _)| range.is_overlapping(&key));
1187
1188 for (range, parts) in forward_iter.chain(backward_iter) {
1189 untouched = untouched.iter().flat_map(|r| r.difference(range)).collect();
1190 let act = split_range_by(range, parts, &key, idx);
1191 actions.extend(act.into_iter());
1192 }
1193
1194 for action in actions {
1195 match action {
1196 Action::Pop(range) => {
1197 ret.remove(&range);
1198 }
1199 Action::Push(range, parts) => {
1200 ret.insert(range, parts);
1201 }
1202 }
1203 }
1204
1205 for range in untouched {
1207 ret.insert(range, vec![idx]);
1208 }
1209 }
1210 ret
1211}
1212
1213fn get_timestamp_from_idx(
1215 array: &ArrayRef,
1216 offset: usize,
1217) -> datafusion_common::Result<Option<Timestamp>> {
1218 let time_unit = if let DataType::Timestamp(unit, _) = array.data_type() {
1219 unit
1220 } else {
1221 return Err(DataFusionError::Internal(format!(
1222 "Unsupported sort column type: {}",
1223 array.data_type()
1224 )));
1225 };
1226 let ty = array.data_type();
1227 let array = array.slice(offset, 1);
1228 let mut iter = downcast_ts_array!(
1229 array.data_type() => (array_iter_helper, array),
1230 _ => internal_err!("Unsupported sort column type: {ty}")?
1231 );
1232 let (_idx, val) = iter.next().ok_or_else(|| {
1233 DataFusionError::Internal("Empty array in get_timestamp_from".to_string())
1234 })?;
1235 let val = if let Some(val) = val {
1236 val
1237 } else {
1238 return Ok(None);
1239 };
1240 let gt_timestamp = Timestamp::new(val, time_unit.into());
1241 Ok(Some(gt_timestamp))
1242}
1243
1244#[cfg(test)]
1245mod test {
1246 use std::io::Write;
1247 use std::sync::Arc;
1248
1249 use arrow::array::{ArrayRef, TimestampMillisecondArray};
1250 use arrow::compute::concat_batches;
1251 use arrow::json::ArrayWriter;
1252 use arrow_schema::{Field, Schema, TimeUnit};
1253 use futures::StreamExt;
1254 use pretty_assertions::assert_eq;
1255 use serde_json::json;
1256
1257 use super::*;
1258 use crate::test_util::{new_ts_array, MockInputExec};
1259
1260 #[test]
1261 fn test_overlapping() {
1262 let testcases = [
1263 (
1264 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1265 (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1266 false,
1267 ),
1268 (
1269 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1270 (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1271 true,
1272 ),
1273 (
1274 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1275 (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1276 true,
1277 ),
1278 (
1279 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1280 (
1281 Timestamp::new_millisecond(1000),
1282 Timestamp::new_millisecond(1002),
1283 ),
1284 true,
1285 ),
1286 (
1287 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1288 (
1289 Timestamp::new_millisecond(1001),
1290 Timestamp::new_millisecond(1002),
1291 ),
1292 false,
1293 ),
1294 (
1295 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1296 (
1297 Timestamp::new_millisecond(1002),
1298 Timestamp::new_millisecond(1003),
1299 ),
1300 false,
1301 ),
1302 ];
1303
1304 for (range1, range2, expected) in testcases.iter() {
1305 assert_eq!(
1306 TimeRange::from(range1).is_overlapping(&range2.into()),
1307 *expected,
1308 "range1: {:?}, range2: {:?}",
1309 range1,
1310 range2
1311 );
1312 }
1313 }
1314
/// Table-driven check of `split_range_by`.
///
/// Each case is `(range, parts, split_by, split_idx, expected actions)`; the
/// expected value is the exact list of `Action`s returned for that input.
#[test]
fn test_split() {
    let sec = Timestamp::new_second;
    let ms = Timestamp::new_millisecond;
    // Shorthand constructors for the expected actions.
    let pop = |start: Timestamp, end: Timestamp| Action::Pop((start, end).into());
    let push = |start: Timestamp, end: Timestamp, parts| Action::Push((start, end).into(), parts);

    let testcases = [
        // `split_by` ends long before `range` starts: no actions expected.
        ((sec(1), ms(1001)), vec![0], (sec(0), ms(1)), 1, vec![]),
        // `split_by` covers `range` and ends exactly at its end.
        (
            (sec(1), ms(1001)),
            vec![0],
            (sec(0), ms(1001)),
            1,
            vec![
                pop(sec(1), ms(1001)),
                push(sec(1), ms(1001), vec![0, 1]),
            ],
        ),
        // `split_by` covers `range` and extends past its end.
        (
            (sec(1), ms(1001)),
            vec![0],
            (sec(0), ms(1002)),
            1,
            vec![
                pop(sec(1), ms(1001)),
                push(sec(1), ms(1001), vec![0, 1]),
            ],
        ),
        // `split_by` starts with `range` and extends past its end.
        (
            (sec(1), ms(1001)),
            vec![0],
            (ms(1000), ms(1002)),
            1,
            vec![
                pop(sec(1), ms(1001)),
                push(ms(1000), ms(1001), vec![0, 1]),
            ],
        ),
        // `split_by` covers only the tail of `range`.
        (
            (sec(1), ms(1002)),
            vec![0],
            (ms(1001), ms(1002)),
            1,
            vec![
                pop(sec(1), ms(1002)),
                push(ms(1001), ms(1002), vec![0, 1]),
                push(sec(1), ms(1001), vec![0]),
            ],
        ),
        // `split_by` sits strictly inside `range`.
        (
            (sec(1), ms(1004)),
            vec![0],
            (ms(1001), ms(1002)),
            1,
            vec![
                pop(sec(1), ms(1004)),
                push(ms(1001), ms(1002), vec![0, 1]),
                push(sec(1), ms(1001), vec![0]),
                push(ms(1002), ms(1004), vec![0]),
            ],
        ),
    ];

    for (range, parts, split_by, split_idx, expected) in &testcases {
        let actions = split_range_by(&(*range).into(), parts, &split_by.into(), *split_idx);
        assert_eq!(
            actions, *expected,
            "range: {:?}, parts: {:?}, split_by: {:?}, split_idx: {}",
            range, parts, split_by, split_idx
        );
    }
}
1455
/// Table-driven check of `compute_all_working_ranges` with its boolean flag
/// set to `true` (the "rev" variant).
///
/// Each case pairs a map of `range -> partition indices` with the exact list
/// of `(range, partition set)` entries expected back, in order.
#[test]
fn test_compute_working_ranges_rev() {
    /// Builds a `(start, end)` pair of second-unit timestamps.
    fn secs(start: i64, end: i64) -> (Timestamp, Timestamp) {
        (Timestamp::new_second(start), Timestamp::new_second(end))
    }

    let testcases = vec![
        // A single range with one partition.
        (
            BTreeMap::from([(secs(1, 2), vec![0])]),
            vec![(secs(1, 2), BTreeSet::from([0]))],
        ),
        // A single range shared by two partitions.
        (
            BTreeMap::from([(secs(1, 2), vec![0, 1])]),
            vec![(secs(1, 2), BTreeSet::from([0, 1]))],
        ),
        (
            BTreeMap::from([(secs(2, 3), vec![0]), (secs(1, 2), vec![0, 1])]),
            vec![
                (secs(2, 3), BTreeSet::from([0])),
                (secs(1, 2), BTreeSet::from([0, 1])),
            ],
        ),
        (
            BTreeMap::from([(secs(2, 3), vec![0, 1]), (secs(1, 2), vec![1])]),
            vec![
                (secs(2, 3), BTreeSet::from([0, 1])),
                (secs(1, 2), BTreeSet::from([1])),
            ],
        ),
        (
            BTreeMap::from([
                (secs(3, 4), vec![0]),
                (secs(2, 3), vec![0, 1]),
                (secs(1, 2), vec![1]),
            ]),
            vec![
                (secs(3, 4), BTreeSet::from([0])),
                (secs(2, 3), BTreeSet::from([0, 1])),
                (secs(1, 2), BTreeSet::from([1])),
            ],
        ),
        // Expected to collapse into one working range covering everything.
        (
            BTreeMap::from([
                (secs(3, 4), vec![0, 2]),
                (secs(2, 3), vec![0, 1, 2]),
                (secs(1, 2), vec![1, 2]),
            ]),
            vec![(secs(1, 4), BTreeSet::from([0, 1, 2]))],
        ),
        (
            BTreeMap::from([(secs(2, 3), vec![0, 2]), (secs(1, 2), vec![1, 2])]),
            vec![(secs(1, 3), BTreeSet::from([0, 1, 2]))],
        ),
        (
            BTreeMap::from([
                (secs(3, 4), vec![0, 1]),
                (secs(2, 3), vec![0, 1, 2]),
                (secs(1, 2), vec![1, 2]),
            ]),
            vec![(secs(1, 4), BTreeSet::from([0, 1, 2]))],
        ),
        // Expected to stay as two separate working ranges.
        (
            BTreeMap::from([(secs(2, 3), vec![0, 1]), (secs(1, 2), vec![1, 2])]),
            vec![
                (secs(2, 3), BTreeSet::from([0, 1])),
                (secs(1, 2), BTreeSet::from([1, 2])),
            ],
        ),
        // The two ranges share no partition.
        (
            BTreeMap::from([(secs(2, 3), vec![0]), (secs(1, 2), vec![1, 2])]),
            vec![
                (secs(2, 3), BTreeSet::from([0])),
                (secs(1, 2), BTreeSet::from([1, 2])),
            ],
        ),
    ];

    for (input, expected) in testcases {
        // Convert the timestamp pairs into the range type used by the function.
        let expected: Vec<_> = expected
            .into_iter()
            .map(|(range, parts)| (range.into(), parts))
            .collect();
        let input = input
            .into_iter()
            .map(|(range, parts)| (range.into(), parts))
            .collect();
        let actual = compute_all_working_ranges(&input, true);
        assert_eq!(actual, expected, "input: {:?}", input);
    }
}
1670
/// Table-driven check of `compute_all_working_ranges` with its boolean flag
/// set to `false`.
///
/// Each case pairs a map of `range -> partition indices` with the exact list
/// of `(range, partition set)` entries expected back, in order.
#[test]
fn test_compute_working_ranges() {
    /// Builds a `(start, end)` pair of second-unit timestamps.
    fn span(start: i64, end: i64) -> (Timestamp, Timestamp) {
        (Timestamp::new_second(start), Timestamp::new_second(end))
    }

    let testcases = vec![
        // A single range with one partition.
        (
            BTreeMap::from([(span(1, 2), vec![0])]),
            vec![(span(1, 2), BTreeSet::from([0]))],
        ),
        // A single range shared by two partitions.
        (
            BTreeMap::from([(span(1, 2), vec![0, 1])]),
            vec![(span(1, 2), BTreeSet::from([0, 1]))],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0, 1]), (span(2, 3), vec![1])]),
            vec![
                (span(1, 2), BTreeSet::from([0, 1])),
                (span(2, 3), BTreeSet::from([1])),
            ],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0]), (span(2, 3), vec![0, 1])]),
            vec![
                (span(1, 2), BTreeSet::from([0])),
                (span(2, 3), BTreeSet::from([0, 1])),
            ],
        ),
        (
            BTreeMap::from([
                (span(1, 2), vec![0]),
                (span(2, 3), vec![0, 1]),
                (span(3, 4), vec![1]),
            ]),
            vec![
                (span(1, 2), BTreeSet::from([0])),
                (span(2, 3), BTreeSet::from([0, 1])),
                (span(3, 4), BTreeSet::from([1])),
            ],
        ),
        // Expected to collapse into one working range covering everything.
        (
            BTreeMap::from([
                (span(1, 2), vec![0, 2]),
                (span(2, 3), vec![0, 1, 2]),
                (span(3, 4), vec![1, 2]),
            ]),
            vec![(span(1, 4), BTreeSet::from([0, 1, 2]))],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0, 2]), (span(2, 3), vec![1, 2])]),
            vec![(span(1, 3), BTreeSet::from([0, 1, 2]))],
        ),
        (
            BTreeMap::from([
                (span(1, 2), vec![0, 1]),
                (span(2, 3), vec![0, 1, 2]),
                (span(3, 4), vec![1, 2]),
            ]),
            vec![(span(1, 4), BTreeSet::from([0, 1, 2]))],
        ),
        // Expected to stay as two separate working ranges.
        (
            BTreeMap::from([(span(1, 2), vec![0, 1]), (span(2, 3), vec![1, 2])]),
            vec![
                (span(1, 2), BTreeSet::from([0, 1])),
                (span(2, 3), BTreeSet::from([1, 2])),
            ],
        ),
        // The two ranges share no partition.
        (
            BTreeMap::from([(span(1, 2), vec![0, 1]), (span(2, 3), vec![2])]),
            vec![
                (span(1, 2), BTreeSet::from([0, 1])),
                (span(2, 3), BTreeSet::from([2])),
            ],
        ),
    ];

    for (input, expected) in testcases {
        // Convert the timestamp pairs into the range type used by the function.
        let expected: Vec<_> = expected
            .into_iter()
            .map(|(range, parts)| (range.into(), parts))
            .collect();
        let input = input
            .into_iter()
            .map(|(range, parts)| (range.into(), parts))
            .collect();
        let actual = compute_all_working_ranges(&input, false);
        assert_eq!(actual, expected, "input: {:?}", input);
    }
}
1886
/// Table-driven check of `split_overlapping_ranges`.
///
/// Each case pairs a list of `PartitionRange`s with the expected map from
/// range to the indices (positions in the input list) of the partitions
/// listed for that range.
#[test]
fn test_split_overlap_range() {
    // All fixtures use second-unit timestamps and `num_rows: 2`.
    let part = |start, end, identifier| PartitionRange {
        start: Timestamp::new_second(start),
        end: Timestamp::new_second(end),
        num_rows: 2,
        identifier,
    };
    let secs = |start, end| (Timestamp::new_second(start), Timestamp::new_second(end));

    let testcases = vec![
        // A single partition range.
        (
            vec![part(1, 2, 0)],
            BTreeMap::from([(secs(1, 2), vec![0])]),
        ),
        // Two partition ranges with identical bounds.
        (
            vec![part(1, 2, 0), part(1, 2, 1)],
            BTreeMap::from([(secs(1, 2), vec![0, 1])]),
        ),
        // Two partially overlapping partition ranges.
        (
            vec![part(1, 3, 0), part(2, 4, 1)],
            BTreeMap::from([
                (secs(1, 2), vec![0]),
                (secs(2, 3), vec![0, 1]),
                (secs(3, 4), vec![1]),
            ]),
        ),
        // Three ranges; the last one spans both of the others.
        (
            vec![part(1, 3, 0), part(2, 4, 1), part(1, 4, 2)],
            BTreeMap::from([
                (secs(1, 2), vec![0, 2]),
                (secs(2, 3), vec![0, 1, 2]),
                (secs(3, 4), vec![1, 2]),
            ]),
        ),
        // Same three ranges with the spanning one in the middle.
        (
            vec![part(1, 3, 0), part(1, 4, 1), part(2, 4, 2)],
            BTreeMap::from([
                (secs(1, 2), vec![0, 1]),
                (secs(2, 3), vec![0, 1, 2]),
                (secs(3, 4), vec![1, 2]),
            ]),
        ),
    ];

    for (input, expected) in testcases {
        let expected = expected
            .into_iter()
            .map(|(range, parts)| (range.into(), parts))
            .collect();
        // Include the input so a failure identifies the offending case.
        assert_eq!(
            split_overlapping_ranges(&input),
            expected,
            "input: {:?}",
            input
        );
    }
}
2049
2050 impl From<(i32, i32, Option<i32>, Option<i32>)> for SucRun<i32> {
2051 fn from((offset, len, min_val, max_val): (i32, i32, Option<i32>, Option<i32>)) -> Self {
2052 Self {
2053 offset: offset as usize,
2054 len: len as usize,
2055 first_val: min_val,
2056 last_val: max_val,
2057 }
2058 }
2059 }
2060
/// Table-driven check of `find_successive_runs`.
///
/// Each case is `(values, sort options, expected runs)`, where every expected
/// run is written as an `(offset, len, first_val, last_val)` tuple and
/// converted to `SucRun<i32>` before comparison.
#[test]
fn test_find_successive_runs() {
    let opts = |descending: bool, nulls_first: bool| {
        Some(SortOptions {
            descending,
            nulls_first,
        })
    };

    let testcases = vec![
        // Ascending, nulls last.
        (
            vec![Some(1), Some(1), Some(2), Some(1), Some(3)],
            opts(false, false),
            vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
        ),
        (
            vec![Some(1), Some(2), Some(2), Some(1), Some(3)],
            opts(false, false),
            vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
        ),
        (
            vec![Some(1), Some(2), None, None, Some(1), Some(3)],
            opts(false, false),
            vec![(0, 4, Some(1), Some(2)), (4, 2, Some(1), Some(3))],
        ),
        (
            vec![Some(1), Some(2), Some(1), Some(3)],
            opts(false, false),
            vec![(0, 2, Some(1), Some(2)), (2, 2, Some(1), Some(3))],
        ),
        // Same values, descending.
        (
            vec![Some(1), Some(2), Some(1), Some(3)],
            opts(true, false),
            vec![
                (0, 1, Some(1), Some(1)),
                (1, 2, Some(2), Some(1)),
                (3, 1, Some(3), Some(3)),
            ],
        ),
        // A null in the middle: nulls first vs. nulls last.
        (
            vec![Some(1), Some(2), None, Some(3)],
            opts(false, true),
            vec![(0, 2, Some(1), Some(2)), (2, 2, Some(3), Some(3))],
        ),
        (
            vec![Some(1), Some(2), None, Some(3)],
            opts(false, false),
            vec![(0, 3, Some(1), Some(2)), (3, 1, Some(3), Some(3))],
        ),
        (
            vec![Some(2), Some(1), None, Some(3)],
            opts(true, true),
            vec![(0, 2, Some(2), Some(1)), (2, 2, Some(3), Some(3))],
        ),
        // Empty input is expected to yield a single empty run.
        (vec![], opts(false, true), vec![(0, 0, None, None)]),
        // Leading nulls, descending: nulls first vs. nulls last.
        (
            vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
            opts(true, true),
            vec![(0, 5, Some(2), Some(1)), (5, 2, Some(5), Some(4))],
        ),
        (
            vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
            opts(true, false),
            vec![
                (0, 2, None, None),
                (2, 3, Some(2), Some(1)),
                (5, 2, Some(5), Some(4)),
            ],
        ),
    ];

    for (input, sort_opts, expected) in testcases {
        let runs = find_successive_runs(input.iter().copied().enumerate(), &sort_opts);
        let expected: Vec<_> = expected.into_iter().map(SucRun::<i32>::from).collect();
        assert_eq!(
            runs, expected,
            "input: {:?}, opt: {:?},expected: {:?}",
            input, sort_opts, expected
        );
    }
}
2171
/// Table-driven check of `cmp_with_opts` over value/value, value/null and
/// null/null pairs under each combination of `descending` and `nulls_first`
/// listed below.
#[test]
fn test_cmp_with_opts() {
    use std::cmp::Ordering;

    // (a, b, descending, nulls_first, expected ordering of `a` relative to `b`)
    let testcases = [
        // Two non-null values: only `descending` matters.
        (Some(1), Some(2), false, false, Ordering::Less),
        (Some(1), Some(2), true, false, Ordering::Greater),
        // Value vs. null: the expectation follows `nulls_first`.
        (Some(1), None, false, true, Ordering::Greater),
        (Some(1), None, true, true, Ordering::Greater),
        (Some(1), None, true, false, Ordering::Less),
        (Some(1), None, false, false, Ordering::Less),
        // Null vs. null is always equal.
        (None, None, true, true, Ordering::Equal),
        (None, None, false, true, Ordering::Equal),
        (None, None, true, false, Ordering::Equal),
        (None, None, false, false, Ordering::Equal),
    ];

    for (a, b, descending, nulls_first, expected) in testcases {
        let opts = Some(SortOptions {
            descending,
            nulls_first,
        });
        let ordering = cmp_with_opts(&a, &b, &opts);
        assert_eq!(
            ordering, expected,
            "a: {:?}, b: {:?}, opts: {:?}",
            a, b, opts
        );
    }
}
2277
/// Table-driven check of `find_slice_from_range`.
///
/// Each case is `(sorted millisecond values, descending, range, expected)`,
/// where `expected` is either `Ok` with the exact pair the function should
/// return or `Err` with a substring the error message must contain.
#[test]
fn test_find_slice_from_range() {
    let millis = |values: Vec<i64>| -> ArrayRef {
        Arc::new(TimestampMillisecondArray::from_iter_values(values))
    };
    let range = |start: i64, end: i64| TimeRange {
        start: Timestamp::new_millisecond(start),
        end: Timestamp::new_millisecond(end),
    };

    let test_cases = vec![
        // Ascending input.
        (millis(vec![1, 2, 3, 4, 5]), false, range(2, 4), Ok((1, 2))),
        (
            millis(vec![-2, -1, 0, 1, 2, 3, 4, 5]),
            false,
            range(-1, 4),
            Ok((1, 5)),
        ),
        (millis(vec![1, 3, 4, 6]), false, range(2, 5), Ok((1, 2))),
        (millis(vec![1, 2, 3, 4, 6]), false, range(2, 5), Ok((1, 3))),
        (millis(vec![1, 3, 4, 5, 6]), false, range(2, 5), Ok((1, 2))),
        // Range lies past every value.
        (millis(vec![1, 2, 3, 4, 5]), false, range(6, 7), Ok((5, 0))),
        // Descending input.
        (millis(vec![5, 3, 2, 1]), true, range(1, 4), Ok((1, 3))),
        (millis(vec![5, 4, 3, 2, 1, 0]), true, range(1, 4), Ok((2, 3))),
        (millis(vec![5, 3, 2, 0]), true, range(1, 4), Ok((1, 2))),
        (millis(vec![5, 4, 3, 2, 0]), true, range(1, 4), Ok((2, 2))),
        (millis(vec![5, 4, 3, 2, 1]), true, range(2, 5), Ok((1, 3))),
        (millis(vec![5, 4, 3, 1]), true, range(2, 5), Ok((1, 2))),
        (millis(vec![6, 4, 3, 2, 1]), true, range(2, 5), Ok((1, 3))),
        (millis(vec![6, 4, 3, 1]), true, range(2, 5), Ok((1, 2))),
        // Every value is above the range.
        (millis(vec![10, 9, 8, 7, 6]), true, range(2, 5), Ok((5, 0))),
        // Descending input with a one-millisecond-wide range.
        (millis(vec![3, 2, 1, 0]), true, range(3, 4), Ok((0, 1))),
        (millis(vec![5, 3, 2]), true, range(3, 4), Ok((1, 1))),
        (millis(vec![5, 4, 3, 2]), true, range(3, 4), Ok((2, 1))),
    ];

    for (values, descending, range, expected) in test_cases {
        let sort_column = SortColumn {
            values,
            options: Some(SortOptions {
                descending,
                ..Default::default()
            }),
        };
        match (find_slice_from_range(&sort_column, &range), expected) {
            (Ok(actual), Ok(wanted)) => assert_eq!(
                actual, wanted,
                "sort_vals: {:?}, range: {:?}",
                sort_column, range
            ),
            (Err(err), Err(wanted)) => {
                // No case uses `Err` yet; this binding pins the error type
                // of the table to a message substring.
                let wanted: &str = wanted;
                assert!(
                    err.to_string().contains(wanted),
                    "err: {:?}, expected: {:?}",
                    err,
                    wanted
                );
            }
            (r, e) => panic!("unexpected result: {:?}, expected: {:?}", r, e),
        }
    }
}
2484
2485 #[derive(Debug)]
2486 struct TestStream {
2487 expression: PhysicalSortExpr,
2488 fetch: Option<usize>,
2489 input: Vec<(PartitionRange, DfRecordBatch)>,
2490 output: Vec<DfRecordBatch>,
2491 schema: SchemaRef,
2492 }
2493 use datafusion::physical_plan::expressions::Column;
2494 impl TestStream {
2495 fn new(
2496 ts_col: Column,
2497 opt: SortOptions,
2498 fetch: Option<usize>,
2499 schema: impl Into<arrow_schema::Fields>,
2500 input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2501 expected: Vec<Vec<ArrayRef>>,
2502 ) -> Self {
2503 let expression = PhysicalSortExpr {
2504 expr: Arc::new(ts_col),
2505 options: opt,
2506 };
2507 let schema = Schema::new(schema.into());
2508 let schema = Arc::new(schema);
2509 let input = input
2510 .into_iter()
2511 .map(|(k, v)| (k, DfRecordBatch::try_new(schema.clone(), v).unwrap()))
2512 .collect_vec();
2513 let output_batchs = expected
2514 .into_iter()
2515 .map(|v| DfRecordBatch::try_new(schema.clone(), v).unwrap())
2516 .collect_vec();
2517 Self {
2518 expression,
2519 fetch,
2520 input,
2521 output: output_batchs,
2522 schema,
2523 }
2524 }
2525
2526 async fn run_test(&self) -> Vec<DfRecordBatch> {
2527 let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip();
2528
2529 let mock_input = MockInputExec::new(batches, self.schema.clone());
2530
2531 let exec = WindowedSortExec::try_new(
2532 self.expression.clone(),
2533 self.fetch,
2534 vec![ranges],
2535 Arc::new(mock_input),
2536 )
2537 .unwrap();
2538
2539 let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
2540
2541 let real_output = exec_stream.collect::<Vec<_>>().await;
2542 let real_output: Vec<_> = real_output.into_iter().try_collect().unwrap();
2543 real_output
2544 }
2545 }
2546
2547 #[tokio::test]
2548 async fn test_window_sort_stream() {
2549 let test_cases = [
2550 TestStream::new(
2551 Column::new("ts", 0),
2552 SortOptions {
2553 descending: false,
2554 nulls_first: true,
2555 },
2556 None,
2557 vec![Field::new(
2558 "ts",
2559 DataType::Timestamp(TimeUnit::Millisecond, None),
2560 false,
2561 )],
2562 vec![],
2563 vec![],
2564 ),
2565 TestStream::new(
2566 Column::new("ts", 0),
2567 SortOptions {
2568 descending: false,
2569 nulls_first: true,
2570 },
2571 None,
2572 vec![Field::new(
2573 "ts",
2574 DataType::Timestamp(TimeUnit::Millisecond, None),
2575 false,
2576 )],
2577 vec![
2578 (
2580 PartitionRange {
2581 start: Timestamp::new_millisecond(1),
2582 end: Timestamp::new_millisecond(2),
2583 num_rows: 1,
2584 identifier: 0,
2585 },
2586 vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2587 ),
2588 (
2589 PartitionRange {
2590 start: Timestamp::new_millisecond(1),
2591 end: Timestamp::new_millisecond(3),
2592 num_rows: 1,
2593 identifier: 0,
2594 },
2595 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2596 ),
2597 ],
2598 vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2599 [2],
2600 ))]],
2601 ),
2602 TestStream::new(
2603 Column::new("ts", 0),
2604 SortOptions {
2605 descending: false,
2606 nulls_first: true,
2607 },
2608 None,
2609 vec![Field::new(
2610 "ts",
2611 DataType::Timestamp(TimeUnit::Millisecond, None),
2612 false,
2613 )],
2614 vec![
2615 (
2617 PartitionRange {
2618 start: Timestamp::new_millisecond(1),
2619 end: Timestamp::new_millisecond(2),
2620 num_rows: 1,
2621 identifier: 0,
2622 },
2623 vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2624 ),
2625 (
2626 PartitionRange {
2627 start: Timestamp::new_millisecond(1),
2628 end: Timestamp::new_millisecond(3),
2629 num_rows: 1,
2630 identifier: 0,
2631 },
2632 vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2633 ),
2634 ],
2635 vec![],
2636 ),
2637 TestStream::new(
2638 Column::new("ts", 0),
2639 SortOptions {
2640 descending: false,
2641 nulls_first: true,
2642 },
2643 None,
2644 vec![Field::new(
2645 "ts",
2646 DataType::Timestamp(TimeUnit::Millisecond, None),
2647 false,
2648 )],
2649 vec![
2650 (
2653 PartitionRange {
2654 start: Timestamp::new_millisecond(1),
2655 end: Timestamp::new_millisecond(2),
2656 num_rows: 1,
2657 identifier: 0,
2658 },
2659 vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2660 ),
2661 (
2662 PartitionRange {
2663 start: Timestamp::new_millisecond(1),
2664 end: Timestamp::new_millisecond(3),
2665 num_rows: 1,
2666 identifier: 0,
2667 },
2668 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2669 ),
2670 ],
2671 vec![
2672 vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2673 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2674 ],
2675 ),
2676 TestStream::new(
2677 Column::new("ts", 0),
2678 SortOptions {
2679 descending: false,
2680 nulls_first: true,
2681 },
2682 None,
2683 vec![Field::new(
2684 "ts",
2685 DataType::Timestamp(TimeUnit::Millisecond, None),
2686 false,
2687 )],
2688 vec![
2689 (
2691 PartitionRange {
2692 start: Timestamp::new_millisecond(1),
2693 end: Timestamp::new_millisecond(3),
2694 num_rows: 1,
2695 identifier: 0,
2696 },
2697 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2698 1, 2,
2699 ]))],
2700 ),
2701 (
2702 PartitionRange {
2703 start: Timestamp::new_millisecond(1),
2704 end: Timestamp::new_millisecond(4),
2705 num_rows: 1,
2706 identifier: 0,
2707 },
2708 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2709 2, 3,
2710 ]))],
2711 ),
2712 ],
2713 vec![
2714 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2715 1, 2,
2716 ]))],
2717 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2719 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2720 ],
2721 ),
2722 TestStream::new(
2723 Column::new("ts", 0),
2724 SortOptions {
2725 descending: false,
2726 nulls_first: true,
2727 },
2728 None,
2729 vec![Field::new(
2730 "ts",
2731 DataType::Timestamp(TimeUnit::Millisecond, None),
2732 false,
2733 )],
2734 vec![
2735 (
2737 PartitionRange {
2738 start: Timestamp::new_millisecond(1),
2739 end: Timestamp::new_millisecond(3),
2740 num_rows: 1,
2741 identifier: 0,
2742 },
2743 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2744 1, 2,
2745 ]))],
2746 ),
2747 (
2748 PartitionRange {
2749 start: Timestamp::new_millisecond(1),
2750 end: Timestamp::new_millisecond(4),
2751 num_rows: 1,
2752 identifier: 1,
2753 },
2754 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2755 1, 2, 3,
2756 ]))],
2757 ),
2758 ],
2759 vec![
2760 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2761 1, 1, 2, 2,
2762 ]))],
2763 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2764 ],
2765 ),
2766 TestStream::new(
2767 Column::new("ts", 0),
2768 SortOptions {
2769 descending: false,
2770 nulls_first: true,
2771 },
2772 None,
2773 vec![Field::new(
2774 "ts",
2775 DataType::Timestamp(TimeUnit::Millisecond, None),
2776 false,
2777 )],
2778 vec![
2779 (
2781 PartitionRange {
2782 start: Timestamp::new_millisecond(1),
2783 end: Timestamp::new_millisecond(3),
2784 num_rows: 1,
2785 identifier: 0,
2786 },
2787 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2788 1, 2,
2789 ]))],
2790 ),
2791 (
2792 PartitionRange {
2793 start: Timestamp::new_millisecond(1),
2794 end: Timestamp::new_millisecond(4),
2795 num_rows: 1,
2796 identifier: 1,
2797 },
2798 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2799 1, 2, 3,
2800 ]))],
2801 ),
2802 (
2803 PartitionRange {
2804 start: Timestamp::new_millisecond(4),
2805 end: Timestamp::new_millisecond(6),
2806 num_rows: 1,
2807 identifier: 1,
2808 },
2809 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2810 4, 5,
2811 ]))],
2812 ),
2813 ],
2814 vec![
2815 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2816 1, 1, 2, 2,
2817 ]))],
2818 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2819 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2820 4, 5,
2821 ]))],
2822 ],
2823 ),
2824 TestStream::new(
2826 Column::new("ts", 0),
2827 SortOptions {
2828 descending: false,
2829 nulls_first: true,
2830 },
2831 Some(6),
2832 vec![Field::new(
2833 "ts",
2834 DataType::Timestamp(TimeUnit::Millisecond, None),
2835 false,
2836 )],
2837 vec![
2838 (
2840 PartitionRange {
2841 start: Timestamp::new_millisecond(1),
2842 end: Timestamp::new_millisecond(3),
2843 num_rows: 1,
2844 identifier: 0,
2845 },
2846 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2847 1, 2,
2848 ]))],
2849 ),
2850 (
2851 PartitionRange {
2852 start: Timestamp::new_millisecond(1),
2853 end: Timestamp::new_millisecond(4),
2854 num_rows: 1,
2855 identifier: 1,
2856 },
2857 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2858 1, 2, 3,
2859 ]))],
2860 ),
2861 (
2862 PartitionRange {
2863 start: Timestamp::new_millisecond(3),
2864 end: Timestamp::new_millisecond(6),
2865 num_rows: 1,
2866 identifier: 1,
2867 },
2868 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2869 4, 5,
2870 ]))],
2871 ),
2872 ],
2873 vec![
2874 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2875 1, 1, 2, 2,
2876 ]))],
2877 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2878 vec![Arc::new(TimestampMillisecondArray::from_iter_values([4]))],
2879 ],
2880 ),
2881 TestStream::new(
2882 Column::new("ts", 0),
2883 SortOptions {
2884 descending: false,
2885 nulls_first: true,
2886 },
2887 Some(3),
2888 vec![Field::new(
2889 "ts",
2890 DataType::Timestamp(TimeUnit::Millisecond, None),
2891 false,
2892 )],
2893 vec![
2894 (
2896 PartitionRange {
2897 start: Timestamp::new_millisecond(1),
2898 end: Timestamp::new_millisecond(3),
2899 num_rows: 1,
2900 identifier: 0,
2901 },
2902 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2903 1, 2,
2904 ]))],
2905 ),
2906 (
2907 PartitionRange {
2908 start: Timestamp::new_millisecond(1),
2909 end: Timestamp::new_millisecond(4),
2910 num_rows: 1,
2911 identifier: 1,
2912 },
2913 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2914 1, 2, 3,
2915 ]))],
2916 ),
2917 (
2918 PartitionRange {
2919 start: Timestamp::new_millisecond(3),
2920 end: Timestamp::new_millisecond(6),
2921 num_rows: 1,
2922 identifier: 1,
2923 },
2924 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2925 4, 5,
2926 ]))],
2927 ),
2928 ],
2929 vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2930 [1, 1, 2],
2931 ))]],
2932 ),
2933 TestStream::new(
2935 Column::new("ts", 0),
2936 SortOptions {
2937 descending: true,
2938 nulls_first: true,
2939 },
2940 None,
2941 vec![Field::new(
2942 "ts",
2943 DataType::Timestamp(TimeUnit::Millisecond, None),
2944 false,
2945 )],
2946 vec![
2947 (
2949 PartitionRange {
2950 start: Timestamp::new_millisecond(3),
2951 end: Timestamp::new_millisecond(6),
2952 num_rows: 1,
2953 identifier: 1,
2954 },
2955 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2956 5, 4,
2957 ]))],
2958 ),
2959 (
2960 PartitionRange {
2961 start: Timestamp::new_millisecond(1),
2962 end: Timestamp::new_millisecond(4),
2963 num_rows: 1,
2964 identifier: 1,
2965 },
2966 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2967 3, 2, 1,
2968 ]))],
2969 ),
2970 (
2971 PartitionRange {
2972 start: Timestamp::new_millisecond(1),
2973 end: Timestamp::new_millisecond(3),
2974 num_rows: 1,
2975 identifier: 0,
2976 },
2977 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2978 2, 1,
2979 ]))],
2980 ),
2981 ],
2982 vec![
2983 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2984 5, 4,
2985 ]))],
2986 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2987 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2988 2, 2, 1, 1,
2989 ]))],
2990 ],
2991 ),
2992 TestStream::new(
2993 Column::new("ts", 0),
2994 SortOptions {
2995 descending: false,
2996 nulls_first: true,
2997 },
2998 None,
2999 vec![Field::new(
3000 "ts",
3001 DataType::Timestamp(TimeUnit::Millisecond, None),
3002 false,
3003 )],
3004 vec![
3005 (
3007 PartitionRange {
3008 start: Timestamp::new_millisecond(1),
3009 end: Timestamp::new_millisecond(10),
3010 num_rows: 1,
3011 identifier: 0,
3012 },
3013 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3014 1, 5, 9,
3015 ]))],
3016 ),
3017 (
3018 PartitionRange {
3019 start: Timestamp::new_millisecond(3),
3020 end: Timestamp::new_millisecond(7),
3021 num_rows: 1,
3022 identifier: 1,
3023 },
3024 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3025 3, 4, 5, 6,
3026 ]))],
3027 ),
3028 ],
3029 vec![
3030 vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
3031 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3032 3, 4, 5, 5, 6, 9,
3033 ]))],
3034 ],
3035 ),
3036 TestStream::new(
3037 Column::new("ts", 0),
3038 SortOptions {
3039 descending: false,
3040 nulls_first: true,
3041 },
3042 None,
3043 vec![Field::new(
3044 "ts",
3045 DataType::Timestamp(TimeUnit::Millisecond, None),
3046 false,
3047 )],
3048 vec![
3049 (
3051 PartitionRange {
3052 start: Timestamp::new_millisecond(1),
3053 end: Timestamp::new_millisecond(3),
3054 num_rows: 1,
3055 identifier: 0,
3056 },
3057 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3058 1, 2,
3059 ]))],
3060 ),
3061 (
3062 PartitionRange {
3063 start: Timestamp::new_millisecond(1),
3064 end: Timestamp::new_millisecond(10),
3065 num_rows: 1,
3066 identifier: 1,
3067 },
3068 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3069 1, 3, 4, 5, 6, 8,
3070 ]))],
3071 ),
3072 (
3073 PartitionRange {
3074 start: Timestamp::new_millisecond(7),
3075 end: Timestamp::new_millisecond(10),
3076 num_rows: 1,
3077 identifier: 1,
3078 },
3079 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3080 7, 8, 9,
3081 ]))],
3082 ),
3083 ],
3084 vec![
3085 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3086 1, 1, 2,
3087 ]))],
3088 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3089 3, 4, 5, 6,
3090 ]))],
3091 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3092 7, 8, 8, 9,
3093 ]))],
3094 ],
3095 ),
3096 TestStream::new(
3097 Column::new("ts", 0),
3098 SortOptions {
3099 descending: false,
3100 nulls_first: true,
3101 },
3102 None,
3103 vec![Field::new(
3104 "ts",
3105 DataType::Timestamp(TimeUnit::Millisecond, None),
3106 false,
3107 )],
3108 vec![
3109 (
3111 PartitionRange {
3112 start: Timestamp::new_millisecond(1),
3113 end: Timestamp::new_millisecond(11),
3114 num_rows: 1,
3115 identifier: 0,
3116 },
3117 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3118 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
3119 ]))],
3120 ),
3121 (
3122 PartitionRange {
3123 start: Timestamp::new_millisecond(5),
3124 end: Timestamp::new_millisecond(7),
3125 num_rows: 1,
3126 identifier: 1,
3127 },
3128 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3129 5, 6,
3130 ]))],
3131 ),
3132 ],
3133 vec![
3134 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3135 1, 2, 3, 4,
3136 ]))],
3137 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3138 5, 5, 6, 6, 7, 8, 9, 10,
3139 ]))],
3140 ],
3141 ),
3142 ];
3143
3144 let indexed_test_cases = test_cases.iter().enumerate().collect_vec();
3145
3146 for (idx, testcase) in &indexed_test_cases {
3147 let output = testcase.run_test().await;
3148 assert_eq!(output, testcase.output, "case {idx} failed.");
3149 }
3150 }
3151
3152 #[tokio::test]
3153 async fn fuzzy_ish_test_window_sort_stream() {
3154 let test_cnt = 100;
3155 let part_cnt_bound = 100;
3156 let range_size_bound = 100;
3157 let range_offset_bound = 100;
3158 let in_range_datapoint_cnt_bound = 100;
3159 let fetch_bound = 100;
3160
3161 let mut rng = fastrand::Rng::new();
3162 let rng_seed = rng.u64(..);
3163 rng.seed(rng_seed);
3164 let mut bound_val = None;
3165 type CmpFn<T> = Box<dyn FnMut(&T, &T) -> std::cmp::Ordering>;
3167 let mut full_testcase_list = Vec::new();
3168 for _case_id in 0..test_cnt {
3169 let descending = rng.bool();
3170 fn ret_cmp_fn<T: Ord>(descending: bool) -> CmpFn<T> {
3171 if descending {
3172 return Box::new(|a: &T, b: &T| b.cmp(a));
3173 }
3174 Box::new(|a: &T, b: &T| a.cmp(b))
3175 }
3176 let unit = match rng.u8(0..3) {
3177 0 => TimeUnit::Second,
3178 1 => TimeUnit::Millisecond,
3179 2 => TimeUnit::Microsecond,
3180 _ => TimeUnit::Nanosecond,
3181 };
3182 let fetch = if rng.bool() {
3183 Some(rng.usize(0..fetch_bound))
3184 } else {
3185 None
3186 };
3187
3188 let mut input_ranged_data = vec![];
3189 let mut output_data: Vec<i64> = vec![];
3190 for part_id in 0..rng.usize(0..part_cnt_bound) {
3192 let (start, end) = if descending {
3193 let end = bound_val
3194 .map(|i| i - rng.i64(0..range_offset_bound))
3195 .unwrap_or_else(|| rng.i64(..));
3196 bound_val = Some(end);
3197 let start = end - rng.i64(1..range_size_bound);
3198 let start = Timestamp::new(start, unit.into());
3199 let end = Timestamp::new(end, unit.into());
3200 (start, end)
3201 } else {
3202 let start = bound_val
3203 .map(|i| i + rng.i64(0..range_offset_bound))
3204 .unwrap_or_else(|| rng.i64(..));
3205 bound_val = Some(start);
3206 let end = start + rng.i64(1..range_size_bound);
3207 let start = Timestamp::new(start, unit.into());
3208 let end = Timestamp::new(end, unit.into());
3209 (start, end)
3210 };
3211
3212 let iter = 0..rng.usize(0..in_range_datapoint_cnt_bound);
3213 let data_gen = iter
3214 .map(|_| rng.i64(start.value()..end.value()))
3215 .sorted_by(ret_cmp_fn(descending))
3216 .collect_vec();
3217 output_data.extend(data_gen.clone());
3218 let arr = new_ts_array(unit, data_gen);
3219 let range = PartitionRange {
3220 start,
3221 end,
3222 num_rows: arr.len(),
3223 identifier: part_id,
3224 };
3225 input_ranged_data.push((range, vec![arr]));
3226 }
3227
3228 output_data.sort_by(ret_cmp_fn(descending));
3229 if let Some(fetch) = fetch {
3230 output_data.truncate(fetch);
3231 }
3232 let output_arr = new_ts_array(unit, output_data);
3233
3234 let test_stream = TestStream::new(
3235 Column::new("ts", 0),
3236 SortOptions {
3237 descending,
3238 nulls_first: true,
3239 },
3240 fetch,
3241 vec![Field::new("ts", DataType::Timestamp(unit, None), false)],
3242 input_ranged_data.clone(),
3243 vec![vec![output_arr]],
3244 );
3245 full_testcase_list.push(test_stream);
3246 }
3247
3248 for (case_id, test_stream) in full_testcase_list.into_iter().enumerate() {
3249 let res = test_stream.run_test().await;
3250 let res_concat = concat_batches(&test_stream.schema, &res).unwrap();
3251 let expected = test_stream.output;
3252 let expected_concat = concat_batches(&test_stream.schema, &expected).unwrap();
3253
3254 if res_concat != expected_concat {
3255 {
3256 let mut f_input = std::io::stderr();
3257 f_input.write_all(b"[").unwrap();
3258 for (input_range, input_arr) in test_stream.input {
3259 let range_json = json!({
3260 "start": input_range.start.to_chrono_datetime().unwrap().to_string(),
3261 "end": input_range.end.to_chrono_datetime().unwrap().to_string(),
3262 "num_rows": input_range.num_rows,
3263 "identifier": input_range.identifier,
3264 });
3265 let buf = Vec::new();
3266 let mut input_writer = ArrayWriter::new(buf);
3267 input_writer.write(&input_arr).unwrap();
3268 input_writer.finish().unwrap();
3269 let res_str =
3270 String::from_utf8_lossy(&input_writer.into_inner()).to_string();
3271 let whole_json =
3272 format!(r#"{{"range": {}, "data": {}}},"#, range_json, res_str);
3273 f_input.write_all(whole_json.as_bytes()).unwrap();
3274 }
3275 f_input.write_all(b"]").unwrap();
3276 }
3277 {
3278 let mut f_res = std::io::stderr();
3279 f_res.write_all(b"[").unwrap();
3280 for batch in &res {
3281 let mut res_writer = ArrayWriter::new(f_res);
3282 res_writer.write(batch).unwrap();
3283 res_writer.finish().unwrap();
3284 f_res = res_writer.into_inner();
3285 f_res.write_all(b",").unwrap();
3286 }
3287 f_res.write_all(b"]").unwrap();
3288
3289 let f_res_concat = std::io::stderr();
3290 let mut res_writer = ArrayWriter::new(f_res_concat);
3291 res_writer.write(&res_concat).unwrap();
3292 res_writer.finish().unwrap();
3293
3294 let f_expected = std::io::stderr();
3295 let mut expected_writer = ArrayWriter::new(f_expected);
3296 expected_writer.write(&expected_concat).unwrap();
3297 expected_writer.finish().unwrap();
3298 }
3299 panic!(
3300 "case failed, case id: {0}, output and expected output to stderr",
3301 case_id
3302 );
3303 }
3304 assert_eq!(
3305 res_concat, expected_concat,
3306 "case failed, case id: {}, rng seed: {}",
3307 case_id, rng_seed
3308 );
3309 }
3310 }
3311}