1use std::any::Any;
19use std::collections::{BTreeMap, BTreeSet, VecDeque};
20use std::pin::Pin;
21use std::slice::from_ref;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use arrow::array::{Array, ArrayRef};
26use arrow::compute::SortColumn;
27use arrow_schema::{DataType, SchemaRef, SortOptions};
28use common_error::ext::{BoxedError, PlainError};
29use common_error::status_code::StatusCode;
30use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
31use common_telemetry::error;
32use common_time::Timestamp;
33use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
34use datafusion::execution::{RecordBatchStream, TaskContext};
35use datafusion::physical_plan::memory::MemoryStream;
36use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
37use datafusion::physical_plan::sorts::streaming_merge::StreamingMergeBuilder;
38use datafusion::physical_plan::{
39 DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
40};
41use datafusion_common::utils::bisect;
42use datafusion_common::{internal_err, DataFusionError};
43use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
44use datatypes::value::Value;
45use futures::Stream;
46use itertools::Itertools;
47use snafu::ResultExt;
48use store_api::region_engine::PartitionRange;
49
50use crate::error::{QueryExecutionSnafu, Result};
51
/// Physical plan node that performs a windowed sort of its single input by one
/// sort expression.
///
/// For every input partition the plan is given a list of [`PartitionRange`]s.
/// At construction time (`try_new`) those ranges are split at their overlap
/// boundaries and grouped into "working ranges"; at execution time each
/// partition is wrapped in a [`WindowedSortStream`] that consumes them.
#[derive(Debug, Clone)]
pub struct WindowedSortExec {
    /// The sort expression; its `options.descending` flag selects the
    /// direction used when validating and grouping the partition ranges.
    expression: PhysicalSortExpr,
    /// Optional limit on the number of rows each output stream produces.
    fetch: Option<usize>,
    /// Partition ranges, one inner `Vec` per input partition (indexed by the
    /// partition number passed to `execute`).
    ranges: Vec<Vec<PartitionRange>>,
    /// Working ranges precomputed from `ranges`, one list per partition. Each
    /// entry pairs a time range with the indices (into that partition's
    /// `ranges`) of the partition ranges contributing to it.
    all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
    /// The child plan whose output is sorted.
    input: Arc<dyn ExecutionPlan>,
    /// Metrics registry shared by all streams created from this plan.
    metrics: ExecutionPlanMetricsSet,
    /// Plan properties built in `try_new` from the input's properties plus the
    /// ordering introduced by `expression`.
    properties: PlanProperties,
}
86
87fn check_partition_range_monotonicity(
88 ranges: &[Vec<PartitionRange>],
89 descending: bool,
90) -> Result<()> {
91 let is_valid = ranges.iter().all(|r| {
92 if descending {
93 r.windows(2).all(|w| w[0].end >= w[1].end)
94 } else {
95 r.windows(2).all(|w| w[0].start <= w[1].start)
96 }
97 });
98
99 if !is_valid {
100 let msg = if descending {
101 "Input `PartitionRange`s's upper bound is not monotonic non-increase"
102 } else {
103 "Input `PartitionRange`s's lower bound is not monotonic non-decrease"
104 };
105 let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
106 Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
107 } else {
108 Ok(())
109 }
110}
111
112impl WindowedSortExec {
113 pub fn try_new(
114 expression: PhysicalSortExpr,
115 fetch: Option<usize>,
116 ranges: Vec<Vec<PartitionRange>>,
117 input: Arc<dyn ExecutionPlan>,
118 ) -> Result<Self> {
119 check_partition_range_monotonicity(&ranges, expression.options.descending)?;
120
121 let properties = input.properties();
122 let properties = PlanProperties::new(
123 input
124 .equivalence_properties()
125 .clone()
126 .with_reorder(LexOrdering::new(vec![expression.clone()])),
127 input.output_partitioning().clone(),
128 properties.emission_type,
129 properties.boundedness,
130 );
131
132 let mut all_avail_working_range = Vec::with_capacity(ranges.len());
133 for r in &ranges {
134 let overlap_counts = split_overlapping_ranges(r);
135 let working_ranges =
136 compute_all_working_ranges(&overlap_counts, expression.options.descending);
137 all_avail_working_range.push(working_ranges);
138 }
139
140 Ok(Self {
141 expression,
142 fetch,
143 ranges,
144 all_avail_working_range,
145 input,
146 metrics: ExecutionPlanMetricsSet::new(),
147 properties,
148 })
149 }
150
151 pub fn to_stream(
155 &self,
156 context: Arc<TaskContext>,
157 partition: usize,
158 ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
159 let input_stream: DfSendableRecordBatchStream =
160 self.input.execute(partition, context.clone())?;
161
162 let df_stream = Box::pin(WindowedSortStream::new(
163 context,
164 self,
165 input_stream,
166 partition,
167 )) as _;
168
169 Ok(df_stream)
170 }
171}
172
173impl DisplayAs for WindowedSortExec {
174 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 write!(
176 f,
177 "WindowedSortExec: expr={} num_ranges={}",
178 self.expression,
179 self.ranges.len()
180 )?;
181 if let Some(fetch) = self.fetch {
182 write!(f, " fetch={}", fetch)?;
183 }
184 Ok(())
185 }
186}
187
188impl ExecutionPlan for WindowedSortExec {
189 fn as_any(&self) -> &dyn Any {
190 self
191 }
192
193 fn schema(&self) -> SchemaRef {
194 self.input.schema()
195 }
196
197 fn properties(&self) -> &PlanProperties {
198 &self.properties
199 }
200
201 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
202 vec![&self.input]
203 }
204
205 fn with_new_children(
206 self: Arc<Self>,
207 children: Vec<Arc<dyn ExecutionPlan>>,
208 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
209 let new_input = if let Some(first) = children.first() {
210 first
211 } else {
212 internal_err!("No children found")?
213 };
214 let new = Self::try_new(
215 self.expression.clone(),
216 self.fetch,
217 self.ranges.clone(),
218 new_input.clone(),
219 )?;
220 Ok(Arc::new(new))
221 }
222
223 fn execute(
224 &self,
225 partition: usize,
226 context: Arc<TaskContext>,
227 ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
228 self.to_stream(context, partition)
229 }
230
231 fn metrics(&self) -> Option<MetricsSet> {
232 Some(self.metrics.clone_inner())
233 }
234
235 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
241 vec![false; self.ranges.len()]
242 }
243
244 fn name(&self) -> &str {
245 "WindowedSortExec"
246 }
247}
248
/// Stream produced by [`WindowedSortExec`] for one partition.
///
/// It reads batches from `input`, groups them per working range into sorted
/// runs, merges the runs of a finished working range, and yields the merged
/// batches through `merge_stream`.
pub struct WindowedSortStream {
    /// Memory pool used to register a consumer for every multi-way merge.
    memory_pool: Arc<dyn MemoryPool>,
    /// Batches of the sorted run currently being assembled; drained into a
    /// `MemoryStream` by `build_sorted_stream`.
    in_progress: Vec<DfRecordBatch>,
    /// Last value of the most recently accepted run; compared against the
    /// first value of the next run to decide whether it extends the current
    /// run.
    last_value: Option<Timestamp>,
    /// Completed sorted runs of the current working range, waiting to be
    /// merged by `start_new_merge_sort`.
    sorted_input_runs: Vec<DfSendableRecordBatchStream>,
    /// Queue of result streams; the front one is polled until exhausted, then
    /// popped.
    merge_stream: VecDeque<DfSendableRecordBatchStream>,
    /// Number of multi-way merges started so far; used to name the memory
    /// consumer of each merge.
    merge_count: usize,
    /// Index of the current working range in `all_avail_working_range`.
    working_idx: usize,
    /// The upstream batches to sort.
    input: DfSendableRecordBatchStream,
    /// Set once the input is exhausted or the fetch limit has been reached.
    is_terminated: bool,
    /// Output schema, taken from the input stream.
    schema: SchemaRef,
    /// The sort expression applied to every incoming batch.
    expression: PhysicalSortExpr,
    /// Optional limit on the total number of rows to emit.
    fetch: Option<usize>,
    /// Number of rows emitted so far.
    produced: usize,
    /// Batch size passed to the streaming merge.
    batch_size: usize,
    /// Working ranges of this partition, each paired with the indices of the
    /// partition ranges contributing to it.
    all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
    /// Partition ranges of this partition; only read by the debug-only
    /// `check_subset_ranges` diagnostics.
    #[allow(dead_code)]
    ranges: Vec<PartitionRange>,
    /// Baseline metrics for this partition's stream.
    metrics: BaselineMetrics,
}
294
295impl WindowedSortStream {
296 pub fn new(
297 context: Arc<TaskContext>,
298 exec: &WindowedSortExec,
299 input: DfSendableRecordBatchStream,
300 partition: usize,
301 ) -> Self {
302 Self {
303 memory_pool: context.runtime_env().memory_pool.clone(),
304 in_progress: Vec::new(),
305 last_value: None,
306 sorted_input_runs: Vec::new(),
307 merge_stream: VecDeque::new(),
308 merge_count: 0,
309 working_idx: 0,
310 schema: input.schema(),
311 input,
312 is_terminated: false,
313 expression: exec.expression.clone(),
314 fetch: exec.fetch,
315 produced: 0,
316 batch_size: context.session_config().batch_size(),
317 all_avail_working_range: exec.all_avail_working_range[partition].clone(),
318 ranges: exec.ranges[partition].clone(),
319 metrics: BaselineMetrics::new(&exec.metrics, partition),
320 }
321 }
322}
323
324impl WindowedSortStream {
325 #[cfg(debug_assertions)]
326 fn check_subset_ranges(&self, cur_range: &TimeRange) {
327 let cur_is_subset_to = self
328 .ranges
329 .iter()
330 .filter(|r| cur_range.is_subset(&TimeRange::from(*r)))
331 .collect_vec();
332 if cur_is_subset_to.is_empty() {
333 error!("Current range is not a subset of any PartitionRange");
334 let subset_ranges = self
336 .ranges
337 .iter()
338 .filter(|r| TimeRange::from(*r).is_subset(cur_range))
339 .collect_vec();
340 let only_overlap = self
341 .ranges
342 .iter()
343 .filter(|r| {
344 let r = TimeRange::from(*r);
345 r.is_overlapping(cur_range) && !r.is_subset(cur_range)
346 })
347 .collect_vec();
348 error!(
349 "Bad input, found {} ranges that are subset of current range, also found {} ranges that only overlap, subset ranges are: {:?}; overlap ranges are: {:?}",
350 subset_ranges.len(),
351 only_overlap.len(),
352 subset_ranges,
353 only_overlap
354 );
355 } else {
356 let only_overlap = self
357 .ranges
358 .iter()
359 .filter(|r| {
360 let r = TimeRange::from(*r);
361 r.is_overlapping(cur_range) && !cur_range.is_subset(&r)
362 })
363 .collect_vec();
364 error!(
365 "Found current range to be subset of {} ranges, also found {} ranges that only overlap, of subset ranges are:{:?}; overlap ranges are: {:?}",
366 cur_is_subset_to.len(),
367 only_overlap.len(),
368 cur_is_subset_to,
369 only_overlap
370 );
371 }
372 let all_overlap_working_range = self
373 .all_avail_working_range
374 .iter()
375 .filter(|(range, _)| range.is_overlapping(cur_range))
376 .map(|(range, _)| range)
377 .collect_vec();
378 error!(
379 "Found {} working ranges that overlap with current range: {:?}",
380 all_overlap_working_range.len(),
381 all_overlap_working_range
382 );
383 }
384
385 fn poll_result_stream(
387 &mut self,
388 cx: &mut Context<'_>,
389 ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
390 while let Some(merge_stream) = &mut self.merge_stream.front_mut() {
391 match merge_stream.as_mut().poll_next(cx) {
392 Poll::Ready(Some(Ok(batch))) => {
393 let ret = if let Some(remaining) = self.remaining_fetch() {
394 if remaining == 0 {
395 self.is_terminated = true;
396 None
397 } else if remaining < batch.num_rows() {
398 self.produced += remaining;
399 Some(Ok(batch.slice(0, remaining)))
400 } else {
401 self.produced += batch.num_rows();
402 Some(Ok(batch))
403 }
404 } else {
405 self.produced += batch.num_rows();
406 Some(Ok(batch))
407 };
408 return Poll::Ready(ret);
409 }
410 Poll::Ready(Some(Err(e))) => {
411 return Poll::Ready(Some(Err(e)));
412 }
413 Poll::Ready(None) => {
414 self.merge_stream.pop_front();
417 continue;
418 }
419 Poll::Pending => {
420 return Poll::Pending;
421 }
422 }
423 }
424 Poll::Ready(None)
426 }
427
428 pub fn poll_next_inner(
432 mut self: Pin<&mut Self>,
433 cx: &mut Context<'_>,
434 ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
435 match self.poll_result_stream(cx) {
437 Poll::Ready(None) => {
438 if self.is_terminated {
439 return Poll::Ready(None);
440 }
441 }
442 x => return x,
443 };
444
445 while !self.is_terminated {
447 let new_input_rbs = match self.input.as_mut().poll_next(cx) {
449 Poll::Ready(Some(Ok(batch))) => {
450 Some(split_batch_to_sorted_run(batch, &self.expression)?)
451 }
452 Poll::Ready(Some(Err(e))) => {
453 return Poll::Ready(Some(Err(e)));
454 }
455 Poll::Ready(None) => {
456 self.is_terminated = true;
458 None
459 }
460 Poll::Pending => return Poll::Pending,
461 };
462
463 let Some(SortedRunSet {
464 runs_with_batch,
465 sort_column,
466 }) = new_input_rbs
467 else {
468 self.build_sorted_stream()?;
470 self.start_new_merge_sort()?;
471 continue;
472 };
473 let mut last_remaining = None;
479 let mut run_iter = runs_with_batch.into_iter();
480 loop {
481 let Some((sorted_rb, run_info)) = last_remaining.take().or(run_iter.next()) else {
482 break;
483 };
484 if sorted_rb.num_rows() == 0 {
485 continue;
486 }
487 let Some(cur_range) = run_info.get_time_range() else {
489 internal_err!("Found NULL in time index column")?
490 };
491 let Some(working_range) = self.get_working_range() else {
492 internal_err!("No working range found")?
493 };
494
495 if sort_column.options.unwrap_or_default().descending {
497 if cur_range.end > working_range.end {
498 error!("Invalid range: {:?} > {:?}", cur_range, working_range);
499 #[cfg(debug_assertions)]
500 self.check_subset_ranges(&cur_range);
501 internal_err!("Current batch have data on the right side of working range, something is very wrong")?;
502 }
503 } else if cur_range.start < working_range.start {
504 error!("Invalid range: {:?} < {:?}", cur_range, working_range);
505 #[cfg(debug_assertions)]
506 self.check_subset_ranges(&cur_range);
507 internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
508 }
509
510 if cur_range.is_subset(&working_range) {
511 self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?;
514 } else if let Some(intersection) = cur_range.intersection(&working_range) {
515 let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len);
517 let (offset, len) = find_slice_from_range(
518 &SortColumn {
519 values: cur_sort_column.clone(),
520 options: sort_column.options,
521 },
522 &intersection,
523 )?;
524
525 if offset != 0 {
526 internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
527 }
528
529 let sliced_rb = sorted_rb.slice(offset, len);
530
531 self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?;
533 self.build_sorted_stream()?;
535
536 self.start_new_merge_sort()?;
538
539 let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len);
540 if r_len != 0 {
541 let remaining_rb = sorted_rb.slice(r_offset, r_len);
543 let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?;
544 let new_run_info = SucRun {
545 offset: run_info.offset + r_offset,
546 len: r_len,
547 first_val: new_first_val,
548 last_val: run_info.last_val,
549 };
550 last_remaining = Some((remaining_rb, new_run_info));
551 }
552 } else {
558 self.build_sorted_stream()?;
561 self.start_new_merge_sort()?;
562
563 last_remaining = Some((sorted_rb, run_info));
565 }
566 }
567 }
568 self.poll_result_stream(cx)
570 }
571
572 fn push_batch(&mut self, batch: DfRecordBatch) {
573 self.in_progress.push(batch);
574 }
575
576 fn try_concat_batch(
580 &mut self,
581 batch: DfRecordBatch,
582 run_info: &SucRun<Timestamp>,
583 opt: Option<SortOptions>,
584 ) -> datafusion_common::Result<()> {
585 let is_ok_to_concat =
586 cmp_with_opts(&self.last_value, &run_info.first_val, &opt) <= std::cmp::Ordering::Equal;
587
588 if is_ok_to_concat {
589 self.push_batch(batch);
590 } else {
592 self.build_sorted_stream()?;
594 self.push_batch(batch);
595 }
596 self.last_value = run_info.last_val;
597 Ok(())
598 }
599
600 fn get_working_range(&self) -> Option<TimeRange> {
602 self.all_avail_working_range
603 .get(self.working_idx)
604 .map(|(range, _)| *range)
605 }
606
607 fn set_next_working_range(&mut self) {
609 self.working_idx += 1;
610 }
611
612 fn build_sorted_stream(&mut self) -> datafusion_common::Result<()> {
614 if self.in_progress.is_empty() {
615 return Ok(());
616 }
617 let data = std::mem::take(&mut self.in_progress);
618
619 let new_stream = MemoryStream::try_new(data, self.schema(), None)?;
620 self.sorted_input_runs.push(Box::pin(new_stream));
621 Ok(())
622 }
623
624 fn start_new_merge_sort(&mut self) -> datafusion_common::Result<()> {
626 if !self.in_progress.is_empty() {
627 return internal_err!("Starting a merge sort when in_progress is not empty")?;
628 }
629
630 self.set_next_working_range();
631
632 let streams = std::mem::take(&mut self.sorted_input_runs);
633 if streams.is_empty() {
634 return Ok(());
635 } else if streams.len() == 1 {
636 self.merge_stream
637 .push_back(streams.into_iter().next().unwrap());
638 return Ok(());
639 }
640
641 let fetch = self.remaining_fetch();
642 let reservation = MemoryConsumer::new(format!("WindowedSortStream[{}]", self.merge_count))
643 .register(&self.memory_pool);
644 self.merge_count += 1;
645 let lex_ordering = LexOrdering::new(vec![self.expression.clone()]);
646
647 let resulting_stream = StreamingMergeBuilder::new()
648 .with_streams(streams)
649 .with_schema(self.schema())
650 .with_expressions(&lex_ordering)
651 .with_metrics(self.metrics.clone())
652 .with_batch_size(self.batch_size)
653 .with_fetch(fetch)
654 .with_reservation(reservation)
655 .build()?;
656 self.merge_stream.push_back(resulting_stream);
657 Ok(())
659 }
660
661 fn remaining_fetch(&self) -> Option<usize> {
664 let total_now = self.produced;
665 self.fetch.map(|p| p.saturating_sub(total_now))
666 }
667}
668
669impl Stream for WindowedSortStream {
670 type Item = datafusion_common::Result<DfRecordBatch>;
671
672 fn poll_next(
673 mut self: Pin<&mut Self>,
674 cx: &mut Context<'_>,
675 ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
676 let result = self.as_mut().poll_next_inner(cx);
677 self.metrics.record_poll(result)
678 }
679}
680
681impl RecordBatchStream for WindowedSortStream {
682 fn schema(&self) -> SchemaRef {
683 self.schema.clone()
684 }
685}
686
687fn split_batch_to_sorted_run(
689 batch: DfRecordBatch,
690 expression: &PhysicalSortExpr,
691) -> datafusion_common::Result<SortedRunSet<Timestamp>> {
692 let sort_column = expression.evaluate_to_sort_column(&batch)?;
694 let sorted_runs_offset = get_sorted_runs(sort_column.clone())?;
695 if let Some(run) = sorted_runs_offset.first()
696 && sorted_runs_offset.len() == 1
697 {
698 if !(run.offset == 0 && run.len == batch.num_rows()) {
699 internal_err!(
700 "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
701 run.offset,
702 run.len,
703 batch.num_rows()
704 )?;
705 }
706 Ok(SortedRunSet {
708 runs_with_batch: vec![(batch, run.clone())],
709 sort_column,
710 })
711 } else {
712 let mut ret = Vec::with_capacity(sorted_runs_offset.len());
714 for run in sorted_runs_offset {
715 if run.offset + run.len > batch.num_rows() {
716 internal_err!(
717 "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
718 run.offset,
719 run.len,
720 batch.num_rows()
721 )?;
722 }
723 let new_rb = batch.slice(run.offset, run.len);
724 ret.push((new_rb, run));
725 }
726 Ok(SortedRunSet {
727 runs_with_batch: ret,
728 sort_column,
729 })
730 }
731}
732
/// Dispatches on an arrow timestamp `DataType`.
///
/// For each of the four timestamp time units, the macro `$m` is invoked with
/// the matching arrow primitive timestamp type and `TimeUnit`, followed by
/// the extra `$args`. Any other data type is handled by the caller-supplied
/// `pattern => fallback` arms.
///
/// Shape of an invocation:
/// `downcast_ts_array!(data_type => (callee_macro, args...), pattern => fallback)`.
#[macro_export]
macro_rules! downcast_ts_array {
    ($data_type:expr => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) =>
    {
        match $data_type {
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
                $m!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
                $m!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
                $m!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
                $m!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $args)*)
            }
            // Caller-provided arms for every non-timestamp data type.
            $($p => $fallback,)*
        }
    };
}
757
758fn find_slice_from_range(
762 sort_column: &SortColumn,
763 range: &TimeRange,
764) -> datafusion_common::Result<(usize, usize)> {
765 let ty = sort_column.values.data_type();
766 let time_unit = if let DataType::Timestamp(unit, _) = ty {
767 unit
768 } else {
769 return Err(DataFusionError::Internal(format!(
770 "Unsupported sort column type: {}",
771 sort_column.values.data_type()
772 )));
773 };
774 let array = &sort_column.values;
775 let opt = &sort_column.options.unwrap_or_default();
776 let descending = opt.descending;
777
778 let typed_sorted_range = [range.start, range.end]
779 .iter()
780 .map(|t| {
781 t.convert_to(time_unit.into())
782 .ok_or_else(|| {
783 DataFusionError::Internal(format!(
784 "Failed to convert timestamp from {:?} to {:?}",
785 t.unit(),
786 time_unit
787 ))
788 })
789 .and_then(|typed_ts| {
790 let value = Value::Timestamp(typed_ts);
791 value
792 .try_to_scalar_value(&value.data_type())
793 .map_err(|e| DataFusionError::External(Box::new(e) as _))
794 })
795 })
796 .try_collect::<_, Vec<_>, _>()?;
797
798 let (min_val, max_val) = (typed_sorted_range[0].clone(), typed_sorted_range[1].clone());
799
800 let (start, end) = if descending {
802 let start = bisect::<false>(from_ref(array), from_ref(&max_val), &[*opt])?;
806 let end = bisect::<false>(from_ref(array), from_ref(&min_val), &[*opt])?;
809 (start, end)
810 } else {
811 let start = bisect::<true>(from_ref(array), from_ref(&min_val), &[*opt])?;
814 let end = bisect::<true>(from_ref(array), from_ref(&max_val), &[*opt])?;
817 (start, end)
818 };
819
820 Ok((start, end - start))
821}
822
/// Downcasts `$arr` to `PrimitiveArray<$t>` and yields a boxed iterator over
/// `(index, Option<i64>)` pairs of its elements.
///
/// `$unit` is accepted so the macro can be used as the callee of
/// `downcast_ts_array!`, but it is not used here.
///
/// Panics if `$arr` is not a `PrimitiveArray<$t>` (the downcast is unwrapped).
#[macro_export]
macro_rules! array_iter_helper {
    ($t:ty, $unit:expr, $arr:expr) => {{
        let typed = $arr
            .as_any()
            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
            .unwrap();
        let iter = typed.iter().enumerate();
        Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
    }};
}
837
838fn cmp_with_opts<T: Ord>(
842 a: &Option<T>,
843 b: &Option<T>,
844 opt: &Option<SortOptions>,
845) -> std::cmp::Ordering {
846 let opt = opt.unwrap_or_default();
847
848 if let (Some(a), Some(b)) = (a, b) {
849 if opt.descending {
850 b.cmp(a)
851 } else {
852 a.cmp(b)
853 }
854 } else if opt.nulls_first {
855 a.cmp(b)
858 } else {
859 match (a, b) {
860 (Some(a), Some(b)) => a.cmp(b),
861 (Some(_), None) => std::cmp::Ordering::Less,
862 (None, Some(_)) => std::cmp::Ordering::Greater,
863 (None, None) => std::cmp::Ordering::Equal,
864 }
865 }
866}
867
/// The sorted runs found in one input batch, together with the evaluated sort
/// column they were derived from.
#[derive(Debug, Clone)]
struct SortedRunSet<N: Ord> {
    /// One entry per run: the batch (or batch slice) holding the run's rows and
    /// the run's description.
    runs_with_batch: Vec<(DfRecordBatch, SucRun<N>)>,
    /// Sort column evaluated over the whole original batch; run offsets refer
    /// to positions in this column.
    sort_column: SortColumn,
}
875
/// A successive (already sorted) run inside a column.
#[derive(Debug, Clone, PartialEq)]
struct SucRun<N: Ord> {
    /// Index of the run's first element in the column it was found in.
    offset: usize,
    /// Number of elements in the run.
    len: usize,
    /// First non-null value of the run, `None` when the run has no non-null
    /// value.
    first_val: Option<N>,
    /// Last non-null value of the run, `None` when the run has no non-null
    /// value.
    last_val: Option<N>,
}
888
889impl SucRun<Timestamp> {
890 fn get_time_range(&self) -> Option<TimeRange> {
892 let start = self.first_val.min(self.last_val);
893 let end = self
894 .first_val
895 .max(self.last_val)
896 .map(|i| Timestamp::new(i.value() + 1, i.unit()));
897 start.zip(end).map(|(s, e)| TimeRange::new(s, e))
898 }
899}
900
901fn find_successive_runs<T: Iterator<Item = (usize, Option<N>)>, N: Ord + Copy>(
903 iter: T,
904 sort_opts: &Option<SortOptions>,
905) -> Vec<SucRun<N>> {
906 let mut runs = Vec::new();
907 let mut last_value = None;
908 let mut iter_len = None;
909
910 let mut last_offset = 0;
911 let mut first_val: Option<N> = None;
912 let mut last_val: Option<N> = None;
913
914 for (idx, t) in iter {
915 if let Some(last_value) = &last_value {
916 if cmp_with_opts(last_value, &t, sort_opts) == std::cmp::Ordering::Greater {
917 let len = idx - last_offset;
919 let run = SucRun {
920 offset: last_offset,
921 len,
922 first_val,
923 last_val,
924 };
925 runs.push(run);
926 first_val = None;
927 last_val = None;
928
929 last_offset = idx;
930 }
931 }
932 last_value = Some(t);
933 if let Some(t) = t {
934 first_val = first_val.or(Some(t));
935 last_val = Some(t).or(last_val);
936 }
937 iter_len = Some(idx);
938 }
939 let run = SucRun {
940 offset: last_offset,
941 len: iter_len.map(|l| l - last_offset + 1).unwrap_or(0),
942 first_val,
943 last_val,
944 };
945 runs.push(run);
946
947 runs
948}
949
950fn get_sorted_runs(sort_column: SortColumn) -> datafusion_common::Result<Vec<SucRun<Timestamp>>> {
954 let ty = sort_column.values.data_type();
955 if let DataType::Timestamp(unit, _) = ty {
956 let array = &sort_column.values;
957 let iter = downcast_ts_array!(
958 array.data_type() => (array_iter_helper, array),
959 _ => internal_err!("Unsupported sort column type: {ty}")?
960 );
961
962 let raw = find_successive_runs(iter, &sort_column.options);
963 let ts_runs = raw
964 .into_iter()
965 .map(|run| SucRun {
966 offset: run.offset,
967 len: run.len,
968 first_val: run.first_val.map(|v| Timestamp::new(v, unit.into())),
969 last_val: run.last_val.map(|v| Timestamp::new(v, unit.into())),
970 })
971 .collect_vec();
972 Ok(ts_runs)
973 } else {
974 Err(DataFusionError::Internal(format!(
975 "Unsupported sort column type: {ty}"
976 )))
977 }
978}
979
/// A time range delimited by two timestamps.
///
/// `TimeRange::new` keeps `start <= end`. Overlap checks treat the range as
/// half-open: two ranges that merely touch at a boundary do not overlap.
/// The derived ordering compares `start` first, then `end`.
#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TimeRange {
    /// Lower bound of the range.
    start: Timestamp,
    /// Upper bound of the range.
    end: Timestamp,
}
988
989impl From<&PartitionRange> for TimeRange {
990 fn from(range: &PartitionRange) -> Self {
991 Self::new(range.start, range.end)
992 }
993}
994
995impl From<(Timestamp, Timestamp)> for TimeRange {
996 fn from(range: (Timestamp, Timestamp)) -> Self {
997 Self::new(range.0, range.1)
998 }
999}
1000
1001impl From<&(Timestamp, Timestamp)> for TimeRange {
1002 fn from(range: &(Timestamp, Timestamp)) -> Self {
1003 Self::new(range.0, range.1)
1004 }
1005}
1006
1007impl TimeRange {
1008 fn new(start: Timestamp, end: Timestamp) -> Self {
1010 if start > end {
1011 Self {
1012 start: end,
1013 end: start,
1014 }
1015 } else {
1016 Self { start, end }
1017 }
1018 }
1019
1020 fn is_subset(&self, other: &Self) -> bool {
1021 self.start >= other.start && self.end <= other.end
1022 }
1023
1024 fn is_overlapping(&self, other: &Self) -> bool {
1026 !(self.start >= other.end || self.end <= other.start)
1027 }
1028
1029 fn intersection(&self, other: &Self) -> Option<Self> {
1030 if self.is_overlapping(other) {
1031 Some(Self::new(
1032 self.start.max(other.start),
1033 self.end.min(other.end),
1034 ))
1035 } else {
1036 None
1037 }
1038 }
1039
1040 fn difference(&self, other: &Self) -> Vec<Self> {
1041 if !self.is_overlapping(other) {
1042 vec![*self]
1043 } else {
1044 let mut ret = Vec::new();
1045 if self.start < other.start && self.end > other.end {
1046 ret.push(Self::new(self.start, other.start));
1047 ret.push(Self::new(other.end, self.end));
1048 } else if self.start < other.start {
1049 ret.push(Self::new(self.start, other.start));
1050 } else if self.end > other.end {
1051 ret.push(Self::new(other.end, self.end));
1052 }
1053 ret
1054 }
1055 }
1056}
1057
1058fn split_range_by(
1060 input_range: &TimeRange,
1061 input_parts: &[usize],
1062 split_by: &TimeRange,
1063 split_idx: usize,
1064) -> Vec<Action> {
1065 let mut ret = Vec::new();
1066 if input_range.is_overlapping(split_by) {
1067 let input_parts = input_parts.to_vec();
1068 let new_parts = {
1069 let mut new_parts = input_parts.clone();
1070 new_parts.push(split_idx);
1071 new_parts
1072 };
1073
1074 ret.push(Action::Pop(*input_range));
1075 if let Some(intersection) = input_range.intersection(split_by) {
1076 ret.push(Action::Push(intersection, new_parts.clone()));
1077 }
1078 for diff in input_range.difference(split_by) {
1079 ret.push(Action::Push(diff, input_parts.clone()));
1080 }
1081 }
1082 ret
1083}
1084
/// A pending update to the range map built by `split_overlapping_ranges`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Action {
    /// Remove the entry keyed by this range.
    Pop(TimeRange),
    /// Insert this range together with the indices of the partition ranges
    /// that cover it.
    Push(TimeRange, Vec<usize>),
}
1090
1091fn compute_all_working_ranges(
1099 overlap_counts: &BTreeMap<TimeRange, Vec<usize>>,
1100 descending: bool,
1101) -> Vec<(TimeRange, BTreeSet<usize>)> {
1102 let mut ret = Vec::new();
1103 let mut cur_range_set: Option<(TimeRange, BTreeSet<usize>)> = None;
1104 let overlap_iter: Box<dyn Iterator<Item = (&TimeRange, &Vec<usize>)>> = if descending {
1105 Box::new(overlap_counts.iter().rev()) as _
1106 } else {
1107 Box::new(overlap_counts.iter()) as _
1108 };
1109 for (range, set) in overlap_iter {
1110 match &mut cur_range_set {
1111 None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))),
1112 Some((working_range, working_set)) => {
1113 let need_expand = {
1118 let last_part = working_set.last();
1119 let inter: BTreeSet<usize> = working_set
1120 .intersection(&BTreeSet::from_iter(set.iter().cloned()))
1121 .cloned()
1122 .collect();
1123 if let Some(one) = inter.first()
1124 && inter.len() == 1
1125 && Some(one) == last_part
1126 {
1127 if set.iter().all(|p| Some(p) >= last_part) {
1129 false
1131 } else {
1132 true
1134 }
1135 } else if inter.is_empty() {
1136 false
1138 } else {
1139 true
1141 }
1142 };
1143
1144 if need_expand {
1145 if descending {
1146 working_range.start = range.start;
1147 } else {
1148 working_range.end = range.end;
1149 }
1150 working_set.extend(set.iter().cloned());
1151 } else {
1152 ret.push((*working_range, std::mem::take(working_set)));
1153 cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned())));
1154 }
1155 }
1156 }
1157 }
1158
1159 if let Some(cur_range_set) = cur_range_set {
1160 ret.push(cur_range_set)
1161 }
1162
1163 ret
1164}
1165
1166fn split_overlapping_ranges(ranges: &[PartitionRange]) -> BTreeMap<TimeRange, Vec<usize>> {
1169 let mut ret: BTreeMap<TimeRange, Vec<usize>> = BTreeMap::new();
1171 for (idx, range) in ranges.iter().enumerate() {
1172 let key: TimeRange = (range.start, range.end).into();
1173 let mut actions = Vec::new();
1174 let mut untouched = vec![key];
1175 let forward_iter = ret
1179 .range(key..)
1180 .take_while(|(range, _)| range.is_overlapping(&key));
1181 let backward_iter = ret
1182 .range(..key)
1183 .rev()
1184 .take_while(|(range, _)| range.is_overlapping(&key));
1185
1186 for (range, parts) in forward_iter.chain(backward_iter) {
1187 untouched = untouched.iter().flat_map(|r| r.difference(range)).collect();
1188 let act = split_range_by(range, parts, &key, idx);
1189 actions.extend(act.into_iter());
1190 }
1191
1192 for action in actions {
1193 match action {
1194 Action::Pop(range) => {
1195 ret.remove(&range);
1196 }
1197 Action::Push(range, parts) => {
1198 ret.insert(range, parts);
1199 }
1200 }
1201 }
1202
1203 for range in untouched {
1205 ret.insert(range, vec![idx]);
1206 }
1207 }
1208 ret
1209}
1210
1211fn get_timestamp_from_idx(
1213 array: &ArrayRef,
1214 offset: usize,
1215) -> datafusion_common::Result<Option<Timestamp>> {
1216 let time_unit = if let DataType::Timestamp(unit, _) = array.data_type() {
1217 unit
1218 } else {
1219 return Err(DataFusionError::Internal(format!(
1220 "Unsupported sort column type: {}",
1221 array.data_type()
1222 )));
1223 };
1224 let ty = array.data_type();
1225 let array = array.slice(offset, 1);
1226 let mut iter = downcast_ts_array!(
1227 array.data_type() => (array_iter_helper, array),
1228 _ => internal_err!("Unsupported sort column type: {ty}")?
1229 );
1230 let (_idx, val) = iter.next().ok_or_else(|| {
1231 DataFusionError::Internal("Empty array in get_timestamp_from".to_string())
1232 })?;
1233 let val = if let Some(val) = val {
1234 val
1235 } else {
1236 return Ok(None);
1237 };
1238 let gt_timestamp = Timestamp::new(val, time_unit.into());
1239 Ok(Some(gt_timestamp))
1240}
1241
1242#[cfg(test)]
1243mod test {
1244 use std::io::Write;
1245 use std::sync::Arc;
1246
1247 use arrow::array::{ArrayRef, TimestampMillisecondArray};
1248 use arrow::compute::concat_batches;
1249 use arrow::json::ArrayWriter;
1250 use arrow_schema::{Field, Schema, TimeUnit};
1251 use futures::StreamExt;
1252 use pretty_assertions::assert_eq;
1253 use serde_json::json;
1254
1255 use super::*;
1256 use crate::test_util::{new_ts_array, MockInputExec};
1257
    /// Checks `TimeRange::is_overlapping` for the range `[1s, 1001ms)` against
    /// several other ranges, mixing second and millisecond units. Each case is
    /// `(range1, range2, expected_overlap)`.
    #[test]
    fn test_overlapping() {
        let testcases = [
            // Entirely before range1.
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
                false,
            ),
            // Starts earlier, ends at the same point.
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
                true,
            ),
            // Fully contains range1.
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
                true,
            ),
            // Same start, ends later.
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (
                    Timestamp::new_millisecond(1000),
                    Timestamp::new_millisecond(1002),
                ),
                true,
            ),
            // Only touches range1 at its end bound.
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (
                    Timestamp::new_millisecond(1001),
                    Timestamp::new_millisecond(1002),
                ),
                false,
            ),
            // Entirely after range1.
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (
                    Timestamp::new_millisecond(1002),
                    Timestamp::new_millisecond(1003),
                ),
                false,
            ),
        ];

        for (range1, range2, expected) in testcases.iter() {
            assert_eq!(
                TimeRange::from(range1).is_overlapping(&range2.into()),
                *expected,
                "range1: {:?}, range2: {:?}",
                range1,
                range2
            );
        }
    }
1312
/// Table-driven test for `split_range_by`.
///
/// Each case is `(range, parts, split_by, split_idx, expected)`, where
/// `expected` is the list of `Action`s the call must return.
#[test]
fn test_split() {
    let sec = Timestamp::new_second;
    let ms = Timestamp::new_millisecond;

    let testcases = [
        // Nothing is expected to be emitted for this pair of ranges.
        ((sec(1), ms(1001)), vec![0], (sec(0), ms(1)), 1, vec![]),
        // The whole range is popped and pushed back with both parts.
        (
            (sec(1), ms(1001)),
            vec![0],
            (sec(0), ms(1001)),
            1,
            vec![
                Action::Pop((sec(1), ms(1001)).into()),
                Action::Push((sec(1), ms(1001)).into(), vec![0, 1]),
            ],
        ),
        (
            (sec(1), ms(1001)),
            vec![0],
            (sec(0), ms(1002)),
            1,
            vec![
                Action::Pop((sec(1), ms(1001)).into()),
                Action::Push((sec(1), ms(1001)).into(), vec![0, 1]),
            ],
        ),
        (
            (sec(1), ms(1001)),
            vec![0],
            (ms(1000), ms(1002)),
            1,
            vec![
                Action::Pop((sec(1), ms(1001)).into()),
                Action::Push((ms(1000), ms(1001)).into(), vec![0, 1]),
            ],
        ),
        // The range is cut in two: a shared piece and a left-over piece.
        (
            (sec(1), ms(1002)),
            vec![0],
            (ms(1001), ms(1002)),
            1,
            vec![
                Action::Pop((sec(1), ms(1002)).into()),
                Action::Push((ms(1001), ms(1002)).into(), vec![0, 1]),
                Action::Push((sec(1), ms(1001)).into(), vec![0]),
            ],
        ),
        // The range is cut in three: a shared piece plus a left-over on each side.
        (
            (sec(1), ms(1004)),
            vec![0],
            (ms(1001), ms(1002)),
            1,
            vec![
                Action::Pop((sec(1), ms(1004)).into()),
                Action::Push((ms(1001), ms(1002)).into(), vec![0, 1]),
                Action::Push((sec(1), ms(1001)).into(), vec![0]),
                Action::Push((ms(1002), ms(1004)).into(), vec![0]),
            ],
        ),
    ];

    for (range, parts, split_by, split_idx, expected) in &testcases {
        let actions = split_range_by(&(*range).into(), parts, &split_by.into(), *split_idx);
        assert_eq!(
            actions, *expected,
            "range: {:?}, parts: {:?}, split_by: {:?}, split_idx: {}",
            range, parts, split_by, split_idx
        );
    }
}
1453
/// Table-driven test for `compute_all_working_ranges` called with `true` as
/// its second argument.
///
/// Each case maps time ranges (in seconds) to partition indices and lists the
/// `(range, partition set)` pairs the function must return, in order.
#[test]
fn test_compute_working_ranges_rev() {
    // Builds a `(start, end)` pair of second-resolution timestamps.
    let span = |start: i64, end: i64| (Timestamp::new_second(start), Timestamp::new_second(end));

    let testcases = vec![
        (
            BTreeMap::from([(span(1, 2), vec![0])]),
            vec![(span(1, 2), BTreeSet::from([0]))],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0, 1])]),
            vec![(span(1, 2), BTreeSet::from([0, 1]))],
        ),
        (
            BTreeMap::from([(span(2, 3), vec![0]), (span(1, 2), vec![0, 1])]),
            vec![
                (span(2, 3), BTreeSet::from([0])),
                (span(1, 2), BTreeSet::from([0, 1])),
            ],
        ),
        (
            BTreeMap::from([(span(2, 3), vec![0, 1]), (span(1, 2), vec![1])]),
            vec![
                (span(2, 3), BTreeSet::from([0, 1])),
                (span(1, 2), BTreeSet::from([1])),
            ],
        ),
        (
            BTreeMap::from([
                (span(3, 4), vec![0]),
                (span(2, 3), vec![0, 1]),
                (span(1, 2), vec![1]),
            ]),
            vec![
                (span(3, 4), BTreeSet::from([0])),
                (span(2, 3), BTreeSet::from([0, 1])),
                (span(1, 2), BTreeSet::from([1])),
            ],
        ),
        // Expected output is a single combined range.
        (
            BTreeMap::from([
                (span(3, 4), vec![0, 2]),
                (span(2, 3), vec![0, 1, 2]),
                (span(1, 2), vec![1, 2]),
            ]),
            vec![(span(1, 4), BTreeSet::from([0, 1, 2]))],
        ),
        (
            BTreeMap::from([(span(2, 3), vec![0, 2]), (span(1, 2), vec![1, 2])]),
            vec![(span(1, 3), BTreeSet::from([0, 1, 2]))],
        ),
        (
            BTreeMap::from([
                (span(3, 4), vec![0, 1]),
                (span(2, 3), vec![0, 1, 2]),
                (span(1, 2), vec![1, 2]),
            ]),
            vec![(span(1, 4), BTreeSet::from([0, 1, 2]))],
        ),
        // Expected output keeps the two ranges separate.
        (
            BTreeMap::from([(span(2, 3), vec![0, 1]), (span(1, 2), vec![1, 2])]),
            vec![
                (span(2, 3), BTreeSet::from([0, 1])),
                (span(1, 2), BTreeSet::from([1, 2])),
            ],
        ),
        (
            BTreeMap::from([(span(2, 3), vec![0]), (span(1, 2), vec![1, 2])]),
            vec![
                (span(2, 3), BTreeSet::from([0])),
                (span(1, 2), BTreeSet::from([1, 2])),
            ],
        ),
    ];

    for (input, expected) in testcases {
        // Convert the timestamp pairs into the range type used by the function under test.
        let input = input
            .into_iter()
            .map(|(range, parts)| (range.into(), parts))
            .collect();
        let expected = expected
            .into_iter()
            .map(|(range, parts)| (range.into(), parts))
            .collect_vec();
        let actual = compute_all_working_ranges(&input, true);
        assert_eq!(actual, expected, "input: {:?}", input);
    }
}
1668
/// Table-driven test for `compute_all_working_ranges` called with `false` as
/// its second argument.
///
/// Each case maps time ranges (in seconds) to partition indices and lists the
/// `(range, partition set)` pairs the function must return, in order.
#[test]
fn test_compute_working_ranges() {
    // Builds a `(start, end)` pair of second-resolution timestamps.
    let span = |start: i64, end: i64| (Timestamp::new_second(start), Timestamp::new_second(end));

    let testcases = vec![
        (
            BTreeMap::from([(span(1, 2), vec![0])]),
            vec![(span(1, 2), BTreeSet::from([0]))],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0, 1])]),
            vec![(span(1, 2), BTreeSet::from([0, 1]))],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0, 1]), (span(2, 3), vec![1])]),
            vec![
                (span(1, 2), BTreeSet::from([0, 1])),
                (span(2, 3), BTreeSet::from([1])),
            ],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0]), (span(2, 3), vec![0, 1])]),
            vec![
                (span(1, 2), BTreeSet::from([0])),
                (span(2, 3), BTreeSet::from([0, 1])),
            ],
        ),
        (
            BTreeMap::from([
                (span(1, 2), vec![0]),
                (span(2, 3), vec![0, 1]),
                (span(3, 4), vec![1]),
            ]),
            vec![
                (span(1, 2), BTreeSet::from([0])),
                (span(2, 3), BTreeSet::from([0, 1])),
                (span(3, 4), BTreeSet::from([1])),
            ],
        ),
        // Expected output is a single combined range.
        (
            BTreeMap::from([
                (span(1, 2), vec![0, 2]),
                (span(2, 3), vec![0, 1, 2]),
                (span(3, 4), vec![1, 2]),
            ]),
            vec![(span(1, 4), BTreeSet::from([0, 1, 2]))],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0, 2]), (span(2, 3), vec![1, 2])]),
            vec![(span(1, 3), BTreeSet::from([0, 1, 2]))],
        ),
        (
            BTreeMap::from([
                (span(1, 2), vec![0, 1]),
                (span(2, 3), vec![0, 1, 2]),
                (span(3, 4), vec![1, 2]),
            ]),
            vec![(span(1, 4), BTreeSet::from([0, 1, 2]))],
        ),
        // Expected output keeps the two ranges separate.
        (
            BTreeMap::from([(span(1, 2), vec![0, 1]), (span(2, 3), vec![1, 2])]),
            vec![
                (span(1, 2), BTreeSet::from([0, 1])),
                (span(2, 3), BTreeSet::from([1, 2])),
            ],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0, 1]), (span(2, 3), vec![2])]),
            vec![
                (span(1, 2), BTreeSet::from([0, 1])),
                (span(2, 3), BTreeSet::from([2])),
            ],
        ),
    ];

    for (input, expected) in testcases {
        // Convert the timestamp pairs into the range type used by the function under test.
        let input = input
            .into_iter()
            .map(|(range, parts)| (range.into(), parts))
            .collect();
        let expected = expected
            .into_iter()
            .map(|(range, parts)| (range.into(), parts))
            .collect_vec();
        let actual = compute_all_working_ranges(&input, false);
        assert_eq!(actual, expected, "input: {:?}", input);
    }
}
1884
/// Table-driven test for `split_overlapping_ranges`.
///
/// Each case lists the input `PartitionRange`s (second resolution, two rows
/// each) and the expected map from time range to the indices listed for it.
#[test]
fn test_split_overlap_range() {
    // Builds a `(start, end)` pair of second-resolution timestamps.
    let span = |start: i64, end: i64| (Timestamp::new_second(start), Timestamp::new_second(end));
    // Builds a partition range covering `[start, end]` seconds with the given identifier.
    let part = |start, end, identifier| PartitionRange {
        start: Timestamp::new_second(start),
        end: Timestamp::new_second(end),
        num_rows: 2,
        identifier,
    };

    let testcases = vec![
        // A single range.
        (
            vec![part(1, 2, 0)],
            BTreeMap::from([(span(1, 2), vec![0])]),
        ),
        // Two ranges with identical bounds.
        (
            vec![part(1, 2, 0), part(1, 2, 1)],
            BTreeMap::from([(span(1, 2), vec![0, 1])]),
        ),
        (
            vec![part(1, 3, 0), part(2, 4, 1)],
            BTreeMap::from([
                (span(1, 2), vec![0]),
                (span(2, 3), vec![0, 1]),
                (span(3, 4), vec![1]),
            ]),
        ),
        // Three ranges.
        (
            vec![part(1, 3, 0), part(2, 4, 1), part(1, 4, 2)],
            BTreeMap::from([
                (span(1, 2), vec![0, 2]),
                (span(2, 3), vec![0, 1, 2]),
                (span(3, 4), vec![1, 2]),
            ]),
        ),
        (
            vec![part(1, 3, 0), part(1, 4, 1), part(2, 4, 2)],
            BTreeMap::from([
                (span(1, 2), vec![0, 1]),
                (span(2, 3), vec![0, 1, 2]),
                (span(3, 4), vec![1, 2]),
            ]),
        ),
    ];

    for (input, expected) in testcases {
        // Convert the timestamp pairs into the key type returned by the function under test.
        let expected = expected
            .into_iter()
            .map(|(range, parts)| (range.into(), parts))
            .collect();
        let actual = split_overlapping_ranges(&input);
        assert_eq!(actual, expected);
    }
}
2047
2048 impl From<(i32, i32, Option<i32>, Option<i32>)> for SucRun<i32> {
2049 fn from((offset, len, min_val, max_val): (i32, i32, Option<i32>, Option<i32>)) -> Self {
2050 Self {
2051 offset: offset as usize,
2052 len: len as usize,
2053 first_val: min_val,
2054 last_val: max_val,
2055 }
2056 }
2057 }
2058
/// Table-driven test for `find_successive_runs`.
///
/// Each case is `(input values, sort options, expected runs)`, where every
/// expected run is written as `(offset, len, first_val, last_val)` and
/// converted into a `SucRun<i32>` before comparison.
#[test]
fn test_find_successive_runs() {
    // Shorthand for `Some(SortOptions { .. })`.
    let opts = |descending: bool, nulls_first: bool| {
        Some(SortOptions {
            descending,
            nulls_first,
        })
    };

    let testcases = vec![
        (
            vec![Some(1), Some(1), Some(2), Some(1), Some(3)],
            opts(false, false),
            vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
        ),
        (
            vec![Some(1), Some(2), Some(2), Some(1), Some(3)],
            opts(false, false),
            vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
        ),
        (
            vec![Some(1), Some(2), None, None, Some(1), Some(3)],
            opts(false, false),
            vec![(0, 4, Some(1), Some(2)), (4, 2, Some(1), Some(3))],
        ),
        (
            vec![Some(1), Some(2), Some(1), Some(3)],
            opts(false, false),
            vec![(0, 2, Some(1), Some(2)), (2, 2, Some(1), Some(3))],
        ),
        (
            vec![Some(1), Some(2), Some(1), Some(3)],
            opts(true, false),
            vec![
                (0, 1, Some(1), Some(1)),
                (1, 2, Some(2), Some(1)),
                (3, 1, Some(3), Some(3)),
            ],
        ),
        (
            vec![Some(1), Some(2), None, Some(3)],
            opts(false, true),
            vec![(0, 2, Some(1), Some(2)), (2, 2, Some(3), Some(3))],
        ),
        (
            vec![Some(1), Some(2), None, Some(3)],
            opts(false, false),
            vec![(0, 3, Some(1), Some(2)), (3, 1, Some(3), Some(3))],
        ),
        (
            vec![Some(2), Some(1), None, Some(3)],
            opts(true, true),
            vec![(0, 2, Some(2), Some(1)), (2, 2, Some(3), Some(3))],
        ),
        // Empty input is expected to yield one empty run.
        (vec![], opts(false, true), vec![(0, 0, None, None)]),
        (
            vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
            opts(true, true),
            vec![(0, 5, Some(2), Some(1)), (5, 2, Some(5), Some(4))],
        ),
        (
            vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
            opts(true, false),
            vec![
                (0, 2, None, None),
                (2, 3, Some(2), Some(1)),
                (5, 2, Some(5), Some(4)),
            ],
        ),
    ];

    for (input, sort_opts, expected) in testcases {
        let expected = expected.into_iter().map(SucRun::<i32>::from).collect_vec();
        let ret = find_successive_runs(input.clone().into_iter().enumerate(), &sort_opts);
        assert_eq!(
            ret, expected,
            "input: {:?}, opt: {:?},expected: {:?}",
            input, sort_opts, expected
        );
    }
}
2169
/// Table-driven test for `cmp_with_opts`.
///
/// Each case is `(a, b, sort options, expected ordering of a relative to b)`.
#[test]
fn test_cmp_with_opts() {
    use std::cmp::Ordering::{Equal, Greater, Less};

    // Shorthand for `Some(SortOptions { .. })`.
    let opts = |descending: bool, nulls_first: bool| {
        Some(SortOptions {
            descending,
            nulls_first,
        })
    };

    let testcases = vec![
        // Two non-null values.
        (Some(1), Some(2), opts(false, false), Less),
        (Some(1), Some(2), opts(true, false), Greater),
        // A value compared with a null.
        (Some(1), None, opts(false, true), Greater),
        (Some(1), None, opts(true, true), Greater),
        (Some(1), None, opts(true, false), Less),
        (Some(1), None, opts(false, false), Less),
        // Two nulls.
        (None, None, opts(true, true), Equal),
        (None, None, opts(false, true), Equal),
        (None, None, opts(true, false), Equal),
        (None, None, opts(false, false), Equal),
    ];

    for (a, b, opts, expected) in testcases {
        let ordering = cmp_with_opts(&a, &b, &opts);
        assert_eq!(
            ordering, expected,
            "a: {:?}, b: {:?}, opts: {:?}",
            a, b, opts
        );
    }
}
2275
/// Table-driven test for `find_slice_from_range`.
///
/// Each case is `(sort values, descending flag, time range, expected result)`.
/// An `Ok` expectation holds the exact pair the function must return; an `Err`
/// expectation holds a substring the error message must contain.
#[test]
fn test_find_slice_from_range() {
    // Builds a millisecond timestamp array from plain integers.
    let ts_array = |values: &[i64]| {
        Arc::new(TimestampMillisecondArray::from_iter_values(
            values.iter().copied(),
        )) as ArrayRef
    };
    // Builds a `TimeRange` from millisecond bounds.
    let window = |start: i64, end: i64| TimeRange {
        start: Timestamp::new_millisecond(start),
        end: Timestamp::new_millisecond(end),
    };

    let test_cases = vec![
        // Ascending sort values.
        (ts_array(&[1, 2, 3, 4, 5]), false, window(2, 4), Ok((1, 2))),
        (
            ts_array(&[-2, -1, 0, 1, 2, 3, 4, 5]),
            false,
            window(-1, 4),
            Ok((1, 5)),
        ),
        (ts_array(&[1, 3, 4, 6]), false, window(2, 5), Ok((1, 2))),
        (ts_array(&[1, 2, 3, 4, 6]), false, window(2, 5), Ok((1, 3))),
        (ts_array(&[1, 3, 4, 5, 6]), false, window(2, 5), Ok((1, 2))),
        (ts_array(&[1, 2, 3, 4, 5]), false, window(6, 7), Ok((5, 0))),
        // Descending sort values.
        (ts_array(&[5, 3, 2, 1]), true, window(1, 4), Ok((1, 3))),
        (
            ts_array(&[5, 4, 3, 2, 1, 0]),
            true,
            window(1, 4),
            Ok((2, 3)),
        ),
        (ts_array(&[5, 3, 2, 0]), true, window(1, 4), Ok((1, 2))),
        (ts_array(&[5, 4, 3, 2, 0]), true, window(1, 4), Ok((2, 2))),
        (ts_array(&[5, 4, 3, 2, 1]), true, window(2, 5), Ok((1, 3))),
        (ts_array(&[5, 4, 3, 1]), true, window(2, 5), Ok((1, 2))),
        (ts_array(&[6, 4, 3, 2, 1]), true, window(2, 5), Ok((1, 3))),
        (ts_array(&[6, 4, 3, 1]), true, window(2, 5), Ok((1, 2))),
        (ts_array(&[10, 9, 8, 7, 6]), true, window(2, 5), Ok((5, 0))),
        // Descending sort values with a one-millisecond-wide range.
        (ts_array(&[3, 2, 1, 0]), true, window(3, 4), Ok((0, 1))),
        (ts_array(&[5, 3, 2]), true, window(3, 4), Ok((1, 1))),
        (ts_array(&[5, 4, 3, 2]), true, window(3, 4), Ok((2, 1))),
    ];

    for (sort_vals, descending, range, expected) in test_cases {
        let sort_column = SortColumn {
            values: sort_vals,
            options: Some(SortOptions {
                descending,
                ..Default::default()
            }),
        };
        match (find_slice_from_range(&sort_column, &range), expected) {
            (Ok(actual), Ok(wanted)) => assert_eq!(
                actual, wanted,
                "sort_vals: {:?}, range: {:?}",
                sort_column, range
            ),
            (Err(err), Err(wanted)) => {
                // The annotation fixes the error side of the expectation to a message fragment.
                let wanted: &str = wanted;
                assert!(
                    err.to_string().contains(wanted),
                    "err: {:?}, expected: {:?}",
                    err,
                    wanted
                );
            }
            (r, e) => panic!("unexpected result: {:?}, expected: {:?}", r, e),
        }
    }
}
2482
/// Fixture describing one scenario that `TestStream::run_test` feeds through
/// `WindowedSortExec`.
#[derive(Debug)]
struct TestStream {
    /// Sort expression handed to `WindowedSortExec::try_new`.
    expression: PhysicalSortExpr,
    /// Optional `fetch` value handed to `WindowedSortExec::try_new`.
    fetch: Option<usize>,
    /// Partition ranges, each paired with the record batch supplied for it.
    input: Vec<(PartitionRange, DfRecordBatch)>,
    /// Record batches built from the `expected` argument of `TestStream::new`.
    output: Vec<DfRecordBatch>,
    /// Schema used to build every input and output batch.
    schema: SchemaRef,
}
2491 use datafusion::physical_plan::expressions::Column;
2492 impl TestStream {
2493 fn new(
2494 ts_col: Column,
2495 opt: SortOptions,
2496 fetch: Option<usize>,
2497 schema: impl Into<arrow_schema::Fields>,
2498 input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2499 expected: Vec<Vec<ArrayRef>>,
2500 ) -> Self {
2501 let expression = PhysicalSortExpr {
2502 expr: Arc::new(ts_col),
2503 options: opt,
2504 };
2505 let schema = Schema::new(schema.into());
2506 let schema = Arc::new(schema);
2507 let input = input
2508 .into_iter()
2509 .map(|(k, v)| (k, DfRecordBatch::try_new(schema.clone(), v).unwrap()))
2510 .collect_vec();
2511 let output_batchs = expected
2512 .into_iter()
2513 .map(|v| DfRecordBatch::try_new(schema.clone(), v).unwrap())
2514 .collect_vec();
2515 Self {
2516 expression,
2517 fetch,
2518 input,
2519 output: output_batchs,
2520 schema,
2521 }
2522 }
2523
2524 async fn run_test(&self) -> Vec<DfRecordBatch> {
2525 let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip();
2526
2527 let mock_input = MockInputExec::new(batches, self.schema.clone());
2528
2529 let exec = WindowedSortExec::try_new(
2530 self.expression.clone(),
2531 self.fetch,
2532 vec![ranges],
2533 Arc::new(mock_input),
2534 )
2535 .unwrap();
2536
2537 let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
2538
2539 let real_output = exec_stream.collect::<Vec<_>>().await;
2540 let real_output: Vec<_> = real_output.into_iter().try_collect().unwrap();
2541 real_output
2542 }
2543 }
2544
2545 #[tokio::test]
2546 async fn test_window_sort_stream() {
2547 let test_cases = [
2548 TestStream::new(
2549 Column::new("ts", 0),
2550 SortOptions {
2551 descending: false,
2552 nulls_first: true,
2553 },
2554 None,
2555 vec![Field::new(
2556 "ts",
2557 DataType::Timestamp(TimeUnit::Millisecond, None),
2558 false,
2559 )],
2560 vec![],
2561 vec![],
2562 ),
2563 TestStream::new(
2564 Column::new("ts", 0),
2565 SortOptions {
2566 descending: false,
2567 nulls_first: true,
2568 },
2569 None,
2570 vec![Field::new(
2571 "ts",
2572 DataType::Timestamp(TimeUnit::Millisecond, None),
2573 false,
2574 )],
2575 vec![
2576 (
2578 PartitionRange {
2579 start: Timestamp::new_millisecond(1),
2580 end: Timestamp::new_millisecond(2),
2581 num_rows: 1,
2582 identifier: 0,
2583 },
2584 vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2585 ),
2586 (
2587 PartitionRange {
2588 start: Timestamp::new_millisecond(1),
2589 end: Timestamp::new_millisecond(3),
2590 num_rows: 1,
2591 identifier: 0,
2592 },
2593 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2594 ),
2595 ],
2596 vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2597 [2],
2598 ))]],
2599 ),
2600 TestStream::new(
2601 Column::new("ts", 0),
2602 SortOptions {
2603 descending: false,
2604 nulls_first: true,
2605 },
2606 None,
2607 vec![Field::new(
2608 "ts",
2609 DataType::Timestamp(TimeUnit::Millisecond, None),
2610 false,
2611 )],
2612 vec![
2613 (
2615 PartitionRange {
2616 start: Timestamp::new_millisecond(1),
2617 end: Timestamp::new_millisecond(2),
2618 num_rows: 1,
2619 identifier: 0,
2620 },
2621 vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2622 ),
2623 (
2624 PartitionRange {
2625 start: Timestamp::new_millisecond(1),
2626 end: Timestamp::new_millisecond(3),
2627 num_rows: 1,
2628 identifier: 0,
2629 },
2630 vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2631 ),
2632 ],
2633 vec![],
2634 ),
2635 TestStream::new(
2636 Column::new("ts", 0),
2637 SortOptions {
2638 descending: false,
2639 nulls_first: true,
2640 },
2641 None,
2642 vec![Field::new(
2643 "ts",
2644 DataType::Timestamp(TimeUnit::Millisecond, None),
2645 false,
2646 )],
2647 vec![
2648 (
2651 PartitionRange {
2652 start: Timestamp::new_millisecond(1),
2653 end: Timestamp::new_millisecond(2),
2654 num_rows: 1,
2655 identifier: 0,
2656 },
2657 vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2658 ),
2659 (
2660 PartitionRange {
2661 start: Timestamp::new_millisecond(1),
2662 end: Timestamp::new_millisecond(3),
2663 num_rows: 1,
2664 identifier: 0,
2665 },
2666 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2667 ),
2668 ],
2669 vec![
2670 vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2671 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2672 ],
2673 ),
2674 TestStream::new(
2675 Column::new("ts", 0),
2676 SortOptions {
2677 descending: false,
2678 nulls_first: true,
2679 },
2680 None,
2681 vec![Field::new(
2682 "ts",
2683 DataType::Timestamp(TimeUnit::Millisecond, None),
2684 false,
2685 )],
2686 vec![
2687 (
2689 PartitionRange {
2690 start: Timestamp::new_millisecond(1),
2691 end: Timestamp::new_millisecond(3),
2692 num_rows: 1,
2693 identifier: 0,
2694 },
2695 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2696 1, 2,
2697 ]))],
2698 ),
2699 (
2700 PartitionRange {
2701 start: Timestamp::new_millisecond(1),
2702 end: Timestamp::new_millisecond(4),
2703 num_rows: 1,
2704 identifier: 0,
2705 },
2706 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2707 2, 3,
2708 ]))],
2709 ),
2710 ],
2711 vec![
2712 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2713 1, 2,
2714 ]))],
2715 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2717 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2718 ],
2719 ),
2720 TestStream::new(
2721 Column::new("ts", 0),
2722 SortOptions {
2723 descending: false,
2724 nulls_first: true,
2725 },
2726 None,
2727 vec![Field::new(
2728 "ts",
2729 DataType::Timestamp(TimeUnit::Millisecond, None),
2730 false,
2731 )],
2732 vec![
2733 (
2735 PartitionRange {
2736 start: Timestamp::new_millisecond(1),
2737 end: Timestamp::new_millisecond(3),
2738 num_rows: 1,
2739 identifier: 0,
2740 },
2741 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2742 1, 2,
2743 ]))],
2744 ),
2745 (
2746 PartitionRange {
2747 start: Timestamp::new_millisecond(1),
2748 end: Timestamp::new_millisecond(4),
2749 num_rows: 1,
2750 identifier: 1,
2751 },
2752 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2753 1, 2, 3,
2754 ]))],
2755 ),
2756 ],
2757 vec![
2758 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2759 1, 1, 2, 2,
2760 ]))],
2761 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2762 ],
2763 ),
2764 TestStream::new(
2765 Column::new("ts", 0),
2766 SortOptions {
2767 descending: false,
2768 nulls_first: true,
2769 },
2770 None,
2771 vec![Field::new(
2772 "ts",
2773 DataType::Timestamp(TimeUnit::Millisecond, None),
2774 false,
2775 )],
2776 vec![
2777 (
2779 PartitionRange {
2780 start: Timestamp::new_millisecond(1),
2781 end: Timestamp::new_millisecond(3),
2782 num_rows: 1,
2783 identifier: 0,
2784 },
2785 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2786 1, 2,
2787 ]))],
2788 ),
2789 (
2790 PartitionRange {
2791 start: Timestamp::new_millisecond(1),
2792 end: Timestamp::new_millisecond(4),
2793 num_rows: 1,
2794 identifier: 1,
2795 },
2796 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2797 1, 2, 3,
2798 ]))],
2799 ),
2800 (
2801 PartitionRange {
2802 start: Timestamp::new_millisecond(4),
2803 end: Timestamp::new_millisecond(6),
2804 num_rows: 1,
2805 identifier: 1,
2806 },
2807 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2808 4, 5,
2809 ]))],
2810 ),
2811 ],
2812 vec![
2813 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2814 1, 1, 2, 2,
2815 ]))],
2816 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2817 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2818 4, 5,
2819 ]))],
2820 ],
2821 ),
2822 TestStream::new(
2824 Column::new("ts", 0),
2825 SortOptions {
2826 descending: false,
2827 nulls_first: true,
2828 },
2829 Some(6),
2830 vec![Field::new(
2831 "ts",
2832 DataType::Timestamp(TimeUnit::Millisecond, None),
2833 false,
2834 )],
2835 vec![
2836 (
2838 PartitionRange {
2839 start: Timestamp::new_millisecond(1),
2840 end: Timestamp::new_millisecond(3),
2841 num_rows: 1,
2842 identifier: 0,
2843 },
2844 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2845 1, 2,
2846 ]))],
2847 ),
2848 (
2849 PartitionRange {
2850 start: Timestamp::new_millisecond(1),
2851 end: Timestamp::new_millisecond(4),
2852 num_rows: 1,
2853 identifier: 1,
2854 },
2855 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2856 1, 2, 3,
2857 ]))],
2858 ),
2859 (
2860 PartitionRange {
2861 start: Timestamp::new_millisecond(3),
2862 end: Timestamp::new_millisecond(6),
2863 num_rows: 1,
2864 identifier: 1,
2865 },
2866 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2867 4, 5,
2868 ]))],
2869 ),
2870 ],
2871 vec![
2872 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2873 1, 1, 2, 2,
2874 ]))],
2875 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2876 vec![Arc::new(TimestampMillisecondArray::from_iter_values([4]))],
2877 ],
2878 ),
2879 TestStream::new(
2880 Column::new("ts", 0),
2881 SortOptions {
2882 descending: false,
2883 nulls_first: true,
2884 },
2885 Some(3),
2886 vec![Field::new(
2887 "ts",
2888 DataType::Timestamp(TimeUnit::Millisecond, None),
2889 false,
2890 )],
2891 vec![
2892 (
2894 PartitionRange {
2895 start: Timestamp::new_millisecond(1),
2896 end: Timestamp::new_millisecond(3),
2897 num_rows: 1,
2898 identifier: 0,
2899 },
2900 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2901 1, 2,
2902 ]))],
2903 ),
2904 (
2905 PartitionRange {
2906 start: Timestamp::new_millisecond(1),
2907 end: Timestamp::new_millisecond(4),
2908 num_rows: 1,
2909 identifier: 1,
2910 },
2911 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2912 1, 2, 3,
2913 ]))],
2914 ),
2915 (
2916 PartitionRange {
2917 start: Timestamp::new_millisecond(3),
2918 end: Timestamp::new_millisecond(6),
2919 num_rows: 1,
2920 identifier: 1,
2921 },
2922 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2923 4, 5,
2924 ]))],
2925 ),
2926 ],
2927 vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2928 [1, 1, 2],
2929 ))]],
2930 ),
2931 TestStream::new(
2933 Column::new("ts", 0),
2934 SortOptions {
2935 descending: true,
2936 nulls_first: true,
2937 },
2938 None,
2939 vec![Field::new(
2940 "ts",
2941 DataType::Timestamp(TimeUnit::Millisecond, None),
2942 false,
2943 )],
2944 vec![
2945 (
2947 PartitionRange {
2948 start: Timestamp::new_millisecond(3),
2949 end: Timestamp::new_millisecond(6),
2950 num_rows: 1,
2951 identifier: 1,
2952 },
2953 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2954 5, 4,
2955 ]))],
2956 ),
2957 (
2958 PartitionRange {
2959 start: Timestamp::new_millisecond(1),
2960 end: Timestamp::new_millisecond(4),
2961 num_rows: 1,
2962 identifier: 1,
2963 },
2964 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2965 3, 2, 1,
2966 ]))],
2967 ),
2968 (
2969 PartitionRange {
2970 start: Timestamp::new_millisecond(1),
2971 end: Timestamp::new_millisecond(3),
2972 num_rows: 1,
2973 identifier: 0,
2974 },
2975 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2976 2, 1,
2977 ]))],
2978 ),
2979 ],
2980 vec![
2981 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2982 5, 4,
2983 ]))],
2984 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2985 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2986 2, 2, 1, 1,
2987 ]))],
2988 ],
2989 ),
2990 TestStream::new(
2991 Column::new("ts", 0),
2992 SortOptions {
2993 descending: false,
2994 nulls_first: true,
2995 },
2996 None,
2997 vec![Field::new(
2998 "ts",
2999 DataType::Timestamp(TimeUnit::Millisecond, None),
3000 false,
3001 )],
3002 vec![
3003 (
3005 PartitionRange {
3006 start: Timestamp::new_millisecond(1),
3007 end: Timestamp::new_millisecond(10),
3008 num_rows: 1,
3009 identifier: 0,
3010 },
3011 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3012 1, 5, 9,
3013 ]))],
3014 ),
3015 (
3016 PartitionRange {
3017 start: Timestamp::new_millisecond(3),
3018 end: Timestamp::new_millisecond(7),
3019 num_rows: 1,
3020 identifier: 1,
3021 },
3022 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3023 3, 4, 5, 6,
3024 ]))],
3025 ),
3026 ],
3027 vec![
3028 vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
3029 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3030 3, 4, 5, 5, 6, 9,
3031 ]))],
3032 ],
3033 ),
3034 TestStream::new(
3035 Column::new("ts", 0),
3036 SortOptions {
3037 descending: false,
3038 nulls_first: true,
3039 },
3040 None,
3041 vec![Field::new(
3042 "ts",
3043 DataType::Timestamp(TimeUnit::Millisecond, None),
3044 false,
3045 )],
3046 vec![
3047 (
3049 PartitionRange {
3050 start: Timestamp::new_millisecond(1),
3051 end: Timestamp::new_millisecond(3),
3052 num_rows: 1,
3053 identifier: 0,
3054 },
3055 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3056 1, 2,
3057 ]))],
3058 ),
3059 (
3060 PartitionRange {
3061 start: Timestamp::new_millisecond(1),
3062 end: Timestamp::new_millisecond(10),
3063 num_rows: 1,
3064 identifier: 1,
3065 },
3066 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3067 1, 3, 4, 5, 6, 8,
3068 ]))],
3069 ),
3070 (
3071 PartitionRange {
3072 start: Timestamp::new_millisecond(7),
3073 end: Timestamp::new_millisecond(10),
3074 num_rows: 1,
3075 identifier: 1,
3076 },
3077 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3078 7, 8, 9,
3079 ]))],
3080 ),
3081 ],
3082 vec![
3083 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3084 1, 1, 2,
3085 ]))],
3086 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3087 3, 4, 5, 6,
3088 ]))],
3089 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3090 7, 8, 8, 9,
3091 ]))],
3092 ],
3093 ),
3094 TestStream::new(
3095 Column::new("ts", 0),
3096 SortOptions {
3097 descending: false,
3098 nulls_first: true,
3099 },
3100 None,
3101 vec![Field::new(
3102 "ts",
3103 DataType::Timestamp(TimeUnit::Millisecond, None),
3104 false,
3105 )],
3106 vec![
3107 (
3109 PartitionRange {
3110 start: Timestamp::new_millisecond(1),
3111 end: Timestamp::new_millisecond(11),
3112 num_rows: 1,
3113 identifier: 0,
3114 },
3115 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3116 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
3117 ]))],
3118 ),
3119 (
3120 PartitionRange {
3121 start: Timestamp::new_millisecond(5),
3122 end: Timestamp::new_millisecond(7),
3123 num_rows: 1,
3124 identifier: 1,
3125 },
3126 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3127 5, 6,
3128 ]))],
3129 ),
3130 ],
3131 vec![
3132 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3133 1, 2, 3, 4,
3134 ]))],
3135 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3136 5, 5, 6, 6, 7, 8, 9, 10,
3137 ]))],
3138 ],
3139 ),
3140 ];
3141
3142 let indexed_test_cases = test_cases.iter().enumerate().collect_vec();
3143
3144 for (idx, testcase) in &indexed_test_cases {
3145 let output = testcase.run_test().await;
3146 assert_eq!(output, testcase.output, "case {idx} failed.");
3147 }
3148 }
3149
3150 #[tokio::test]
3151 async fn fuzzy_ish_test_window_sort_stream() {
3152 let test_cnt = 100;
3153 let part_cnt_bound = 100;
3154 let range_size_bound = 100;
3155 let range_offset_bound = 100;
3156 let in_range_datapoint_cnt_bound = 100;
3157 let fetch_bound = 100;
3158
3159 let mut rng = fastrand::Rng::new();
3160 let rng_seed = rng.u64(..);
3161 rng.seed(rng_seed);
3162 let mut bound_val = None;
3163 type CmpFn<T> = Box<dyn FnMut(&T, &T) -> std::cmp::Ordering>;
3165 let mut full_testcase_list = Vec::new();
3166 for _case_id in 0..test_cnt {
3167 let descending = rng.bool();
3168 fn ret_cmp_fn<T: Ord>(descending: bool) -> CmpFn<T> {
3169 if descending {
3170 return Box::new(|a: &T, b: &T| b.cmp(a));
3171 }
3172 Box::new(|a: &T, b: &T| a.cmp(b))
3173 }
3174 let unit = match rng.u8(0..3) {
3175 0 => TimeUnit::Second,
3176 1 => TimeUnit::Millisecond,
3177 2 => TimeUnit::Microsecond,
3178 _ => TimeUnit::Nanosecond,
3179 };
3180 let fetch = if rng.bool() {
3181 Some(rng.usize(0..fetch_bound))
3182 } else {
3183 None
3184 };
3185
3186 let mut input_ranged_data = vec![];
3187 let mut output_data: Vec<i64> = vec![];
3188 for part_id in 0..rng.usize(0..part_cnt_bound) {
3190 let (start, end) = if descending {
3191 let end = bound_val
3192 .map(|i| i - rng.i64(0..range_offset_bound))
3193 .unwrap_or_else(|| rng.i64(..));
3194 bound_val = Some(end);
3195 let start = end - rng.i64(1..range_size_bound);
3196 let start = Timestamp::new(start, unit.into());
3197 let end = Timestamp::new(end, unit.into());
3198 (start, end)
3199 } else {
3200 let start = bound_val
3201 .map(|i| i + rng.i64(0..range_offset_bound))
3202 .unwrap_or_else(|| rng.i64(..));
3203 bound_val = Some(start);
3204 let end = start + rng.i64(1..range_size_bound);
3205 let start = Timestamp::new(start, unit.into());
3206 let end = Timestamp::new(end, unit.into());
3207 (start, end)
3208 };
3209
3210 let iter = 0..rng.usize(0..in_range_datapoint_cnt_bound);
3211 let data_gen = iter
3212 .map(|_| rng.i64(start.value()..end.value()))
3213 .sorted_by(ret_cmp_fn(descending))
3214 .collect_vec();
3215 output_data.extend(data_gen.clone());
3216 let arr = new_ts_array(unit, data_gen);
3217 let range = PartitionRange {
3218 start,
3219 end,
3220 num_rows: arr.len(),
3221 identifier: part_id,
3222 };
3223 input_ranged_data.push((range, vec![arr]));
3224 }
3225
3226 output_data.sort_by(ret_cmp_fn(descending));
3227 if let Some(fetch) = fetch {
3228 output_data.truncate(fetch);
3229 }
3230 let output_arr = new_ts_array(unit, output_data);
3231
3232 let test_stream = TestStream::new(
3233 Column::new("ts", 0),
3234 SortOptions {
3235 descending,
3236 nulls_first: true,
3237 },
3238 fetch,
3239 vec![Field::new("ts", DataType::Timestamp(unit, None), false)],
3240 input_ranged_data.clone(),
3241 vec![vec![output_arr]],
3242 );
3243 full_testcase_list.push(test_stream);
3244 }
3245
3246 for (case_id, test_stream) in full_testcase_list.into_iter().enumerate() {
3247 let res = test_stream.run_test().await;
3248 let res_concat = concat_batches(&test_stream.schema, &res).unwrap();
3249 let expected = test_stream.output;
3250 let expected_concat = concat_batches(&test_stream.schema, &expected).unwrap();
3251
3252 if res_concat != expected_concat {
3253 {
3254 let mut f_input = std::io::stderr();
3255 f_input.write_all(b"[").unwrap();
3256 for (input_range, input_arr) in test_stream.input {
3257 let range_json = json!({
3258 "start": input_range.start.to_chrono_datetime().unwrap().to_string(),
3259 "end": input_range.end.to_chrono_datetime().unwrap().to_string(),
3260 "num_rows": input_range.num_rows,
3261 "identifier": input_range.identifier,
3262 });
3263 let buf = Vec::new();
3264 let mut input_writer = ArrayWriter::new(buf);
3265 input_writer.write(&input_arr).unwrap();
3266 input_writer.finish().unwrap();
3267 let res_str =
3268 String::from_utf8_lossy(&input_writer.into_inner()).to_string();
3269 let whole_json =
3270 format!(r#"{{"range": {}, "data": {}}},"#, range_json, res_str);
3271 f_input.write_all(whole_json.as_bytes()).unwrap();
3272 }
3273 f_input.write_all(b"]").unwrap();
3274 }
3275 {
3276 let mut f_res = std::io::stderr();
3277 f_res.write_all(b"[").unwrap();
3278 for batch in &res {
3279 let mut res_writer = ArrayWriter::new(f_res);
3280 res_writer.write(batch).unwrap();
3281 res_writer.finish().unwrap();
3282 f_res = res_writer.into_inner();
3283 f_res.write_all(b",").unwrap();
3284 }
3285 f_res.write_all(b"]").unwrap();
3286
3287 let f_res_concat = std::io::stderr();
3288 let mut res_writer = ArrayWriter::new(f_res_concat);
3289 res_writer.write(&res_concat).unwrap();
3290 res_writer.finish().unwrap();
3291
3292 let f_expected = std::io::stderr();
3293 let mut expected_writer = ArrayWriter::new(f_expected);
3294 expected_writer.write(&expected_concat).unwrap();
3295 expected_writer.finish().unwrap();
3296 }
3297 panic!(
3298 "case failed, case id: {0}, output and expected output to stderr",
3299 case_id
3300 );
3301 }
3302 assert_eq!(
3303 res_concat, expected_concat,
3304 "case failed, case id: {}, rng seed: {}",
3305 case_id, rng_seed
3306 );
3307 }
3308 }
3309}