1use std::any::Any;
19use std::collections::{BTreeMap, BTreeSet, VecDeque};
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23
24use arrow::array::{Array, ArrayRef};
25use arrow::compute::SortColumn;
26use arrow_schema::{DataType, SchemaRef, SortOptions};
27use common_error::ext::{BoxedError, PlainError};
28use common_error::status_code::StatusCode;
29use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
30use common_telemetry::error;
31use common_time::Timestamp;
32use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
33use datafusion::execution::{RecordBatchStream, TaskContext};
34use datafusion::physical_plan::memory::MemoryStream;
35use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
36use datafusion::physical_plan::sorts::streaming_merge::StreamingMergeBuilder;
37use datafusion::physical_plan::{
38 DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
39};
40use datafusion_common::utils::bisect;
41use datafusion_common::{internal_err, DataFusionError};
42use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
43use datatypes::value::Value;
44use futures::Stream;
45use itertools::Itertools;
46use snafu::ResultExt;
47use store_api::region_engine::PartitionRange;
48
49use crate::error::{QueryExecutionSnafu, Result};
50
51#[derive(Debug, Clone)]
66pub struct WindowedSortExec {
67 expression: PhysicalSortExpr,
69 fetch: Option<usize>,
71 ranges: Vec<Vec<PartitionRange>>,
75 all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
80 input: Arc<dyn ExecutionPlan>,
81 metrics: ExecutionPlanMetricsSet,
83 properties: PlanProperties,
84}
85
86fn check_partition_range_monotonicity(
87 ranges: &[Vec<PartitionRange>],
88 descending: bool,
89) -> Result<()> {
90 let is_valid = ranges.iter().all(|r| {
91 if descending {
92 r.windows(2).all(|w| w[0].end >= w[1].end)
93 } else {
94 r.windows(2).all(|w| w[0].start <= w[1].start)
95 }
96 });
97
98 if !is_valid {
99 let msg = if descending {
100 "Input `PartitionRange`s's upper bound is not monotonic non-increase"
101 } else {
102 "Input `PartitionRange`s's lower bound is not monotonic non-decrease"
103 };
104 let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
105 Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
106 } else {
107 Ok(())
108 }
109}
110
111impl WindowedSortExec {
112 pub fn try_new(
113 expression: PhysicalSortExpr,
114 fetch: Option<usize>,
115 ranges: Vec<Vec<PartitionRange>>,
116 input: Arc<dyn ExecutionPlan>,
117 ) -> Result<Self> {
118 check_partition_range_monotonicity(&ranges, expression.options.descending)?;
119
120 let properties = input.properties();
121 let properties = PlanProperties::new(
122 input
123 .equivalence_properties()
124 .clone()
125 .with_reorder(LexOrdering::new(vec![expression.clone()])),
126 input.output_partitioning().clone(),
127 properties.emission_type,
128 properties.boundedness,
129 );
130
131 let mut all_avail_working_range = Vec::with_capacity(ranges.len());
132 for r in &ranges {
133 let overlap_counts = split_overlapping_ranges(r);
134 let working_ranges =
135 compute_all_working_ranges(&overlap_counts, expression.options.descending);
136 all_avail_working_range.push(working_ranges);
137 }
138
139 Ok(Self {
140 expression,
141 fetch,
142 ranges,
143 all_avail_working_range,
144 input,
145 metrics: ExecutionPlanMetricsSet::new(),
146 properties,
147 })
148 }
149
150 pub fn to_stream(
154 &self,
155 context: Arc<TaskContext>,
156 partition: usize,
157 ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
158 let input_stream: DfSendableRecordBatchStream =
159 self.input.execute(partition, context.clone())?;
160
161 let df_stream = Box::pin(WindowedSortStream::new(
162 context,
163 self,
164 input_stream,
165 partition,
166 )) as _;
167
168 Ok(df_stream)
169 }
170}
171
172impl DisplayAs for WindowedSortExec {
173 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 write!(
175 f,
176 "WindowedSortExec: expr={} num_ranges={}",
177 self.expression,
178 self.ranges.len()
179 )?;
180 if let Some(fetch) = self.fetch {
181 write!(f, " fetch={}", fetch)?;
182 }
183 Ok(())
184 }
185}
186
187impl ExecutionPlan for WindowedSortExec {
188 fn as_any(&self) -> &dyn Any {
189 self
190 }
191
192 fn schema(&self) -> SchemaRef {
193 self.input.schema()
194 }
195
196 fn properties(&self) -> &PlanProperties {
197 &self.properties
198 }
199
200 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
201 vec![&self.input]
202 }
203
204 fn with_new_children(
205 self: Arc<Self>,
206 children: Vec<Arc<dyn ExecutionPlan>>,
207 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
208 let new_input = if let Some(first) = children.first() {
209 first
210 } else {
211 internal_err!("No children found")?
212 };
213 let new = Self::try_new(
214 self.expression.clone(),
215 self.fetch,
216 self.ranges.clone(),
217 new_input.clone(),
218 )?;
219 Ok(Arc::new(new))
220 }
221
222 fn execute(
223 &self,
224 partition: usize,
225 context: Arc<TaskContext>,
226 ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
227 self.to_stream(context, partition)
228 }
229
230 fn metrics(&self) -> Option<MetricsSet> {
231 Some(self.metrics.clone_inner())
232 }
233
234 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
240 vec![false; self.ranges.len()]
241 }
242
243 fn name(&self) -> &str {
244 "WindowedSortExec"
245 }
246}
247
248pub struct WindowedSortStream {
255 memory_pool: Arc<dyn MemoryPool>,
257 in_progress: Vec<DfRecordBatch>,
259 last_value: Option<Timestamp>,
261 sorted_input_runs: Vec<DfSendableRecordBatchStream>,
263 merge_stream: VecDeque<DfSendableRecordBatchStream>,
265 merge_count: usize,
267 working_idx: usize,
269 input: DfSendableRecordBatchStream,
271 is_terminated: bool,
273 schema: SchemaRef,
275 expression: PhysicalSortExpr,
277 fetch: Option<usize>,
279 produced: usize,
281 batch_size: usize,
283 all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
287 #[allow(dead_code)] ranges: Vec<PartitionRange>,
290 metrics: BaselineMetrics,
292}
293
294impl WindowedSortStream {
295 pub fn new(
296 context: Arc<TaskContext>,
297 exec: &WindowedSortExec,
298 input: DfSendableRecordBatchStream,
299 partition: usize,
300 ) -> Self {
301 Self {
302 memory_pool: context.runtime_env().memory_pool.clone(),
303 in_progress: Vec::new(),
304 last_value: None,
305 sorted_input_runs: Vec::new(),
306 merge_stream: VecDeque::new(),
307 merge_count: 0,
308 working_idx: 0,
309 schema: input.schema(),
310 input,
311 is_terminated: false,
312 expression: exec.expression.clone(),
313 fetch: exec.fetch,
314 produced: 0,
315 batch_size: context.session_config().batch_size(),
316 all_avail_working_range: exec.all_avail_working_range[partition].clone(),
317 ranges: exec.ranges[partition].clone(),
318 metrics: BaselineMetrics::new(&exec.metrics, partition),
319 }
320 }
321}
322
323impl WindowedSortStream {
324 #[cfg(debug_assertions)]
325 fn check_subset_ranges(&self, cur_range: &TimeRange) {
326 let cur_is_subset_to = self
327 .ranges
328 .iter()
329 .filter(|r| cur_range.is_subset(&TimeRange::from(*r)))
330 .collect_vec();
331 if cur_is_subset_to.is_empty() {
332 error!("Current range is not a subset of any PartitionRange");
333 let subset_ranges = self
335 .ranges
336 .iter()
337 .filter(|r| TimeRange::from(*r).is_subset(cur_range))
338 .collect_vec();
339 let only_overlap = self
340 .ranges
341 .iter()
342 .filter(|r| {
343 let r = TimeRange::from(*r);
344 r.is_overlapping(cur_range) && !r.is_subset(cur_range)
345 })
346 .collect_vec();
347 error!(
348 "Bad input, found {} ranges that are subset of current range, also found {} ranges that only overlap, subset ranges are: {:?}; overlap ranges are: {:?}",
349 subset_ranges.len(),
350 only_overlap.len(),
351 subset_ranges,
352 only_overlap
353 );
354 } else {
355 let only_overlap = self
356 .ranges
357 .iter()
358 .filter(|r| {
359 let r = TimeRange::from(*r);
360 r.is_overlapping(cur_range) && !cur_range.is_subset(&r)
361 })
362 .collect_vec();
363 error!(
364 "Found current range to be subset of {} ranges, also found {} ranges that only overlap, of subset ranges are:{:?}; overlap ranges are: {:?}",
365 cur_is_subset_to.len(),
366 only_overlap.len(),
367 cur_is_subset_to,
368 only_overlap
369 );
370 }
371 let all_overlap_working_range = self
372 .all_avail_working_range
373 .iter()
374 .filter(|(range, _)| range.is_overlapping(cur_range))
375 .map(|(range, _)| range)
376 .collect_vec();
377 error!(
378 "Found {} working ranges that overlap with current range: {:?}",
379 all_overlap_working_range.len(),
380 all_overlap_working_range
381 );
382 }
383
384 fn poll_result_stream(
386 &mut self,
387 cx: &mut Context<'_>,
388 ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
389 while let Some(merge_stream) = &mut self.merge_stream.front_mut() {
390 match merge_stream.as_mut().poll_next(cx) {
391 Poll::Ready(Some(Ok(batch))) => {
392 let ret = if let Some(remaining) = self.remaining_fetch() {
393 if remaining == 0 {
394 self.is_terminated = true;
395 None
396 } else if remaining < batch.num_rows() {
397 self.produced += remaining;
398 Some(Ok(batch.slice(0, remaining)))
399 } else {
400 self.produced += batch.num_rows();
401 Some(Ok(batch))
402 }
403 } else {
404 self.produced += batch.num_rows();
405 Some(Ok(batch))
406 };
407 return Poll::Ready(ret);
408 }
409 Poll::Ready(Some(Err(e))) => {
410 return Poll::Ready(Some(Err(e)));
411 }
412 Poll::Ready(None) => {
413 self.merge_stream.pop_front();
416 continue;
417 }
418 Poll::Pending => {
419 return Poll::Pending;
420 }
421 }
422 }
423 Poll::Ready(None)
425 }
426
427 pub fn poll_next_inner(
431 mut self: Pin<&mut Self>,
432 cx: &mut Context<'_>,
433 ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
434 match self.poll_result_stream(cx) {
436 Poll::Ready(None) => {
437 if self.is_terminated {
438 return Poll::Ready(None);
439 }
440 }
441 x => return x,
442 };
443
444 while !self.is_terminated {
446 let new_input_rbs = match self.input.as_mut().poll_next(cx) {
448 Poll::Ready(Some(Ok(batch))) => {
449 Some(split_batch_to_sorted_run(batch, &self.expression)?)
450 }
451 Poll::Ready(Some(Err(e))) => {
452 return Poll::Ready(Some(Err(e)));
453 }
454 Poll::Ready(None) => {
455 self.is_terminated = true;
457 None
458 }
459 Poll::Pending => return Poll::Pending,
460 };
461
462 let Some(SortedRunSet {
463 runs_with_batch,
464 sort_column,
465 }) = new_input_rbs
466 else {
467 self.build_sorted_stream()?;
469 self.start_new_merge_sort()?;
470 continue;
471 };
472 let mut last_remaining = None;
478 let mut run_iter = runs_with_batch.into_iter();
479 loop {
480 let Some((sorted_rb, run_info)) = last_remaining.take().or(run_iter.next()) else {
481 break;
482 };
483 if sorted_rb.num_rows() == 0 {
484 continue;
485 }
486 let Some(cur_range) = run_info.get_time_range() else {
488 internal_err!("Found NULL in time index column")?
489 };
490 let Some(working_range) = self.get_working_range() else {
491 internal_err!("No working range found")?
492 };
493
494 if sort_column.options.unwrap_or_default().descending {
496 if cur_range.end > working_range.end {
497 error!("Invalid range: {:?} > {:?}", cur_range, working_range);
498 #[cfg(debug_assertions)]
499 self.check_subset_ranges(&cur_range);
500 internal_err!("Current batch have data on the right side of working range, something is very wrong")?;
501 }
502 } else if cur_range.start < working_range.start {
503 error!("Invalid range: {:?} < {:?}", cur_range, working_range);
504 #[cfg(debug_assertions)]
505 self.check_subset_ranges(&cur_range);
506 internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
507 }
508
509 if cur_range.is_subset(&working_range) {
510 self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?;
513 } else if let Some(intersection) = cur_range.intersection(&working_range) {
514 let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len);
516 let (offset, len) = find_slice_from_range(
517 &SortColumn {
518 values: cur_sort_column.clone(),
519 options: sort_column.options,
520 },
521 &intersection,
522 )?;
523
524 if offset != 0 {
525 internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
526 }
527
528 let sliced_rb = sorted_rb.slice(offset, len);
529
530 self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?;
532 self.build_sorted_stream()?;
534
535 self.start_new_merge_sort()?;
537
538 let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len);
539 if r_len != 0 {
540 let remaining_rb = sorted_rb.slice(r_offset, r_len);
542 let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?;
543 let new_run_info = SucRun {
544 offset: run_info.offset + r_offset,
545 len: r_len,
546 first_val: new_first_val,
547 last_val: run_info.last_val,
548 };
549 last_remaining = Some((remaining_rb, new_run_info));
550 }
551 } else {
557 self.build_sorted_stream()?;
560 self.start_new_merge_sort()?;
561
562 last_remaining = Some((sorted_rb, run_info));
564 }
565 }
566 }
567 self.poll_result_stream(cx)
569 }
570
571 fn push_batch(&mut self, batch: DfRecordBatch) {
572 self.in_progress.push(batch);
573 }
574
575 fn try_concat_batch(
579 &mut self,
580 batch: DfRecordBatch,
581 run_info: &SucRun<Timestamp>,
582 opt: Option<SortOptions>,
583 ) -> datafusion_common::Result<()> {
584 let is_ok_to_concat =
585 cmp_with_opts(&self.last_value, &run_info.first_val, &opt) <= std::cmp::Ordering::Equal;
586
587 if is_ok_to_concat {
588 self.push_batch(batch);
589 } else {
591 self.build_sorted_stream()?;
593 self.push_batch(batch);
594 }
595 self.last_value = run_info.last_val;
596 Ok(())
597 }
598
599 fn get_working_range(&self) -> Option<TimeRange> {
601 self.all_avail_working_range
602 .get(self.working_idx)
603 .map(|(range, _)| *range)
604 }
605
606 fn set_next_working_range(&mut self) {
608 self.working_idx += 1;
609 }
610
611 fn build_sorted_stream(&mut self) -> datafusion_common::Result<()> {
613 if self.in_progress.is_empty() {
614 return Ok(());
615 }
616 let data = std::mem::take(&mut self.in_progress);
617
618 let new_stream = MemoryStream::try_new(data, self.schema(), None)?;
619 self.sorted_input_runs.push(Box::pin(new_stream));
620 Ok(())
621 }
622
623 fn start_new_merge_sort(&mut self) -> datafusion_common::Result<()> {
625 if !self.in_progress.is_empty() {
626 return internal_err!("Starting a merge sort when in_progress is not empty")?;
627 }
628
629 self.set_next_working_range();
630
631 let streams = std::mem::take(&mut self.sorted_input_runs);
632 if streams.is_empty() {
633 return Ok(());
634 } else if streams.len() == 1 {
635 self.merge_stream
636 .push_back(streams.into_iter().next().unwrap());
637 return Ok(());
638 }
639
640 let fetch = self.remaining_fetch();
641 let reservation = MemoryConsumer::new(format!("WindowedSortStream[{}]", self.merge_count))
642 .register(&self.memory_pool);
643 self.merge_count += 1;
644 let lex_ordering = LexOrdering::new(vec![self.expression.clone()]);
645
646 let resulting_stream = StreamingMergeBuilder::new()
647 .with_streams(streams)
648 .with_schema(self.schema())
649 .with_expressions(&lex_ordering)
650 .with_metrics(self.metrics.clone())
651 .with_batch_size(self.batch_size)
652 .with_fetch(fetch)
653 .with_reservation(reservation)
654 .build()?;
655 self.merge_stream.push_back(resulting_stream);
656 Ok(())
658 }
659
660 fn remaining_fetch(&self) -> Option<usize> {
663 let total_now = self.produced;
664 self.fetch.map(|p| p.saturating_sub(total_now))
665 }
666}
667
668impl Stream for WindowedSortStream {
669 type Item = datafusion_common::Result<DfRecordBatch>;
670
671 fn poll_next(
672 mut self: Pin<&mut Self>,
673 cx: &mut Context<'_>,
674 ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
675 let result = self.as_mut().poll_next_inner(cx);
676 self.metrics.record_poll(result)
677 }
678}
679
680impl RecordBatchStream for WindowedSortStream {
681 fn schema(&self) -> SchemaRef {
682 self.schema.clone()
683 }
684}
685
686fn split_batch_to_sorted_run(
688 batch: DfRecordBatch,
689 expression: &PhysicalSortExpr,
690) -> datafusion_common::Result<SortedRunSet<Timestamp>> {
691 let sort_column = expression.evaluate_to_sort_column(&batch)?;
693 let sorted_runs_offset = get_sorted_runs(sort_column.clone())?;
694 if let Some(run) = sorted_runs_offset.first()
695 && sorted_runs_offset.len() == 1
696 {
697 if !(run.offset == 0 && run.len == batch.num_rows()) {
698 internal_err!(
699 "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
700 run.offset,
701 run.len,
702 batch.num_rows()
703 )?;
704 }
705 Ok(SortedRunSet {
707 runs_with_batch: vec![(batch, run.clone())],
708 sort_column,
709 })
710 } else {
711 let mut ret = Vec::with_capacity(sorted_runs_offset.len());
713 for run in sorted_runs_offset {
714 if run.offset + run.len > batch.num_rows() {
715 internal_err!(
716 "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
717 run.offset,
718 run.len,
719 batch.num_rows()
720 )?;
721 }
722 let new_rb = batch.slice(run.offset, run.len);
723 ret.push((new_rb, run));
724 }
725 Ok(SortedRunSet {
726 runs_with_batch: ret,
727 sort_column,
728 })
729 }
730}
731
/// Dispatches on an arrow timestamp `DataType`.
///
/// For each of the four timestamp units, the callback macro is invoked as
/// `callback!(<arrow primitive type>, <arrow time unit>, extra args...)`.
/// Any other data type is handled by the caller-supplied fallback arms.
///
/// Usage: `downcast_ts_array!(data_type => (callback, arg, ...), pat => expr)`.
#[macro_export]
macro_rules! downcast_ts_array {
    ($data_type:expr => ($callback:path $(, $extra:tt)*), $($pat:pat => $on_other:expr $(,)*)*) =>
    {
        match $data_type {
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
                $callback!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $extra)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
                $callback!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $extra)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
                $callback!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $extra)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
                $callback!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $extra)*)
            }
            $($pat => $on_other,)*
        }
    };
}
756
757fn find_slice_from_range(
761 sort_column: &SortColumn,
762 range: &TimeRange,
763) -> datafusion_common::Result<(usize, usize)> {
764 let ty = sort_column.values.data_type();
765 let time_unit = if let DataType::Timestamp(unit, _) = ty {
766 unit
767 } else {
768 return Err(DataFusionError::Internal(format!(
769 "Unsupported sort column type: {}",
770 sort_column.values.data_type()
771 )));
772 };
773 let array = &sort_column.values;
774 let opt = &sort_column.options.unwrap_or_default();
775 let descending = opt.descending;
776
777 let typed_sorted_range = [range.start, range.end]
778 .iter()
779 .map(|t| {
780 t.convert_to(time_unit.into())
781 .ok_or_else(|| {
782 DataFusionError::Internal(format!(
783 "Failed to convert timestamp from {:?} to {:?}",
784 t.unit(),
785 time_unit
786 ))
787 })
788 .and_then(|typed_ts| {
789 let value = Value::Timestamp(typed_ts);
790 value
791 .try_to_scalar_value(&value.data_type())
792 .map_err(|e| DataFusionError::External(Box::new(e) as _))
793 })
794 })
795 .try_collect::<_, Vec<_>, _>()?;
796
797 let (min_val, max_val) = (typed_sorted_range[0].clone(), typed_sorted_range[1].clone());
798
799 let (start, end) = if descending {
801 let start = bisect::<false>(&[array.clone()], &[max_val.clone()], &[*opt])?;
805 let end = bisect::<false>(&[array.clone()], &[min_val.clone()], &[*opt])?;
808 (start, end)
809 } else {
810 let start = bisect::<true>(&[array.clone()], &[min_val.clone()], &[*opt])?;
813 let end = bisect::<true>(&[array.clone()], &[max_val.clone()], &[*opt])?;
816 (start, end)
817 };
818
819 Ok((start, end - start))
820}
821
/// Callback for `downcast_ts_array!`: downcasts `$arr` to
/// `PrimitiveArray<$t>` and returns a boxed iterator over
/// `(index, Option<i64>)`. The `$unit` argument is accepted but not used.
///
/// Panics if `$arr` is not a `PrimitiveArray<$t>`.
#[macro_export]
macro_rules! array_iter_helper {
    ($t:ty, $unit:expr, $arr:expr) => {{
        let primitive = $arr
            .as_any()
            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
            .unwrap();
        Box::new(primitive.iter().enumerate()) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
    }};
}
836
837fn cmp_with_opts<T: Ord>(
841 a: &Option<T>,
842 b: &Option<T>,
843 opt: &Option<SortOptions>,
844) -> std::cmp::Ordering {
845 let opt = opt.unwrap_or_default();
846
847 if let (Some(a), Some(b)) = (a, b) {
848 if opt.descending {
849 b.cmp(a)
850 } else {
851 a.cmp(b)
852 }
853 } else if opt.nulls_first {
854 a.cmp(b)
857 } else {
858 match (a, b) {
859 (Some(a), Some(b)) => a.cmp(b),
860 (Some(_), None) => std::cmp::Ordering::Less,
861 (None, Some(_)) => std::cmp::Ordering::Greater,
862 (None, None) => std::cmp::Ordering::Equal,
863 }
864 }
865}
866
867#[derive(Debug, Clone)]
868struct SortedRunSet<N: Ord> {
869 runs_with_batch: Vec<(DfRecordBatch, SucRun<N>)>,
871 sort_column: SortColumn,
873}
874
/// A run of consecutive rows that is already in sort order.
#[derive(Debug, Clone, PartialEq)]
struct SucRun<N: Ord> {
    /// Index of the run's first row.
    offset: usize,
    /// Number of rows in the run.
    len: usize,
    /// First non-null value of the run, if there is one.
    first_val: Option<N>,
    /// Last non-null value of the run, if there is one.
    last_val: Option<N>,
}
887
888impl SucRun<Timestamp> {
889 fn get_time_range(&self) -> Option<TimeRange> {
891 let start = self.first_val.min(self.last_val);
892 let end = self
893 .first_val
894 .max(self.last_val)
895 .map(|i| Timestamp::new(i.value() + 1, i.unit()));
896 start.zip(end).map(|(s, e)| TimeRange::new(s, e))
897 }
898}
899
900fn find_successive_runs<T: Iterator<Item = (usize, Option<N>)>, N: Ord + Copy>(
902 iter: T,
903 sort_opts: &Option<SortOptions>,
904) -> Vec<SucRun<N>> {
905 let mut runs = Vec::new();
906 let mut last_value = None;
907 let mut iter_len = None;
908
909 let mut last_offset = 0;
910 let mut first_val: Option<N> = None;
911 let mut last_val: Option<N> = None;
912
913 for (idx, t) in iter {
914 if let Some(last_value) = &last_value {
915 if cmp_with_opts(last_value, &t, sort_opts) == std::cmp::Ordering::Greater {
916 let len = idx - last_offset;
918 let run = SucRun {
919 offset: last_offset,
920 len,
921 first_val,
922 last_val,
923 };
924 runs.push(run);
925 first_val = None;
926 last_val = None;
927
928 last_offset = idx;
929 }
930 }
931 last_value = Some(t);
932 if let Some(t) = t {
933 first_val = first_val.or(Some(t));
934 last_val = Some(t).or(last_val);
935 }
936 iter_len = Some(idx);
937 }
938 let run = SucRun {
939 offset: last_offset,
940 len: iter_len.map(|l| l - last_offset + 1).unwrap_or(0),
941 first_val,
942 last_val,
943 };
944 runs.push(run);
945
946 runs
947}
948
949fn get_sorted_runs(sort_column: SortColumn) -> datafusion_common::Result<Vec<SucRun<Timestamp>>> {
953 let ty = sort_column.values.data_type();
954 if let DataType::Timestamp(unit, _) = ty {
955 let array = &sort_column.values;
956 let iter = downcast_ts_array!(
957 array.data_type() => (array_iter_helper, array),
958 _ => internal_err!("Unsupported sort column type: {ty}")?
959 );
960
961 let raw = find_successive_runs(iter, &sort_column.options);
962 let ts_runs = raw
963 .into_iter()
964 .map(|run| SucRun {
965 offset: run.offset,
966 len: run.len,
967 first_val: run.first_val.map(|v| Timestamp::new(v, unit.into())),
968 last_val: run.last_val.map(|v| Timestamp::new(v, unit.into())),
969 })
970 .collect_vec();
971 Ok(ts_runs)
972 } else {
973 Err(DataFusionError::Internal(format!(
974 "Unsupported sort column type: {ty}"
975 )))
976 }
977}
978
979#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
983struct TimeRange {
984 start: Timestamp,
985 end: Timestamp,
986}
987
988impl From<&PartitionRange> for TimeRange {
989 fn from(range: &PartitionRange) -> Self {
990 Self::new(range.start, range.end)
991 }
992}
993
994impl From<(Timestamp, Timestamp)> for TimeRange {
995 fn from(range: (Timestamp, Timestamp)) -> Self {
996 Self::new(range.0, range.1)
997 }
998}
999
1000impl From<&(Timestamp, Timestamp)> for TimeRange {
1001 fn from(range: &(Timestamp, Timestamp)) -> Self {
1002 Self::new(range.0, range.1)
1003 }
1004}
1005
1006impl TimeRange {
1007 fn new(start: Timestamp, end: Timestamp) -> Self {
1009 if start > end {
1010 Self {
1011 start: end,
1012 end: start,
1013 }
1014 } else {
1015 Self { start, end }
1016 }
1017 }
1018
1019 fn is_subset(&self, other: &Self) -> bool {
1020 self.start >= other.start && self.end <= other.end
1021 }
1022
1023 fn is_overlapping(&self, other: &Self) -> bool {
1025 !(self.start >= other.end || self.end <= other.start)
1026 }
1027
1028 fn intersection(&self, other: &Self) -> Option<Self> {
1029 if self.is_overlapping(other) {
1030 Some(Self::new(
1031 self.start.max(other.start),
1032 self.end.min(other.end),
1033 ))
1034 } else {
1035 None
1036 }
1037 }
1038
1039 fn difference(&self, other: &Self) -> Vec<Self> {
1040 if !self.is_overlapping(other) {
1041 vec![*self]
1042 } else {
1043 let mut ret = Vec::new();
1044 if self.start < other.start && self.end > other.end {
1045 ret.push(Self::new(self.start, other.start));
1046 ret.push(Self::new(other.end, self.end));
1047 } else if self.start < other.start {
1048 ret.push(Self::new(self.start, other.start));
1049 } else if self.end > other.end {
1050 ret.push(Self::new(other.end, self.end));
1051 }
1052 ret
1053 }
1054 }
1055}
1056
1057fn split_range_by(
1059 input_range: &TimeRange,
1060 input_parts: &[usize],
1061 split_by: &TimeRange,
1062 split_idx: usize,
1063) -> Vec<Action> {
1064 let mut ret = Vec::new();
1065 if input_range.is_overlapping(split_by) {
1066 let input_parts = input_parts.to_vec();
1067 let new_parts = {
1068 let mut new_parts = input_parts.clone();
1069 new_parts.push(split_idx);
1070 new_parts
1071 };
1072
1073 ret.push(Action::Pop(*input_range));
1074 if let Some(intersection) = input_range.intersection(split_by) {
1075 ret.push(Action::Push(intersection, new_parts.clone()));
1076 }
1077 for diff in input_range.difference(split_by) {
1078 ret.push(Action::Push(diff, input_parts.clone()));
1079 }
1080 }
1081 ret
1082}
1083
1084#[derive(Debug, Clone, PartialEq, Eq)]
1085enum Action {
1086 Pop(TimeRange),
1087 Push(TimeRange, Vec<usize>),
1088}
1089
1090fn compute_all_working_ranges(
1098 overlap_counts: &BTreeMap<TimeRange, Vec<usize>>,
1099 descending: bool,
1100) -> Vec<(TimeRange, BTreeSet<usize>)> {
1101 let mut ret = Vec::new();
1102 let mut cur_range_set: Option<(TimeRange, BTreeSet<usize>)> = None;
1103 let overlap_iter: Box<dyn Iterator<Item = (&TimeRange, &Vec<usize>)>> = if descending {
1104 Box::new(overlap_counts.iter().rev()) as _
1105 } else {
1106 Box::new(overlap_counts.iter()) as _
1107 };
1108 for (range, set) in overlap_iter {
1109 match &mut cur_range_set {
1110 None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))),
1111 Some((working_range, working_set)) => {
1112 let need_expand = {
1117 let last_part = working_set.last();
1118 let inter: BTreeSet<usize> = working_set
1119 .intersection(&BTreeSet::from_iter(set.iter().cloned()))
1120 .cloned()
1121 .collect();
1122 if let Some(one) = inter.first()
1123 && inter.len() == 1
1124 && Some(one) == last_part
1125 {
1126 if set.iter().all(|p| Some(p) >= last_part) {
1128 false
1130 } else {
1131 true
1133 }
1134 } else if inter.is_empty() {
1135 false
1137 } else {
1138 true
1140 }
1141 };
1142
1143 if need_expand {
1144 if descending {
1145 working_range.start = range.start;
1146 } else {
1147 working_range.end = range.end;
1148 }
1149 working_set.extend(set.iter().cloned());
1150 } else {
1151 ret.push((*working_range, std::mem::take(working_set)));
1152 cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned())));
1153 }
1154 }
1155 }
1156 }
1157
1158 if let Some(cur_range_set) = cur_range_set {
1159 ret.push(cur_range_set)
1160 }
1161
1162 ret
1163}
1164
1165fn split_overlapping_ranges(ranges: &[PartitionRange]) -> BTreeMap<TimeRange, Vec<usize>> {
1168 let mut ret: BTreeMap<TimeRange, Vec<usize>> = BTreeMap::new();
1170 for (idx, range) in ranges.iter().enumerate() {
1171 let key: TimeRange = (range.start, range.end).into();
1172 let mut actions = Vec::new();
1173 let mut untouched = vec![key];
1174 let forward_iter = ret
1178 .range(key..)
1179 .take_while(|(range, _)| range.is_overlapping(&key));
1180 let backward_iter = ret
1181 .range(..key)
1182 .rev()
1183 .take_while(|(range, _)| range.is_overlapping(&key));
1184
1185 for (range, parts) in forward_iter.chain(backward_iter) {
1186 untouched = untouched.iter().flat_map(|r| r.difference(range)).collect();
1187 let act = split_range_by(range, parts, &key, idx);
1188 actions.extend(act.into_iter());
1189 }
1190
1191 for action in actions {
1192 match action {
1193 Action::Pop(range) => {
1194 ret.remove(&range);
1195 }
1196 Action::Push(range, parts) => {
1197 ret.insert(range, parts);
1198 }
1199 }
1200 }
1201
1202 for range in untouched {
1204 ret.insert(range, vec![idx]);
1205 }
1206 }
1207 ret
1208}
1209
1210fn get_timestamp_from_idx(
1212 array: &ArrayRef,
1213 offset: usize,
1214) -> datafusion_common::Result<Option<Timestamp>> {
1215 let time_unit = if let DataType::Timestamp(unit, _) = array.data_type() {
1216 unit
1217 } else {
1218 return Err(DataFusionError::Internal(format!(
1219 "Unsupported sort column type: {}",
1220 array.data_type()
1221 )));
1222 };
1223 let ty = array.data_type();
1224 let array = array.slice(offset, 1);
1225 let mut iter = downcast_ts_array!(
1226 array.data_type() => (array_iter_helper, array),
1227 _ => internal_err!("Unsupported sort column type: {ty}")?
1228 );
1229 let (_idx, val) = iter.next().ok_or_else(|| {
1230 DataFusionError::Internal("Empty array in get_timestamp_from".to_string())
1231 })?;
1232 let val = if let Some(val) = val {
1233 val
1234 } else {
1235 return Ok(None);
1236 };
1237 let gt_timestamp = Timestamp::new(val, time_unit.into());
1238 Ok(Some(gt_timestamp))
1239}
1240
1241#[cfg(test)]
1242mod test {
1243 use std::io::Write;
1244 use std::sync::Arc;
1245
1246 use arrow::array::{ArrayRef, TimestampMillisecondArray};
1247 use arrow::compute::concat_batches;
1248 use arrow::json::ArrayWriter;
1249 use arrow_schema::{Field, Schema, TimeUnit};
1250 use futures::StreamExt;
1251 use pretty_assertions::assert_eq;
1252 use serde_json::json;
1253
1254 use super::*;
1255 use crate::test_util::{new_ts_array, MockInputExec};
1256
1257 #[test]
1258 fn test_overlapping() {
1259 let testcases = [
1260 (
1261 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1262 (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1263 false,
1264 ),
1265 (
1266 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1267 (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1268 true,
1269 ),
1270 (
1271 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1272 (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1273 true,
1274 ),
1275 (
1276 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1277 (
1278 Timestamp::new_millisecond(1000),
1279 Timestamp::new_millisecond(1002),
1280 ),
1281 true,
1282 ),
1283 (
1284 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1285 (
1286 Timestamp::new_millisecond(1001),
1287 Timestamp::new_millisecond(1002),
1288 ),
1289 false,
1290 ),
1291 (
1292 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1293 (
1294 Timestamp::new_millisecond(1002),
1295 Timestamp::new_millisecond(1003),
1296 ),
1297 false,
1298 ),
1299 ];
1300
1301 for (range1, range2, expected) in testcases.iter() {
1302 assert_eq!(
1303 TimeRange::from(range1).is_overlapping(&range2.into()),
1304 *expected,
1305 "range1: {:?}, range2: {:?}",
1306 range1,
1307 range2
1308 );
1309 }
1310 }
1311
/// Table-driven check of `split_range_by`.
///
/// Each case supplies a range with its part indices, the range to split by
/// together with that range's part index, and the list of `Action`s the
/// split is expected to produce.
#[test]
fn test_split() {
    let sec = Timestamp::new_second;
    let ms = Timestamp::new_millisecond;

    // (range, parts, split_by, split_idx, expected actions)
    let testcases = [
        // Nothing is produced for this pair of ranges.
        ((sec(1), ms(1001)), vec![0], (sec(0), ms(1)), 1, vec![]),
        // The whole range is re-pushed with both parts attached.
        (
            (sec(1), ms(1001)),
            vec![0],
            (sec(0), ms(1001)),
            1,
            vec![
                Action::Pop((sec(1), ms(1001)).into()),
                Action::Push((sec(1), ms(1001)).into(), vec![0, 1]),
            ],
        ),
        (
            (sec(1), ms(1001)),
            vec![0],
            (sec(0), ms(1002)),
            1,
            vec![
                Action::Pop((sec(1), ms(1001)).into()),
                Action::Push((sec(1), ms(1001)).into(), vec![0, 1]),
            ],
        ),
        (
            (sec(1), ms(1001)),
            vec![0],
            (ms(1000), ms(1002)),
            1,
            vec![
                Action::Pop((sec(1), ms(1001)).into()),
                Action::Push((ms(1000), ms(1001)).into(), vec![0, 1]),
            ],
        ),
        // The shared piece is pushed first, then the leading remainder.
        (
            (sec(1), ms(1002)),
            vec![0],
            (ms(1001), ms(1002)),
            1,
            vec![
                Action::Pop((sec(1), ms(1002)).into()),
                Action::Push((ms(1001), ms(1002)).into(), vec![0, 1]),
                Action::Push((sec(1), ms(1001)).into(), vec![0]),
            ],
        ),
        // The shared piece is pushed first, then both remainders.
        (
            (sec(1), ms(1004)),
            vec![0],
            (ms(1001), ms(1002)),
            1,
            vec![
                Action::Pop((sec(1), ms(1004)).into()),
                Action::Push((ms(1001), ms(1002)).into(), vec![0, 1]),
                Action::Push((sec(1), ms(1001)).into(), vec![0]),
                Action::Push((ms(1002), ms(1004)).into(), vec![0]),
            ],
        ),
    ];

    for (range, parts, split_by, split_idx, expected) in &testcases {
        let actions = split_range_by(&(*range).into(), parts, &split_by.into(), *split_idx);
        assert_eq!(
            actions, *expected,
            "range: {:?}, parts: {:?}, split_by: {:?}, split_idx: {}",
            range, parts, split_by, split_idx
        );
    }
}
1452
/// Table-driven check of `compute_all_working_ranges` with its second
/// argument set to `true`.
///
/// Each case maps second-precision ranges to part indices and lists the
/// working ranges (range plus set of part indices) expected back, in order.
#[test]
fn test_compute_working_ranges_rev() {
    // Builds a `(start, end)` pair of second-precision timestamps.
    let span = |start: i64, end: i64| (Timestamp::new_second(start), Timestamp::new_second(end));

    let testcases = vec![
        (
            BTreeMap::from([(span(1, 2), vec![0])]),
            vec![(span(1, 2), BTreeSet::from([0]))],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0, 1])]),
            vec![(span(1, 2), BTreeSet::from([0, 1]))],
        ),
        (
            BTreeMap::from([(span(2, 3), vec![0]), (span(1, 2), vec![0, 1])]),
            vec![
                (span(2, 3), BTreeSet::from([0])),
                (span(1, 2), BTreeSet::from([0, 1])),
            ],
        ),
        (
            BTreeMap::from([(span(2, 3), vec![0, 1]), (span(1, 2), vec![1])]),
            vec![
                (span(2, 3), BTreeSet::from([0, 1])),
                (span(1, 2), BTreeSet::from([1])),
            ],
        ),
        (
            BTreeMap::from([
                (span(3, 4), vec![0]),
                (span(2, 3), vec![0, 1]),
                (span(1, 2), vec![1]),
            ]),
            vec![
                (span(3, 4), BTreeSet::from([0])),
                (span(2, 3), BTreeSet::from([0, 1])),
                (span(1, 2), BTreeSet::from([1])),
            ],
        ),
        // Expected to collapse into a single working range.
        (
            BTreeMap::from([
                (span(3, 4), vec![0, 2]),
                (span(2, 3), vec![0, 1, 2]),
                (span(1, 2), vec![1, 2]),
            ]),
            vec![(span(1, 4), BTreeSet::from([0, 1, 2]))],
        ),
        (
            BTreeMap::from([(span(2, 3), vec![0, 2]), (span(1, 2), vec![1, 2])]),
            vec![(span(1, 3), BTreeSet::from([0, 1, 2]))],
        ),
        (
            BTreeMap::from([
                (span(3, 4), vec![0, 1]),
                (span(2, 3), vec![0, 1, 2]),
                (span(1, 2), vec![1, 2]),
            ]),
            vec![(span(1, 4), BTreeSet::from([0, 1, 2]))],
        ),
        // Expected to stay as two separate working ranges.
        (
            BTreeMap::from([(span(2, 3), vec![0, 1]), (span(1, 2), vec![1, 2])]),
            vec![
                (span(2, 3), BTreeSet::from([0, 1])),
                (span(1, 2), BTreeSet::from([1, 2])),
            ],
        ),
        (
            BTreeMap::from([(span(2, 3), vec![0]), (span(1, 2), vec![1, 2])]),
            vec![
                (span(2, 3), BTreeSet::from([0])),
                (span(1, 2), BTreeSet::from([1, 2])),
            ],
        ),
    ];

    for (raw_input, raw_expected) in testcases {
        // Convert the timestamp pairs into the range type used by the function.
        let input = raw_input
            .into_iter()
            .map(|(r, s)| (r.into(), s))
            .collect();
        let expected = raw_expected
            .into_iter()
            .map(|(r, s)| (r.into(), s))
            .collect::<Vec<_>>();
        assert_eq!(
            compute_all_working_ranges(&input, true),
            expected,
            "input: {:?}",
            input
        );
    }
}
1667
/// Table-driven check of `compute_all_working_ranges` with its second
/// argument set to `false`.
///
/// Each case maps second-precision ranges to part indices and lists the
/// working ranges (range plus set of part indices) expected back, in order.
#[test]
fn test_compute_working_ranges() {
    // Builds a `(start, end)` pair of second-precision timestamps.
    let span = |start: i64, end: i64| (Timestamp::new_second(start), Timestamp::new_second(end));

    let testcases = vec![
        (
            BTreeMap::from([(span(1, 2), vec![0])]),
            vec![(span(1, 2), BTreeSet::from([0]))],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0, 1])]),
            vec![(span(1, 2), BTreeSet::from([0, 1]))],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0, 1]), (span(2, 3), vec![1])]),
            vec![
                (span(1, 2), BTreeSet::from([0, 1])),
                (span(2, 3), BTreeSet::from([1])),
            ],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0]), (span(2, 3), vec![0, 1])]),
            vec![
                (span(1, 2), BTreeSet::from([0])),
                (span(2, 3), BTreeSet::from([0, 1])),
            ],
        ),
        (
            BTreeMap::from([
                (span(1, 2), vec![0]),
                (span(2, 3), vec![0, 1]),
                (span(3, 4), vec![1]),
            ]),
            vec![
                (span(1, 2), BTreeSet::from([0])),
                (span(2, 3), BTreeSet::from([0, 1])),
                (span(3, 4), BTreeSet::from([1])),
            ],
        ),
        // Expected to collapse into a single working range.
        (
            BTreeMap::from([
                (span(1, 2), vec![0, 2]),
                (span(2, 3), vec![0, 1, 2]),
                (span(3, 4), vec![1, 2]),
            ]),
            vec![(span(1, 4), BTreeSet::from([0, 1, 2]))],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0, 2]), (span(2, 3), vec![1, 2])]),
            vec![(span(1, 3), BTreeSet::from([0, 1, 2]))],
        ),
        (
            BTreeMap::from([
                (span(1, 2), vec![0, 1]),
                (span(2, 3), vec![0, 1, 2]),
                (span(3, 4), vec![1, 2]),
            ]),
            vec![(span(1, 4), BTreeSet::from([0, 1, 2]))],
        ),
        // Expected to stay as two separate working ranges.
        (
            BTreeMap::from([(span(1, 2), vec![0, 1]), (span(2, 3), vec![1, 2])]),
            vec![
                (span(1, 2), BTreeSet::from([0, 1])),
                (span(2, 3), BTreeSet::from([1, 2])),
            ],
        ),
        (
            BTreeMap::from([(span(1, 2), vec![0, 1]), (span(2, 3), vec![2])]),
            vec![
                (span(1, 2), BTreeSet::from([0, 1])),
                (span(2, 3), BTreeSet::from([2])),
            ],
        ),
    ];

    for (raw_input, raw_expected) in testcases {
        // Convert the timestamp pairs into the range type used by the function.
        let input = raw_input
            .into_iter()
            .map(|(r, s)| (r.into(), s))
            .collect();
        let expected = raw_expected
            .into_iter()
            .map(|(r, s)| (r.into(), s))
            .collect::<Vec<_>>();
        assert_eq!(
            compute_all_working_ranges(&input, false),
            expected,
            "input: {:?}",
            input
        );
    }
}
1883
/// Table-driven check of `split_overlapping_ranges`.
///
/// Each case lists the input `PartitionRange`s (second precision, every one
/// with `num_rows: 2`) and the expected map from resulting range to the
/// indices of the input ranges that cover it.
#[test]
fn test_split_overlap_range() {
    // Builds a `(start, end)` pair of second-precision timestamps.
    let span = |start: i64, end: i64| (Timestamp::new_second(start), Timestamp::new_second(end));
    // Builds an input partition range; `num_rows` is the same in every case.
    let part = |start, end, identifier| PartitionRange {
        start: Timestamp::new_second(start),
        end: Timestamp::new_second(end),
        num_rows: 2,
        identifier,
    };

    let testcases = vec![
        // A single range maps to itself.
        (
            vec![part(1, 2, 0)],
            BTreeMap::from([(span(1, 2), vec![0])]),
        ),
        // Two identical ranges share one entry.
        (
            vec![part(1, 2, 0), part(1, 2, 1)],
            BTreeMap::from([(span(1, 2), vec![0, 1])]),
        ),
        // Two partially overlapping ranges are cut into three pieces.
        (
            vec![part(1, 3, 0), part(2, 4, 1)],
            BTreeMap::from([
                (span(1, 2), vec![0]),
                (span(2, 3), vec![0, 1]),
                (span(3, 4), vec![1]),
            ]),
        ),
        // A third range spanning both of the others appears in every piece.
        (
            vec![part(1, 3, 0), part(2, 4, 1), part(1, 4, 2)],
            BTreeMap::from([
                (span(1, 2), vec![0, 2]),
                (span(2, 3), vec![0, 1, 2]),
                (span(3, 4), vec![1, 2]),
            ]),
        ),
        (
            vec![part(1, 3, 0), part(1, 4, 1), part(2, 4, 2)],
            BTreeMap::from([
                (span(1, 2), vec![0, 1]),
                (span(2, 3), vec![0, 1, 2]),
                (span(3, 4), vec![1, 2]),
            ]),
        ),
    ];

    for (input, expected) in testcases {
        let expected = expected.into_iter().map(|(r, s)| (r.into(), s)).collect();
        // Report the offending input on failure, like the sibling tests do.
        assert_eq!(
            split_overlapping_ranges(&input),
            expected,
            "input: {:?}",
            input
        );
    }
}
2046
2047 impl From<(i32, i32, Option<i32>, Option<i32>)> for SucRun<i32> {
2048 fn from((offset, len, min_val, max_val): (i32, i32, Option<i32>, Option<i32>)) -> Self {
2049 Self {
2050 offset: offset as usize,
2051 len: len as usize,
2052 first_val: min_val,
2053 last_val: max_val,
2054 }
2055 }
2056 }
2057
/// Table-driven check of `find_successive_runs`.
///
/// Each case gives a sequence of optional values, the sort options to apply,
/// and the expected runs written as `(offset, len, first_val, last_val)`.
#[test]
fn test_find_successive_runs() {
    // Shorthand for `Some(SortOptions { .. })`.
    let opts = |descending: bool, nulls_first: bool| {
        Some(SortOptions {
            descending,
            nulls_first,
        })
    };

    let testcases = vec![
        (
            vec![Some(1), Some(1), Some(2), Some(1), Some(3)],
            opts(false, false),
            vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
        ),
        (
            vec![Some(1), Some(2), Some(2), Some(1), Some(3)],
            opts(false, false),
            vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
        ),
        (
            vec![Some(1), Some(2), None, None, Some(1), Some(3)],
            opts(false, false),
            vec![(0, 4, Some(1), Some(2)), (4, 2, Some(1), Some(3))],
        ),
        (
            vec![Some(1), Some(2), Some(1), Some(3)],
            opts(false, false),
            vec![(0, 2, Some(1), Some(2)), (2, 2, Some(1), Some(3))],
        ),
        (
            vec![Some(1), Some(2), Some(1), Some(3)],
            opts(true, false),
            vec![
                (0, 1, Some(1), Some(1)),
                (1, 2, Some(2), Some(1)),
                (3, 1, Some(3), Some(3)),
            ],
        ),
        (
            vec![Some(1), Some(2), None, Some(3)],
            opts(false, true),
            vec![(0, 2, Some(1), Some(2)), (2, 2, Some(3), Some(3))],
        ),
        (
            vec![Some(1), Some(2), None, Some(3)],
            opts(false, false),
            vec![(0, 3, Some(1), Some(2)), (3, 1, Some(3), Some(3))],
        ),
        (
            vec![Some(2), Some(1), None, Some(3)],
            opts(true, true),
            vec![(0, 2, Some(2), Some(1)), (2, 2, Some(3), Some(3))],
        ),
        // Empty input is expected to yield a single empty run.
        (
            vec![],
            opts(false, true),
            vec![(0, 0, None, None)],
        ),
        (
            vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
            opts(true, true),
            vec![(0, 5, Some(2), Some(1)), (5, 2, Some(5), Some(4))],
        ),
        (
            vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
            opts(true, false),
            vec![
                (0, 2, None, None),
                (2, 3, Some(2), Some(1)),
                (5, 2, Some(5), Some(4)),
            ],
        ),
    ];

    for (input, sort_opts, expected) in testcases {
        let expected = expected
            .into_iter()
            .map(SucRun::<i32>::from)
            .collect::<Vec<_>>();
        let ret = find_successive_runs(input.clone().into_iter().enumerate(), &sort_opts);
        assert_eq!(
            ret, expected,
            "input: {:?}, opt: {:?},expected: {:?}",
            input, sort_opts, expected
        );
    }
}
2168
/// Table-driven check of `cmp_with_opts` over combinations of present and
/// missing values and every `descending` / `nulls_first` setting used below.
#[test]
fn test_cmp_with_opts() {
    use std::cmp::Ordering;

    // Shorthand for `Some(SortOptions { .. })`.
    let opts = |descending: bool, nulls_first: bool| {
        Some(SortOptions {
            descending,
            nulls_first,
        })
    };

    // (left, right, sort options, expected ordering)
    let testcases = vec![
        // Two present values: the ordering flips when descending.
        (Some(1), Some(2), opts(false, false), Ordering::Less),
        (Some(1), Some(2), opts(true, false), Ordering::Greater),
        // Present vs. missing: the expected ordering follows `nulls_first` only.
        (Some(1), None, opts(false, true), Ordering::Greater),
        (Some(1), None, opts(true, true), Ordering::Greater),
        (Some(1), None, opts(true, false), Ordering::Less),
        (Some(1), None, opts(false, false), Ordering::Less),
        // Two missing values are expected to compare equal under all options.
        (None, None, opts(true, true), Ordering::Equal),
        (None, None, opts(false, true), Ordering::Equal),
        (None, None, opts(true, false), Ordering::Equal),
        (None, None, opts(false, false), Ordering::Equal),
    ];

    for (a, b, opts, expected) in testcases {
        let actual = cmp_with_opts(&a, &b, &opts);
        assert_eq!(
            actual, expected,
            "a: {:?}, b: {:?}, opts: {:?}",
            a, b, opts
        );
    }
}
2274
/// Table-driven check of `find_slice_from_range`.
///
/// Each case gives a millisecond timestamp column, whether it is sorted
/// descending, the `TimeRange` to look up, and the expected result (all
/// current cases expect `Ok` with a pair of integers).
#[test]
fn test_find_slice_from_range() {
    // Builds a millisecond timestamp column from raw values.
    let ts_array = |values: &[i64]| {
        Arc::new(TimestampMillisecondArray::from_iter_values(
            values.iter().copied(),
        )) as ArrayRef
    };
    // Builds a `TimeRange` from millisecond bounds.
    let ms_range = |start: i64, end: i64| TimeRange {
        start: Timestamp::new_millisecond(start),
        end: Timestamp::new_millisecond(end),
    };

    let test_cases = vec![
        // Ascending columns.
        (ts_array(&[1, 2, 3, 4, 5]), false, ms_range(2, 4), Ok((1, 2))),
        (
            ts_array(&[-2, -1, 0, 1, 2, 3, 4, 5]),
            false,
            ms_range(-1, 4),
            Ok((1, 5)),
        ),
        (ts_array(&[1, 3, 4, 6]), false, ms_range(2, 5), Ok((1, 2))),
        (ts_array(&[1, 2, 3, 4, 6]), false, ms_range(2, 5), Ok((1, 3))),
        (ts_array(&[1, 3, 4, 5, 6]), false, ms_range(2, 5), Ok((1, 2))),
        // Range lies entirely past the data: expected to be an empty slice.
        (ts_array(&[1, 2, 3, 4, 5]), false, ms_range(6, 7), Ok((5, 0))),
        // Descending columns.
        (ts_array(&[5, 3, 2, 1]), true, ms_range(1, 4), Ok((1, 3))),
        (ts_array(&[5, 4, 3, 2, 1, 0]), true, ms_range(1, 4), Ok((2, 3))),
        (ts_array(&[5, 3, 2, 0]), true, ms_range(1, 4), Ok((1, 2))),
        (ts_array(&[5, 4, 3, 2, 0]), true, ms_range(1, 4), Ok((2, 2))),
        (ts_array(&[5, 4, 3, 2, 1]), true, ms_range(2, 5), Ok((1, 3))),
        (ts_array(&[5, 4, 3, 1]), true, ms_range(2, 5), Ok((1, 2))),
        (ts_array(&[6, 4, 3, 2, 1]), true, ms_range(2, 5), Ok((1, 3))),
        (ts_array(&[6, 4, 3, 1]), true, ms_range(2, 5), Ok((1, 2))),
        // Range lies entirely past the data: expected to be an empty slice.
        (ts_array(&[10, 9, 8, 7, 6]), true, ms_range(2, 5), Ok((5, 0))),
        // Descending columns with a range one millisecond wide.
        (ts_array(&[3, 2, 1, 0]), true, ms_range(3, 4), Ok((0, 1))),
        (ts_array(&[5, 3, 2]), true, ms_range(3, 4), Ok((1, 1))),
        (ts_array(&[5, 4, 3, 2]), true, ms_range(3, 4), Ok((2, 1))),
    ];

    for (sort_vals, descending, range, expected) in test_cases {
        let sort_column = SortColumn {
            values: sort_vals,
            options: Some(SortOptions {
                descending,
                ..Default::default()
            }),
        };
        let outcome = find_slice_from_range(&sort_column, &range);
        match (outcome, expected) {
            (Ok(actual), Ok(wanted)) => {
                assert_eq!(
                    actual, wanted,
                    "sort_vals: {:?}, range: {:?}",
                    sort_column, range
                )
            }
            (Err(err), Err(wanted)) => {
                // Expected errors are given as a substring of the message.
                let wanted: &str = wanted;
                assert!(
                    err.to_string().contains(wanted),
                    "err: {:?}, expected: {:?}",
                    err,
                    wanted
                );
            }
            (r, e) => panic!("unexpected result: {:?}, expected: {:?}", r, e),
        }
    }
}
2481
/// Test fixture describing one `WindowedSortExec` scenario: the sort
/// configuration, the input batches with their partition ranges, and the
/// batches built from the expected output.
#[derive(Debug)]
struct TestStream {
    /// Sort expression handed to `WindowedSortExec::try_new`.
    expression: PhysicalSortExpr,
    /// Optional fetch value handed to `WindowedSortExec::try_new`.
    fetch: Option<usize>,
    /// Input batches, each paired with its `PartitionRange`.
    input: Vec<(PartitionRange, DfRecordBatch)>,
    /// Batches built from the `expected` argument of `TestStream::new`.
    output: Vec<DfRecordBatch>,
    /// Schema used to build every input and output batch.
    schema: SchemaRef,
}
2490 use datafusion::physical_plan::expressions::Column;
2491 impl TestStream {
2492 fn new(
2493 ts_col: Column,
2494 opt: SortOptions,
2495 fetch: Option<usize>,
2496 schema: impl Into<arrow_schema::Fields>,
2497 input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2498 expected: Vec<Vec<ArrayRef>>,
2499 ) -> Self {
2500 let expression = PhysicalSortExpr {
2501 expr: Arc::new(ts_col),
2502 options: opt,
2503 };
2504 let schema = Schema::new(schema.into());
2505 let schema = Arc::new(schema);
2506 let input = input
2507 .into_iter()
2508 .map(|(k, v)| (k, DfRecordBatch::try_new(schema.clone(), v).unwrap()))
2509 .collect_vec();
2510 let output_batchs = expected
2511 .into_iter()
2512 .map(|v| DfRecordBatch::try_new(schema.clone(), v).unwrap())
2513 .collect_vec();
2514 Self {
2515 expression,
2516 fetch,
2517 input,
2518 output: output_batchs,
2519 schema,
2520 }
2521 }
2522
2523 async fn run_test(&self) -> Vec<DfRecordBatch> {
2524 let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip();
2525
2526 let mock_input = MockInputExec::new(batches, self.schema.clone());
2527
2528 let exec = WindowedSortExec::try_new(
2529 self.expression.clone(),
2530 self.fetch,
2531 vec![ranges],
2532 Arc::new(mock_input),
2533 )
2534 .unwrap();
2535
2536 let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
2537
2538 let real_output = exec_stream.collect::<Vec<_>>().await;
2539 let real_output: Vec<_> = real_output.into_iter().try_collect().unwrap();
2540 real_output
2541 }
2542 }
2543
2544 #[tokio::test]
2545 async fn test_window_sort_stream() {
2546 let test_cases = [
2547 TestStream::new(
2548 Column::new("ts", 0),
2549 SortOptions {
2550 descending: false,
2551 nulls_first: true,
2552 },
2553 None,
2554 vec![Field::new(
2555 "ts",
2556 DataType::Timestamp(TimeUnit::Millisecond, None),
2557 false,
2558 )],
2559 vec![],
2560 vec![],
2561 ),
2562 TestStream::new(
2563 Column::new("ts", 0),
2564 SortOptions {
2565 descending: false,
2566 nulls_first: true,
2567 },
2568 None,
2569 vec![Field::new(
2570 "ts",
2571 DataType::Timestamp(TimeUnit::Millisecond, None),
2572 false,
2573 )],
2574 vec![
2575 (
2577 PartitionRange {
2578 start: Timestamp::new_millisecond(1),
2579 end: Timestamp::new_millisecond(2),
2580 num_rows: 1,
2581 identifier: 0,
2582 },
2583 vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2584 ),
2585 (
2586 PartitionRange {
2587 start: Timestamp::new_millisecond(1),
2588 end: Timestamp::new_millisecond(3),
2589 num_rows: 1,
2590 identifier: 0,
2591 },
2592 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2593 ),
2594 ],
2595 vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2596 [2],
2597 ))]],
2598 ),
2599 TestStream::new(
2600 Column::new("ts", 0),
2601 SortOptions {
2602 descending: false,
2603 nulls_first: true,
2604 },
2605 None,
2606 vec![Field::new(
2607 "ts",
2608 DataType::Timestamp(TimeUnit::Millisecond, None),
2609 false,
2610 )],
2611 vec![
2612 (
2614 PartitionRange {
2615 start: Timestamp::new_millisecond(1),
2616 end: Timestamp::new_millisecond(2),
2617 num_rows: 1,
2618 identifier: 0,
2619 },
2620 vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2621 ),
2622 (
2623 PartitionRange {
2624 start: Timestamp::new_millisecond(1),
2625 end: Timestamp::new_millisecond(3),
2626 num_rows: 1,
2627 identifier: 0,
2628 },
2629 vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2630 ),
2631 ],
2632 vec![],
2633 ),
2634 TestStream::new(
2635 Column::new("ts", 0),
2636 SortOptions {
2637 descending: false,
2638 nulls_first: true,
2639 },
2640 None,
2641 vec![Field::new(
2642 "ts",
2643 DataType::Timestamp(TimeUnit::Millisecond, None),
2644 false,
2645 )],
2646 vec![
2647 (
2650 PartitionRange {
2651 start: Timestamp::new_millisecond(1),
2652 end: Timestamp::new_millisecond(2),
2653 num_rows: 1,
2654 identifier: 0,
2655 },
2656 vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2657 ),
2658 (
2659 PartitionRange {
2660 start: Timestamp::new_millisecond(1),
2661 end: Timestamp::new_millisecond(3),
2662 num_rows: 1,
2663 identifier: 0,
2664 },
2665 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2666 ),
2667 ],
2668 vec![
2669 vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2670 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2671 ],
2672 ),
2673 TestStream::new(
2674 Column::new("ts", 0),
2675 SortOptions {
2676 descending: false,
2677 nulls_first: true,
2678 },
2679 None,
2680 vec![Field::new(
2681 "ts",
2682 DataType::Timestamp(TimeUnit::Millisecond, None),
2683 false,
2684 )],
2685 vec![
2686 (
2688 PartitionRange {
2689 start: Timestamp::new_millisecond(1),
2690 end: Timestamp::new_millisecond(3),
2691 num_rows: 1,
2692 identifier: 0,
2693 },
2694 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2695 1, 2,
2696 ]))],
2697 ),
2698 (
2699 PartitionRange {
2700 start: Timestamp::new_millisecond(1),
2701 end: Timestamp::new_millisecond(4),
2702 num_rows: 1,
2703 identifier: 0,
2704 },
2705 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2706 2, 3,
2707 ]))],
2708 ),
2709 ],
2710 vec![
2711 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2712 1, 2,
2713 ]))],
2714 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2716 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2717 ],
2718 ),
2719 TestStream::new(
2720 Column::new("ts", 0),
2721 SortOptions {
2722 descending: false,
2723 nulls_first: true,
2724 },
2725 None,
2726 vec![Field::new(
2727 "ts",
2728 DataType::Timestamp(TimeUnit::Millisecond, None),
2729 false,
2730 )],
2731 vec![
2732 (
2734 PartitionRange {
2735 start: Timestamp::new_millisecond(1),
2736 end: Timestamp::new_millisecond(3),
2737 num_rows: 1,
2738 identifier: 0,
2739 },
2740 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2741 1, 2,
2742 ]))],
2743 ),
2744 (
2745 PartitionRange {
2746 start: Timestamp::new_millisecond(1),
2747 end: Timestamp::new_millisecond(4),
2748 num_rows: 1,
2749 identifier: 1,
2750 },
2751 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2752 1, 2, 3,
2753 ]))],
2754 ),
2755 ],
2756 vec![
2757 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2758 1, 1, 2, 2,
2759 ]))],
2760 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2761 ],
2762 ),
2763 TestStream::new(
2764 Column::new("ts", 0),
2765 SortOptions {
2766 descending: false,
2767 nulls_first: true,
2768 },
2769 None,
2770 vec![Field::new(
2771 "ts",
2772 DataType::Timestamp(TimeUnit::Millisecond, None),
2773 false,
2774 )],
2775 vec![
2776 (
2778 PartitionRange {
2779 start: Timestamp::new_millisecond(1),
2780 end: Timestamp::new_millisecond(3),
2781 num_rows: 1,
2782 identifier: 0,
2783 },
2784 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2785 1, 2,
2786 ]))],
2787 ),
2788 (
2789 PartitionRange {
2790 start: Timestamp::new_millisecond(1),
2791 end: Timestamp::new_millisecond(4),
2792 num_rows: 1,
2793 identifier: 1,
2794 },
2795 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2796 1, 2, 3,
2797 ]))],
2798 ),
2799 (
2800 PartitionRange {
2801 start: Timestamp::new_millisecond(4),
2802 end: Timestamp::new_millisecond(6),
2803 num_rows: 1,
2804 identifier: 1,
2805 },
2806 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2807 4, 5,
2808 ]))],
2809 ),
2810 ],
2811 vec![
2812 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2813 1, 1, 2, 2,
2814 ]))],
2815 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2816 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2817 4, 5,
2818 ]))],
2819 ],
2820 ),
2821 TestStream::new(
2823 Column::new("ts", 0),
2824 SortOptions {
2825 descending: false,
2826 nulls_first: true,
2827 },
2828 Some(6),
2829 vec![Field::new(
2830 "ts",
2831 DataType::Timestamp(TimeUnit::Millisecond, None),
2832 false,
2833 )],
2834 vec![
2835 (
2837 PartitionRange {
2838 start: Timestamp::new_millisecond(1),
2839 end: Timestamp::new_millisecond(3),
2840 num_rows: 1,
2841 identifier: 0,
2842 },
2843 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2844 1, 2,
2845 ]))],
2846 ),
2847 (
2848 PartitionRange {
2849 start: Timestamp::new_millisecond(1),
2850 end: Timestamp::new_millisecond(4),
2851 num_rows: 1,
2852 identifier: 1,
2853 },
2854 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2855 1, 2, 3,
2856 ]))],
2857 ),
2858 (
2859 PartitionRange {
2860 start: Timestamp::new_millisecond(3),
2861 end: Timestamp::new_millisecond(6),
2862 num_rows: 1,
2863 identifier: 1,
2864 },
2865 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2866 4, 5,
2867 ]))],
2868 ),
2869 ],
2870 vec![
2871 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2872 1, 1, 2, 2,
2873 ]))],
2874 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2875 vec![Arc::new(TimestampMillisecondArray::from_iter_values([4]))],
2876 ],
2877 ),
2878 TestStream::new(
2879 Column::new("ts", 0),
2880 SortOptions {
2881 descending: false,
2882 nulls_first: true,
2883 },
2884 Some(3),
2885 vec![Field::new(
2886 "ts",
2887 DataType::Timestamp(TimeUnit::Millisecond, None),
2888 false,
2889 )],
2890 vec![
2891 (
2893 PartitionRange {
2894 start: Timestamp::new_millisecond(1),
2895 end: Timestamp::new_millisecond(3),
2896 num_rows: 1,
2897 identifier: 0,
2898 },
2899 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2900 1, 2,
2901 ]))],
2902 ),
2903 (
2904 PartitionRange {
2905 start: Timestamp::new_millisecond(1),
2906 end: Timestamp::new_millisecond(4),
2907 num_rows: 1,
2908 identifier: 1,
2909 },
2910 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2911 1, 2, 3,
2912 ]))],
2913 ),
2914 (
2915 PartitionRange {
2916 start: Timestamp::new_millisecond(3),
2917 end: Timestamp::new_millisecond(6),
2918 num_rows: 1,
2919 identifier: 1,
2920 },
2921 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2922 4, 5,
2923 ]))],
2924 ),
2925 ],
2926 vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2927 [1, 1, 2],
2928 ))]],
2929 ),
2930 TestStream::new(
2932 Column::new("ts", 0),
2933 SortOptions {
2934 descending: true,
2935 nulls_first: true,
2936 },
2937 None,
2938 vec![Field::new(
2939 "ts",
2940 DataType::Timestamp(TimeUnit::Millisecond, None),
2941 false,
2942 )],
2943 vec![
2944 (
2946 PartitionRange {
2947 start: Timestamp::new_millisecond(3),
2948 end: Timestamp::new_millisecond(6),
2949 num_rows: 1,
2950 identifier: 1,
2951 },
2952 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2953 5, 4,
2954 ]))],
2955 ),
2956 (
2957 PartitionRange {
2958 start: Timestamp::new_millisecond(1),
2959 end: Timestamp::new_millisecond(4),
2960 num_rows: 1,
2961 identifier: 1,
2962 },
2963 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2964 3, 2, 1,
2965 ]))],
2966 ),
2967 (
2968 PartitionRange {
2969 start: Timestamp::new_millisecond(1),
2970 end: Timestamp::new_millisecond(3),
2971 num_rows: 1,
2972 identifier: 0,
2973 },
2974 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2975 2, 1,
2976 ]))],
2977 ),
2978 ],
2979 vec![
2980 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2981 5, 4,
2982 ]))],
2983 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2984 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2985 2, 2, 1, 1,
2986 ]))],
2987 ],
2988 ),
2989 TestStream::new(
2990 Column::new("ts", 0),
2991 SortOptions {
2992 descending: false,
2993 nulls_first: true,
2994 },
2995 None,
2996 vec![Field::new(
2997 "ts",
2998 DataType::Timestamp(TimeUnit::Millisecond, None),
2999 false,
3000 )],
3001 vec![
3002 (
3004 PartitionRange {
3005 start: Timestamp::new_millisecond(1),
3006 end: Timestamp::new_millisecond(10),
3007 num_rows: 1,
3008 identifier: 0,
3009 },
3010 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3011 1, 5, 9,
3012 ]))],
3013 ),
3014 (
3015 PartitionRange {
3016 start: Timestamp::new_millisecond(3),
3017 end: Timestamp::new_millisecond(7),
3018 num_rows: 1,
3019 identifier: 1,
3020 },
3021 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3022 3, 4, 5, 6,
3023 ]))],
3024 ),
3025 ],
3026 vec![
3027 vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
3028 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3029 3, 4, 5, 5, 6, 9,
3030 ]))],
3031 ],
3032 ),
3033 TestStream::new(
3034 Column::new("ts", 0),
3035 SortOptions {
3036 descending: false,
3037 nulls_first: true,
3038 },
3039 None,
3040 vec![Field::new(
3041 "ts",
3042 DataType::Timestamp(TimeUnit::Millisecond, None),
3043 false,
3044 )],
3045 vec![
3046 (
3048 PartitionRange {
3049 start: Timestamp::new_millisecond(1),
3050 end: Timestamp::new_millisecond(3),
3051 num_rows: 1,
3052 identifier: 0,
3053 },
3054 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3055 1, 2,
3056 ]))],
3057 ),
3058 (
3059 PartitionRange {
3060 start: Timestamp::new_millisecond(1),
3061 end: Timestamp::new_millisecond(10),
3062 num_rows: 1,
3063 identifier: 1,
3064 },
3065 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3066 1, 3, 4, 5, 6, 8,
3067 ]))],
3068 ),
3069 (
3070 PartitionRange {
3071 start: Timestamp::new_millisecond(7),
3072 end: Timestamp::new_millisecond(10),
3073 num_rows: 1,
3074 identifier: 1,
3075 },
3076 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3077 7, 8, 9,
3078 ]))],
3079 ),
3080 ],
3081 vec![
3082 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3083 1, 1, 2,
3084 ]))],
3085 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3086 3, 4, 5, 6,
3087 ]))],
3088 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3089 7, 8, 8, 9,
3090 ]))],
3091 ],
3092 ),
3093 TestStream::new(
3094 Column::new("ts", 0),
3095 SortOptions {
3096 descending: false,
3097 nulls_first: true,
3098 },
3099 None,
3100 vec![Field::new(
3101 "ts",
3102 DataType::Timestamp(TimeUnit::Millisecond, None),
3103 false,
3104 )],
3105 vec![
3106 (
3108 PartitionRange {
3109 start: Timestamp::new_millisecond(1),
3110 end: Timestamp::new_millisecond(11),
3111 num_rows: 1,
3112 identifier: 0,
3113 },
3114 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3115 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
3116 ]))],
3117 ),
3118 (
3119 PartitionRange {
3120 start: Timestamp::new_millisecond(5),
3121 end: Timestamp::new_millisecond(7),
3122 num_rows: 1,
3123 identifier: 1,
3124 },
3125 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3126 5, 6,
3127 ]))],
3128 ),
3129 ],
3130 vec![
3131 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3132 1, 2, 3, 4,
3133 ]))],
3134 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3135 5, 5, 6, 6, 7, 8, 9, 10,
3136 ]))],
3137 ],
3138 ),
3139 ];
3140
3141 let indexed_test_cases = test_cases.iter().enumerate().collect_vec();
3142
3143 for (idx, testcase) in &indexed_test_cases {
3144 let output = testcase.run_test().await;
3145 assert_eq!(output, testcase.output, "case {idx} failed.");
3146 }
3147 }
3148
3149 #[tokio::test]
3150 async fn fuzzy_ish_test_window_sort_stream() {
3151 let test_cnt = 100;
3152 let part_cnt_bound = 100;
3153 let range_size_bound = 100;
3154 let range_offset_bound = 100;
3155 let in_range_datapoint_cnt_bound = 100;
3156 let fetch_bound = 100;
3157
3158 let mut rng = fastrand::Rng::new();
3159 let rng_seed = rng.u64(..);
3160 rng.seed(rng_seed);
3161 let mut bound_val = None;
3162 type CmpFn<T> = Box<dyn FnMut(&T, &T) -> std::cmp::Ordering>;
3164 let mut full_testcase_list = Vec::new();
3165 for _case_id in 0..test_cnt {
3166 let descending = rng.bool();
3167 fn ret_cmp_fn<T: Ord>(descending: bool) -> CmpFn<T> {
3168 if descending {
3169 return Box::new(|a: &T, b: &T| b.cmp(a));
3170 }
3171 Box::new(|a: &T, b: &T| a.cmp(b))
3172 }
3173 let unit = match rng.u8(0..3) {
3174 0 => TimeUnit::Second,
3175 1 => TimeUnit::Millisecond,
3176 2 => TimeUnit::Microsecond,
3177 _ => TimeUnit::Nanosecond,
3178 };
3179 let fetch = if rng.bool() {
3180 Some(rng.usize(0..fetch_bound))
3181 } else {
3182 None
3183 };
3184
3185 let mut input_ranged_data = vec![];
3186 let mut output_data: Vec<i64> = vec![];
3187 for part_id in 0..rng.usize(0..part_cnt_bound) {
3189 let (start, end) = if descending {
3190 let end = bound_val
3191 .map(|i| i - rng.i64(0..range_offset_bound))
3192 .unwrap_or_else(|| rng.i64(..));
3193 bound_val = Some(end);
3194 let start = end - rng.i64(1..range_size_bound);
3195 let start = Timestamp::new(start, unit.into());
3196 let end = Timestamp::new(end, unit.into());
3197 (start, end)
3198 } else {
3199 let start = bound_val
3200 .map(|i| i + rng.i64(0..range_offset_bound))
3201 .unwrap_or_else(|| rng.i64(..));
3202 bound_val = Some(start);
3203 let end = start + rng.i64(1..range_size_bound);
3204 let start = Timestamp::new(start, unit.into());
3205 let end = Timestamp::new(end, unit.into());
3206 (start, end)
3207 };
3208
3209 let iter = 0..rng.usize(0..in_range_datapoint_cnt_bound);
3210 let data_gen = iter
3211 .map(|_| rng.i64(start.value()..end.value()))
3212 .sorted_by(ret_cmp_fn(descending))
3213 .collect_vec();
3214 output_data.extend(data_gen.clone());
3215 let arr = new_ts_array(unit, data_gen);
3216 let range = PartitionRange {
3217 start,
3218 end,
3219 num_rows: arr.len(),
3220 identifier: part_id,
3221 };
3222 input_ranged_data.push((range, vec![arr]));
3223 }
3224
3225 output_data.sort_by(ret_cmp_fn(descending));
3226 if let Some(fetch) = fetch {
3227 output_data.truncate(fetch);
3228 }
3229 let output_arr = new_ts_array(unit, output_data);
3230
3231 let test_stream = TestStream::new(
3232 Column::new("ts", 0),
3233 SortOptions {
3234 descending,
3235 nulls_first: true,
3236 },
3237 fetch,
3238 vec![Field::new("ts", DataType::Timestamp(unit, None), false)],
3239 input_ranged_data.clone(),
3240 vec![vec![output_arr]],
3241 );
3242 full_testcase_list.push(test_stream);
3243 }
3244
3245 for (case_id, test_stream) in full_testcase_list.into_iter().enumerate() {
3246 let res = test_stream.run_test().await;
3247 let res_concat = concat_batches(&test_stream.schema, &res).unwrap();
3248 let expected = test_stream.output;
3249 let expected_concat = concat_batches(&test_stream.schema, &expected).unwrap();
3250
3251 if res_concat != expected_concat {
3252 {
3253 let mut f_input = std::io::stderr();
3254 f_input.write_all(b"[").unwrap();
3255 for (input_range, input_arr) in test_stream.input {
3256 let range_json = json!({
3257 "start": input_range.start.to_chrono_datetime().unwrap().to_string(),
3258 "end": input_range.end.to_chrono_datetime().unwrap().to_string(),
3259 "num_rows": input_range.num_rows,
3260 "identifier": input_range.identifier,
3261 });
3262 let buf = Vec::new();
3263 let mut input_writer = ArrayWriter::new(buf);
3264 input_writer.write(&input_arr).unwrap();
3265 input_writer.finish().unwrap();
3266 let res_str =
3267 String::from_utf8_lossy(&input_writer.into_inner()).to_string();
3268 let whole_json =
3269 format!(r#"{{"range": {}, "data": {}}},"#, range_json, res_str);
3270 f_input.write_all(whole_json.as_bytes()).unwrap();
3271 }
3272 f_input.write_all(b"]").unwrap();
3273 }
3274 {
3275 let mut f_res = std::io::stderr();
3276 f_res.write_all(b"[").unwrap();
3277 for batch in &res {
3278 let mut res_writer = ArrayWriter::new(f_res);
3279 res_writer.write(batch).unwrap();
3280 res_writer.finish().unwrap();
3281 f_res = res_writer.into_inner();
3282 f_res.write_all(b",").unwrap();
3283 }
3284 f_res.write_all(b"]").unwrap();
3285
3286 let f_res_concat = std::io::stderr();
3287 let mut res_writer = ArrayWriter::new(f_res_concat);
3288 res_writer.write(&res_concat).unwrap();
3289 res_writer.finish().unwrap();
3290
3291 let f_expected = std::io::stderr();
3292 let mut expected_writer = ArrayWriter::new(f_expected);
3293 expected_writer.write(&expected_concat).unwrap();
3294 expected_writer.finish().unwrap();
3295 }
3296 panic!(
3297 "case failed, case id: {0}, output and expected output to stderr",
3298 case_id
3299 );
3300 }
3301 assert_eq!(
3302 res_concat, expected_concat,
3303 "case failed, case id: {}, rng seed: {}",
3304 case_id, rng_seed
3305 );
3306 }
3307 }
3308}