1use std::any::Any;
19use std::collections::{BTreeMap, BTreeSet, VecDeque};
20use std::pin::Pin;
21use std::slice::from_ref;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use arrow::array::{Array, ArrayRef};
26use arrow::compute::SortColumn;
27use arrow_schema::{DataType, SchemaRef, SortOptions};
28use common_error::ext::{BoxedError, PlainError};
29use common_error::status_code::StatusCode;
30use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
31use common_telemetry::error;
32use common_time::Timestamp;
33use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
34use datafusion::execution::{RecordBatchStream, TaskContext};
35use datafusion::physical_plan::memory::MemoryStream;
36use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
37use datafusion::physical_plan::sorts::streaming_merge::StreamingMergeBuilder;
38use datafusion::physical_plan::{
39 DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
40};
41use datafusion_common::utils::bisect;
42use datafusion_common::{DataFusionError, internal_err};
43use datafusion_physical_expr::PhysicalSortExpr;
44use datatypes::value::Value;
45use futures::Stream;
46use itertools::Itertools;
47use snafu::ResultExt;
48use store_api::region_engine::PartitionRange;
49
50use crate::error::{QueryExecutionSnafu, Result};
51
/// Physical plan node that sorts each input partition by a single sort
/// expression, using that partition's `PartitionRange`s to split the work into
/// "working ranges" that are merge-sorted and emitted one after another.
///
/// The sort expression is expected to evaluate to a timestamp column; other
/// types are rejected with an internal error when the stream processes data.
#[derive(Debug, Clone)]
pub struct WindowedSortExec {
    /// The (single) sort expression; also used as the merge expression in each stream.
    expression: PhysicalSortExpr,
    /// Optional limit on the number of output rows per stream.
    fetch: Option<usize>,
    /// `PartitionRange`s of each input partition, indexed by partition number.
    ranges: Vec<Vec<PartitionRange>>,
    /// Working ranges, each paired with the indices of the `PartitionRange`s it
    /// covers, precomputed from `ranges` for every partition in `try_new`.
    all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
    /// Child plan producing the data to sort.
    input: Arc<dyn ExecutionPlan>,
    /// Metrics set shared by the streams created in `execute`.
    metrics: ExecutionPlanMetricsSet,
    /// Plan properties computed once in `try_new`.
    properties: PlanProperties,
}
86
87fn check_partition_range_monotonicity(
88 ranges: &[Vec<PartitionRange>],
89 descending: bool,
90) -> Result<()> {
91 let is_valid = ranges.iter().all(|r| {
92 if descending {
93 r.windows(2).all(|w| w[0].end >= w[1].end)
94 } else {
95 r.windows(2).all(|w| w[0].start <= w[1].start)
96 }
97 });
98
99 if !is_valid {
100 let msg = if descending {
101 "Input `PartitionRange`s's upper bound is not monotonic non-increase"
102 } else {
103 "Input `PartitionRange`s's lower bound is not monotonic non-decrease"
104 };
105 let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
106 Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
107 } else {
108 Ok(())
109 }
110}
111
112impl WindowedSortExec {
113 pub fn try_new(
114 expression: PhysicalSortExpr,
115 fetch: Option<usize>,
116 ranges: Vec<Vec<PartitionRange>>,
117 input: Arc<dyn ExecutionPlan>,
118 ) -> Result<Self> {
119 check_partition_range_monotonicity(&ranges, expression.options.descending)?;
120
121 let mut eq_properties = input.equivalence_properties().clone();
122 eq_properties.reorder(vec![expression.clone()])?;
123
124 let properties = input.properties();
125 let properties = PlanProperties::new(
126 eq_properties,
127 input.output_partitioning().clone(),
128 properties.emission_type,
129 properties.boundedness,
130 );
131
132 let mut all_avail_working_range = Vec::with_capacity(ranges.len());
133 for r in &ranges {
134 let overlap_counts = split_overlapping_ranges(r);
135 let working_ranges =
136 compute_all_working_ranges(&overlap_counts, expression.options.descending);
137 all_avail_working_range.push(working_ranges);
138 }
139
140 Ok(Self {
141 expression,
142 fetch,
143 ranges,
144 all_avail_working_range,
145 input,
146 metrics: ExecutionPlanMetricsSet::new(),
147 properties,
148 })
149 }
150
151 pub fn to_stream(
155 &self,
156 context: Arc<TaskContext>,
157 partition: usize,
158 ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
159 let input_stream: DfSendableRecordBatchStream =
160 self.input.execute(partition, context.clone())?;
161
162 let df_stream = Box::pin(WindowedSortStream::new(
163 context,
164 self,
165 input_stream,
166 partition,
167 )) as _;
168
169 Ok(df_stream)
170 }
171}
172
173impl DisplayAs for WindowedSortExec {
174 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175 write!(
176 f,
177 "WindowedSortExec: expr={} num_ranges={}",
178 self.expression,
179 self.ranges.len()
180 )?;
181 if let Some(fetch) = self.fetch {
182 write!(f, " fetch={}", fetch)?;
183 }
184 Ok(())
185 }
186}
187
188impl ExecutionPlan for WindowedSortExec {
189 fn as_any(&self) -> &dyn Any {
190 self
191 }
192
193 fn schema(&self) -> SchemaRef {
194 self.input.schema()
195 }
196
197 fn properties(&self) -> &PlanProperties {
198 &self.properties
199 }
200
201 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
202 vec![&self.input]
203 }
204
205 fn with_new_children(
206 self: Arc<Self>,
207 children: Vec<Arc<dyn ExecutionPlan>>,
208 ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
209 let new_input = if let Some(first) = children.first() {
210 first
211 } else {
212 internal_err!("No children found")?
213 };
214 let new = Self::try_new(
215 self.expression.clone(),
216 self.fetch,
217 self.ranges.clone(),
218 new_input.clone(),
219 )?;
220 Ok(Arc::new(new))
221 }
222
223 fn execute(
224 &self,
225 partition: usize,
226 context: Arc<TaskContext>,
227 ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
228 self.to_stream(context, partition)
229 }
230
231 fn metrics(&self) -> Option<MetricsSet> {
232 Some(self.metrics.clone_inner())
233 }
234
235 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
241 vec![false]
242 }
243
244 fn name(&self) -> &str {
245 "WindowedSortExec"
246 }
247}
248
/// Stream that performs the windowed sort for one partition of `WindowedSortExec`.
pub struct WindowedSortStream {
    /// Memory pool in which a reservation is registered for every merge started.
    memory_pool: Arc<dyn MemoryPool>,
    /// Batches of the sorted run currently being built. Each pushed batch
    /// continues the order of the previously pushed one (see `try_concat_batch`).
    in_progress: Vec<DfRecordBatch>,
    /// Last non-null value of the most recently pushed run. Used to decide
    /// whether the next run can be appended to `in_progress`.
    last_value: Option<Timestamp>,
    /// Completed sorted runs of the current working range, waiting to be merged.
    sorted_input_runs: Vec<DfSendableRecordBatchStream>,
    /// Queue of merge streams; their output is emitted front to back.
    merge_stream: VecDeque<DfSendableRecordBatchStream>,
    /// Number of multi-run merges started so far; used to name memory consumers.
    merge_count: usize,
    /// Index of the current working range in `all_avail_working_range`.
    working_idx: usize,
    /// Input stream of this partition.
    input: DfSendableRecordBatchStream,
    /// Set once the input is exhausted or the `fetch` limit has been reached.
    is_terminated: bool,
    /// Schema of the input (and output) batches.
    schema: SchemaRef,
    /// Sort expression, evaluated on every input batch.
    expression: PhysicalSortExpr,
    /// Optional limit on the number of output rows.
    fetch: Option<usize>,
    /// Number of rows emitted so far.
    produced: usize,
    /// Output batch size passed to each merge, taken from the session config.
    batch_size: usize,
    /// Working ranges of this partition, in the order they are processed.
    all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
    /// `PartitionRange`s of this partition. Only read by the debug-only
    /// diagnostics in `check_subset_ranges`, hence `allow(dead_code)`.
    #[allow(dead_code)]
    ranges: Vec<PartitionRange>,
    /// Baseline metrics of this partition.
    metrics: BaselineMetrics,
}
294
295impl WindowedSortStream {
296 pub fn new(
297 context: Arc<TaskContext>,
298 exec: &WindowedSortExec,
299 input: DfSendableRecordBatchStream,
300 partition: usize,
301 ) -> Self {
302 Self {
303 memory_pool: context.runtime_env().memory_pool.clone(),
304 in_progress: Vec::new(),
305 last_value: None,
306 sorted_input_runs: Vec::new(),
307 merge_stream: VecDeque::new(),
308 merge_count: 0,
309 working_idx: 0,
310 schema: input.schema(),
311 input,
312 is_terminated: false,
313 expression: exec.expression.clone(),
314 fetch: exec.fetch,
315 produced: 0,
316 batch_size: context.session_config().batch_size(),
317 all_avail_working_range: exec.all_avail_working_range[partition].clone(),
318 ranges: exec.ranges[partition].clone(),
319 metrics: BaselineMetrics::new(&exec.metrics, partition),
320 }
321 }
322}
323
324impl WindowedSortStream {
325 #[cfg(debug_assertions)]
326 fn check_subset_ranges(&self, cur_range: &TimeRange) {
327 let cur_is_subset_to = self
328 .ranges
329 .iter()
330 .filter(|r| cur_range.is_subset(&TimeRange::from(*r)))
331 .collect_vec();
332 if cur_is_subset_to.is_empty() {
333 error!("Current range is not a subset of any PartitionRange");
334 let subset_ranges = self
336 .ranges
337 .iter()
338 .filter(|r| TimeRange::from(*r).is_subset(cur_range))
339 .collect_vec();
340 let only_overlap = self
341 .ranges
342 .iter()
343 .filter(|r| {
344 let r = TimeRange::from(*r);
345 r.is_overlapping(cur_range) && !r.is_subset(cur_range)
346 })
347 .collect_vec();
348 error!(
349 "Bad input, found {} ranges that are subset of current range, also found {} ranges that only overlap, subset ranges are: {:?}; overlap ranges are: {:?}",
350 subset_ranges.len(),
351 only_overlap.len(),
352 subset_ranges,
353 only_overlap
354 );
355 } else {
356 let only_overlap = self
357 .ranges
358 .iter()
359 .filter(|r| {
360 let r = TimeRange::from(*r);
361 r.is_overlapping(cur_range) && !cur_range.is_subset(&r)
362 })
363 .collect_vec();
364 error!(
365 "Found current range to be subset of {} ranges, also found {} ranges that only overlap, of subset ranges are:{:?}; overlap ranges are: {:?}",
366 cur_is_subset_to.len(),
367 only_overlap.len(),
368 cur_is_subset_to,
369 only_overlap
370 );
371 }
372 let all_overlap_working_range = self
373 .all_avail_working_range
374 .iter()
375 .filter(|(range, _)| range.is_overlapping(cur_range))
376 .map(|(range, _)| range)
377 .collect_vec();
378 error!(
379 "Found {} working ranges that overlap with current range: {:?}",
380 all_overlap_working_range.len(),
381 all_overlap_working_range
382 );
383 }
384
385 fn poll_result_stream(
387 &mut self,
388 cx: &mut Context<'_>,
389 ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
390 while let Some(merge_stream) = &mut self.merge_stream.front_mut() {
391 match merge_stream.as_mut().poll_next(cx) {
392 Poll::Ready(Some(Ok(batch))) => {
393 let ret = if let Some(remaining) = self.remaining_fetch() {
394 if remaining == 0 {
395 self.is_terminated = true;
396 None
397 } else if remaining < batch.num_rows() {
398 self.produced += remaining;
399 Some(Ok(batch.slice(0, remaining)))
400 } else {
401 self.produced += batch.num_rows();
402 Some(Ok(batch))
403 }
404 } else {
405 self.produced += batch.num_rows();
406 Some(Ok(batch))
407 };
408 return Poll::Ready(ret);
409 }
410 Poll::Ready(Some(Err(e))) => {
411 return Poll::Ready(Some(Err(e)));
412 }
413 Poll::Ready(None) => {
414 self.merge_stream.pop_front();
417 continue;
418 }
419 Poll::Pending => {
420 return Poll::Pending;
421 }
422 }
423 }
424 Poll::Ready(None)
426 }
427
428 pub fn poll_next_inner(
432 mut self: Pin<&mut Self>,
433 cx: &mut Context<'_>,
434 ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
435 match self.poll_result_stream(cx) {
437 Poll::Ready(None) => {
438 if self.is_terminated {
439 return Poll::Ready(None);
440 }
441 }
442 x => return x,
443 };
444
445 while !self.is_terminated {
447 let SortedRunSet {
449 runs_with_batch,
450 sort_column,
451 } = match self.input.as_mut().poll_next(cx) {
452 Poll::Ready(Some(Ok(batch))) => split_batch_to_sorted_run(batch, &self.expression)?,
453 Poll::Ready(Some(Err(e))) => {
454 return Poll::Ready(Some(Err(e)));
455 }
456 Poll::Ready(None) => {
457 self.is_terminated = true;
459 self.build_sorted_stream()?;
460 self.start_new_merge_sort()?;
461 break;
462 }
463 Poll::Pending => return Poll::Pending,
464 };
465
466 let mut last_remaining = None;
472 let mut run_iter = runs_with_batch.into_iter();
473 loop {
474 let Some((sorted_rb, run_info)) = last_remaining.take().or(run_iter.next()) else {
475 break;
476 };
477 if sorted_rb.num_rows() == 0 {
478 continue;
479 }
480 let Some(cur_range) = run_info.get_time_range() else {
482 internal_err!("Found NULL in time index column")?
483 };
484 let Some(working_range) = self.get_working_range() else {
485 internal_err!("No working range found")?
486 };
487
488 if sort_column.options.unwrap_or_default().descending {
490 if cur_range.end > working_range.end {
491 error!("Invalid range: {:?} > {:?}", cur_range, working_range);
492 #[cfg(debug_assertions)]
493 self.check_subset_ranges(&cur_range);
494 internal_err!(
495 "Current batch have data on the right side of working range, something is very wrong"
496 )?;
497 }
498 } else if cur_range.start < working_range.start {
499 error!("Invalid range: {:?} < {:?}", cur_range, working_range);
500 #[cfg(debug_assertions)]
501 self.check_subset_ranges(&cur_range);
502 internal_err!(
503 "Current batch have data on the left side of working range, something is very wrong"
504 )?;
505 }
506
507 if cur_range.is_subset(&working_range) {
508 self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?;
511 } else if let Some(intersection) = cur_range.intersection(&working_range) {
512 let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len);
514 let (offset, len) = find_slice_from_range(
515 &SortColumn {
516 values: cur_sort_column.clone(),
517 options: sort_column.options,
518 },
519 &intersection,
520 )?;
521
522 if offset != 0 {
523 internal_err!(
524 "Current batch have data on the left side of working range, something is very wrong"
525 )?;
526 }
527
528 let sliced_rb = sorted_rb.slice(offset, len);
529
530 self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?;
532 self.build_sorted_stream()?;
534
535 self.start_new_merge_sort()?;
537
538 let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len);
539 if r_len != 0 {
540 let remaining_rb = sorted_rb.slice(r_offset, r_len);
542 let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?;
543 let new_run_info = SucRun {
544 offset: run_info.offset + r_offset,
545 len: r_len,
546 first_val: new_first_val,
547 last_val: run_info.last_val,
548 };
549 last_remaining = Some((remaining_rb, new_run_info));
550 }
551 } else {
557 self.build_sorted_stream()?;
560 self.start_new_merge_sort()?;
561
562 last_remaining = Some((sorted_rb, run_info));
564 }
565 }
566
567 match self.poll_result_stream(cx) {
569 Poll::Ready(None) => {
570 if self.is_terminated {
571 return Poll::Ready(None);
572 }
573 }
574 x => return x,
575 };
576 }
577 self.poll_result_stream(cx)
579 }
580
581 fn push_batch(&mut self, batch: DfRecordBatch) {
582 self.in_progress.push(batch);
583 }
584
585 fn try_concat_batch(
589 &mut self,
590 batch: DfRecordBatch,
591 run_info: &SucRun<Timestamp>,
592 opt: Option<SortOptions>,
593 ) -> datafusion_common::Result<()> {
594 let is_ok_to_concat =
595 cmp_with_opts(&self.last_value, &run_info.first_val, &opt) <= std::cmp::Ordering::Equal;
596
597 if is_ok_to_concat {
598 self.push_batch(batch);
599 } else {
601 self.build_sorted_stream()?;
603 self.push_batch(batch);
604 }
605 self.last_value = run_info.last_val;
606 Ok(())
607 }
608
609 fn get_working_range(&self) -> Option<TimeRange> {
611 self.all_avail_working_range
612 .get(self.working_idx)
613 .map(|(range, _)| *range)
614 }
615
616 fn set_next_working_range(&mut self) {
618 self.working_idx += 1;
619 }
620
621 fn build_sorted_stream(&mut self) -> datafusion_common::Result<()> {
623 if self.in_progress.is_empty() {
624 return Ok(());
625 }
626 let data = std::mem::take(&mut self.in_progress);
627
628 let new_stream = MemoryStream::try_new(data, self.schema(), None)?;
629 self.sorted_input_runs.push(Box::pin(new_stream));
630 Ok(())
631 }
632
633 fn start_new_merge_sort(&mut self) -> datafusion_common::Result<()> {
635 if !self.in_progress.is_empty() {
636 return internal_err!("Starting a merge sort when in_progress is not empty")?;
637 }
638
639 self.set_next_working_range();
640
641 let streams = std::mem::take(&mut self.sorted_input_runs);
642 if streams.is_empty() {
643 return Ok(());
644 } else if streams.len() == 1 {
645 self.merge_stream
646 .push_back(streams.into_iter().next().unwrap());
647 return Ok(());
648 }
649
650 let fetch = self.remaining_fetch();
651 let reservation = MemoryConsumer::new(format!("WindowedSortStream[{}]", self.merge_count))
652 .register(&self.memory_pool);
653 self.merge_count += 1;
654
655 let resulting_stream = StreamingMergeBuilder::new()
656 .with_streams(streams)
657 .with_schema(self.schema())
658 .with_expressions(&[self.expression.clone()].into())
659 .with_metrics(self.metrics.clone())
660 .with_batch_size(self.batch_size)
661 .with_fetch(fetch)
662 .with_reservation(reservation)
663 .build()?;
664 self.merge_stream.push_back(resulting_stream);
665 Ok(())
667 }
668
669 fn remaining_fetch(&self) -> Option<usize> {
672 let total_now = self.produced;
673 self.fetch.map(|p| p.saturating_sub(total_now))
674 }
675}
676
677impl Stream for WindowedSortStream {
678 type Item = datafusion_common::Result<DfRecordBatch>;
679
680 fn poll_next(
681 mut self: Pin<&mut Self>,
682 cx: &mut Context<'_>,
683 ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
684 let result = self.as_mut().poll_next_inner(cx);
685 self.metrics.record_poll(result)
686 }
687}
688
689impl RecordBatchStream for WindowedSortStream {
690 fn schema(&self) -> SchemaRef {
691 self.schema.clone()
692 }
693}
694
695fn split_batch_to_sorted_run(
697 batch: DfRecordBatch,
698 expression: &PhysicalSortExpr,
699) -> datafusion_common::Result<SortedRunSet<Timestamp>> {
700 let sort_column = expression.evaluate_to_sort_column(&batch)?;
702 let sorted_runs_offset = get_sorted_runs(sort_column.clone())?;
703 if let Some(run) = sorted_runs_offset.first()
704 && sorted_runs_offset.len() == 1
705 {
706 if !(run.offset == 0 && run.len == batch.num_rows()) {
707 internal_err!(
708 "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
709 run.offset,
710 run.len,
711 batch.num_rows()
712 )?;
713 }
714 Ok(SortedRunSet {
716 runs_with_batch: vec![(batch, run.clone())],
717 sort_column,
718 })
719 } else {
720 let mut ret = Vec::with_capacity(sorted_runs_offset.len());
722 for run in sorted_runs_offset {
723 if run.offset + run.len > batch.num_rows() {
724 internal_err!(
725 "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
726 run.offset,
727 run.len,
728 batch.num_rows()
729 )?;
730 }
731 let new_rb = batch.slice(run.offset, run.len);
732 ret.push((new_rb, run));
733 }
734 Ok(SortedRunSet {
735 runs_with_batch: ret,
736 sort_column,
737 })
738 }
739}
740
/// Dispatches on a timestamp `DataType`.
///
/// For each of the four timestamp units it invokes
/// `$m!(<arrow timestamp primitive type>, <arrow TimeUnit>, $args...)`.
/// Any other data type is handled by the caller-supplied `$p => $fallback` arms,
/// which are appended to the generated `match`.
#[macro_export]
macro_rules! downcast_ts_array {
    ($data_type:expr => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) =>
    {
        match $data_type {
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
                $m!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
                $m!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
                $m!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
                $m!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $args)*)
            }
            // Caller-provided fallback arms for non-timestamp types.
            $($p => $fallback,)*
        }
    };
}
765
766fn find_slice_from_range(
770 sort_column: &SortColumn,
771 range: &TimeRange,
772) -> datafusion_common::Result<(usize, usize)> {
773 let ty = sort_column.values.data_type();
774 let time_unit = if let DataType::Timestamp(unit, _) = ty {
775 unit
776 } else {
777 return Err(DataFusionError::Internal(format!(
778 "Unsupported sort column type: {}",
779 sort_column.values.data_type()
780 )));
781 };
782 let array = &sort_column.values;
783 let opt = &sort_column.options.unwrap_or_default();
784 let descending = opt.descending;
785
786 let typed_sorted_range = [range.start, range.end]
787 .iter()
788 .map(|t| {
789 t.convert_to(time_unit.into())
790 .ok_or_else(|| {
791 DataFusionError::Internal(format!(
792 "Failed to convert timestamp from {:?} to {:?}",
793 t.unit(),
794 time_unit
795 ))
796 })
797 .and_then(|typed_ts| {
798 let value = Value::Timestamp(typed_ts);
799 value
800 .try_to_scalar_value(&value.data_type())
801 .map_err(|e| DataFusionError::External(Box::new(e) as _))
802 })
803 })
804 .try_collect::<_, Vec<_>, _>()?;
805
806 let (min_val, max_val) = (typed_sorted_range[0].clone(), typed_sorted_range[1].clone());
807
808 let (start, end) = if descending {
810 let start = bisect::<false>(from_ref(array), from_ref(&max_val), &[*opt])?;
814 let end = bisect::<false>(from_ref(array), from_ref(&min_val), &[*opt])?;
817 (start, end)
818 } else {
819 let start = bisect::<true>(from_ref(array), from_ref(&min_val), &[*opt])?;
822 let end = bisect::<true>(from_ref(array), from_ref(&max_val), &[*opt])?;
825 (start, end)
826 };
827
828 Ok((start, end - start))
829}
830
/// Downcasts `$arr` to `arrow::array::PrimitiveArray<$t>` and returns a boxed
/// iterator over `(row index, Option<i64>)` pairs (`None` for nulls).
///
/// `$unit` is accepted so the macro matches `downcast_ts_array!`'s calling
/// convention, but it is not used.
///
/// Panics (via `unwrap`) if `$arr` is not a `PrimitiveArray<$t>`. When driven by
/// `downcast_ts_array!`, `$t` is chosen from the array's own data type.
#[macro_export]
macro_rules! array_iter_helper {
    ($t:ty, $unit:expr, $arr:expr) => {{
        let typed = $arr
            .as_any()
            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
            .unwrap();
        let iter = typed.iter().enumerate();
        Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
    }};
}
845
846fn cmp_with_opts<T: Ord>(
850 a: &Option<T>,
851 b: &Option<T>,
852 opt: &Option<SortOptions>,
853) -> std::cmp::Ordering {
854 let opt = opt.unwrap_or_default();
855
856 if let (Some(a), Some(b)) = (a, b) {
857 if opt.descending { b.cmp(a) } else { a.cmp(b) }
858 } else if opt.nulls_first {
859 a.cmp(b)
862 } else {
863 match (a, b) {
864 (Some(a), Some(b)) => a.cmp(b),
865 (Some(_), None) => std::cmp::Ordering::Less,
866 (None, Some(_)) => std::cmp::Ordering::Greater,
867 (None, None) => std::cmp::Ordering::Equal,
868 }
869 }
870}
871
/// A batch split into sorted runs, as produced by `split_batch_to_sorted_run`.
#[derive(Debug, Clone)]
struct SortedRunSet<N: Ord> {
    /// Each run as a slice of the original batch, with its run description.
    runs_with_batch: Vec<(DfRecordBatch, SucRun<N>)>,
    /// The sort column evaluated on the whole original batch. Run offsets index
    /// into this column.
    sort_column: SortColumn,
}
879
/// A run of consecutive rows that is sorted according to the sort options.
#[derive(Debug, Clone, PartialEq)]
struct SucRun<N: Ord> {
    /// Offset of the run's first row in the sort column.
    offset: usize,
    /// Number of rows in the run.
    len: usize,
    /// First non-null value of the run, or `None` if the run has no non-null values.
    first_val: Option<N>,
    /// Last non-null value of the run, or `None` if the run has no non-null values.
    last_val: Option<N>,
}
892
893impl SucRun<Timestamp> {
894 fn get_time_range(&self) -> Option<TimeRange> {
896 let start = self.first_val.min(self.last_val);
897 let end = self
898 .first_val
899 .max(self.last_val)
900 .map(|i| Timestamp::new(i.value() + 1, i.unit()));
901 start.zip(end).map(|(s, e)| TimeRange::new(s, e))
902 }
903}
904
905fn find_successive_runs<T: Iterator<Item = (usize, Option<N>)>, N: Ord + Copy>(
907 iter: T,
908 sort_opts: &Option<SortOptions>,
909) -> Vec<SucRun<N>> {
910 let mut runs = Vec::new();
911 let mut last_value = None;
912 let mut iter_len = None;
913
914 let mut last_offset = 0;
915 let mut first_val: Option<N> = None;
916 let mut last_val: Option<N> = None;
917
918 for (idx, t) in iter {
919 if let Some(last_value) = &last_value
920 && cmp_with_opts(last_value, &t, sort_opts) == std::cmp::Ordering::Greater
921 {
922 let len = idx - last_offset;
924 let run = SucRun {
925 offset: last_offset,
926 len,
927 first_val,
928 last_val,
929 };
930 runs.push(run);
931 first_val = None;
932 last_val = None;
933
934 last_offset = idx;
935 }
936 last_value = Some(t);
937 if let Some(t) = t {
938 first_val = first_val.or(Some(t));
939 last_val = Some(t).or(last_val);
940 }
941 iter_len = Some(idx);
942 }
943 let run = SucRun {
944 offset: last_offset,
945 len: iter_len.map(|l| l - last_offset + 1).unwrap_or(0),
946 first_val,
947 last_val,
948 };
949 runs.push(run);
950
951 runs
952}
953
954fn get_sorted_runs(sort_column: SortColumn) -> datafusion_common::Result<Vec<SucRun<Timestamp>>> {
958 let ty = sort_column.values.data_type();
959 if let DataType::Timestamp(unit, _) = ty {
960 let array = &sort_column.values;
961 let iter = downcast_ts_array!(
962 array.data_type() => (array_iter_helper, array),
963 _ => internal_err!("Unsupported sort column type: {ty}")?
964 );
965
966 let raw = find_successive_runs(iter, &sort_column.options);
967 let ts_runs = raw
968 .into_iter()
969 .map(|run| SucRun {
970 offset: run.offset,
971 len: run.len,
972 first_val: run.first_val.map(|v| Timestamp::new(v, unit.into())),
973 last_val: run.last_val.map(|v| Timestamp::new(v, unit.into())),
974 })
975 .collect_vec();
976 Ok(ts_runs)
977 } else {
978 Err(DataFusionError::Internal(format!(
979 "Unsupported sort column type: {ty}"
980 )))
981 }
982}
983
/// A half-open time range `[start, end)`.
///
/// `TimeRange::new` orders the endpoints so that `start <= end`. The derived
/// ordering compares `start` first, then `end`; this is the key order of the
/// `BTreeMap` built by `split_overlapping_ranges`.
#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TimeRange {
    /// Inclusive lower bound.
    start: Timestamp,
    /// Exclusive upper bound.
    end: Timestamp,
}
992
993impl From<&PartitionRange> for TimeRange {
994 fn from(range: &PartitionRange) -> Self {
995 Self::new(range.start, range.end)
996 }
997}
998
999impl From<(Timestamp, Timestamp)> for TimeRange {
1000 fn from(range: (Timestamp, Timestamp)) -> Self {
1001 Self::new(range.0, range.1)
1002 }
1003}
1004
1005impl From<&(Timestamp, Timestamp)> for TimeRange {
1006 fn from(range: &(Timestamp, Timestamp)) -> Self {
1007 Self::new(range.0, range.1)
1008 }
1009}
1010
1011impl TimeRange {
1012 fn new(start: Timestamp, end: Timestamp) -> Self {
1014 if start > end {
1015 Self {
1016 start: end,
1017 end: start,
1018 }
1019 } else {
1020 Self { start, end }
1021 }
1022 }
1023
1024 fn is_subset(&self, other: &Self) -> bool {
1025 self.start >= other.start && self.end <= other.end
1026 }
1027
1028 fn is_overlapping(&self, other: &Self) -> bool {
1030 !(self.start >= other.end || self.end <= other.start)
1031 }
1032
1033 fn intersection(&self, other: &Self) -> Option<Self> {
1034 if self.is_overlapping(other) {
1035 Some(Self::new(
1036 self.start.max(other.start),
1037 self.end.min(other.end),
1038 ))
1039 } else {
1040 None
1041 }
1042 }
1043
1044 fn difference(&self, other: &Self) -> Vec<Self> {
1045 if !self.is_overlapping(other) {
1046 vec![*self]
1047 } else {
1048 let mut ret = Vec::new();
1049 if self.start < other.start && self.end > other.end {
1050 ret.push(Self::new(self.start, other.start));
1051 ret.push(Self::new(other.end, self.end));
1052 } else if self.start < other.start {
1053 ret.push(Self::new(self.start, other.start));
1054 } else if self.end > other.end {
1055 ret.push(Self::new(other.end, self.end));
1056 }
1057 ret
1058 }
1059 }
1060}
1061
1062fn split_range_by(
1064 input_range: &TimeRange,
1065 input_parts: &[usize],
1066 split_by: &TimeRange,
1067 split_idx: usize,
1068) -> Vec<Action> {
1069 let mut ret = Vec::new();
1070 if input_range.is_overlapping(split_by) {
1071 let input_parts = input_parts.to_vec();
1072 let new_parts = {
1073 let mut new_parts = input_parts.clone();
1074 new_parts.push(split_idx);
1075 new_parts
1076 };
1077
1078 ret.push(Action::Pop(*input_range));
1079 if let Some(intersection) = input_range.intersection(split_by) {
1080 ret.push(Action::Push(intersection, new_parts.clone()));
1081 }
1082 for diff in input_range.difference(split_by) {
1083 ret.push(Action::Push(diff, input_parts.clone()));
1084 }
1085 }
1086 ret
1087}
1088
/// An edit to the segment map built by `split_overlapping_ranges`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Action {
    /// Remove the segment keyed by this range.
    Pop(TimeRange),
    /// Insert (or overwrite) a segment with the given `PartitionRange` indices.
    Push(TimeRange, Vec<usize>),
}
1094
/// Groups the disjoint segments produced by `split_overlapping_ranges` into
/// "working ranges", returned in processing order: ascending by range, or
/// descending when `descending` is set.
///
/// Each working range is paired with the set of `PartitionRange` indices whose
/// segments it absorbed. Segments are visited in order, and each one is merged
/// into the current working range when the two share `PartitionRange` indices.
/// A new working range is started instead when:
/// - the segment shares no index with the current working range, or
/// - the only shared index is the largest index of the working set, and every
///   index of the segment is at least that large.
fn compute_all_working_ranges(
    overlap_counts: &BTreeMap<TimeRange, Vec<usize>>,
    descending: bool,
) -> Vec<(TimeRange, BTreeSet<usize>)> {
    let mut ret = Vec::new();
    // The working range currently being grown, with its PartitionRange indices.
    let mut cur_range_set: Option<(TimeRange, BTreeSet<usize>)> = None;
    // Visit segments in the order the stream will consume them.
    let overlap_iter: Box<dyn Iterator<Item = (&TimeRange, &Vec<usize>)>> = if descending {
        Box::new(overlap_counts.iter().rev()) as _
    } else {
        Box::new(overlap_counts.iter()) as _
    };
    for (range, set) in overlap_iter {
        match &mut cur_range_set {
            None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))),
            Some((working_range, working_set)) => {
                let need_expand = {
                    // Largest PartitionRange index in the current working range.
                    let last_part = working_set.last();
                    let inter: BTreeSet<usize> = working_set
                        .intersection(&BTreeSet::from_iter(set.iter().cloned()))
                        .cloned()
                        .collect();
                    if let Some(one) = inter.first()
                        && inter.len() == 1
                        && Some(one) == last_part
                    {
                        // Only the newest range is shared. If the segment brings no
                        // index smaller than it, the working range can be closed here.
                        // NOTE(review): presumably relies on PartitionRange indices
                        // following the processing order — confirm.
                        if set.iter().all(|p| Some(p) >= last_part) {
                            false
                        } else {
                            true
                        }
                    } else if inter.is_empty() {
                        // Nothing shared: the segment starts a fresh working range.
                        false
                    } else {
                        true
                    }
                };

                if need_expand {
                    // Grow the working range in the processing direction and
                    // absorb the segment's indices.
                    if descending {
                        working_range.start = range.start;
                    } else {
                        working_range.end = range.end;
                    }
                    working_set.extend(set.iter().cloned());
                } else {
                    // Close the current working range and start a new one from this segment.
                    ret.push((*working_range, std::mem::take(working_set)));
                    cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned())));
                }
            }
        }
    }

    // Flush the last open working range.
    if let Some(cur_range_set) = cur_range_set {
        ret.push(cur_range_set)
    }

    ret
}
1169
1170fn split_overlapping_ranges(ranges: &[PartitionRange]) -> BTreeMap<TimeRange, Vec<usize>> {
1173 let mut ret: BTreeMap<TimeRange, Vec<usize>> = BTreeMap::new();
1175 for (idx, range) in ranges.iter().enumerate() {
1176 let key: TimeRange = (range.start, range.end).into();
1177 let mut actions = Vec::new();
1178 let mut untouched = vec![key];
1179 let forward_iter = ret
1183 .range(key..)
1184 .take_while(|(range, _)| range.is_overlapping(&key));
1185 let backward_iter = ret
1186 .range(..key)
1187 .rev()
1188 .take_while(|(range, _)| range.is_overlapping(&key));
1189
1190 for (range, parts) in forward_iter.chain(backward_iter) {
1191 untouched = untouched.iter().flat_map(|r| r.difference(range)).collect();
1192 let act = split_range_by(range, parts, &key, idx);
1193 actions.extend(act.into_iter());
1194 }
1195
1196 for action in actions {
1197 match action {
1198 Action::Pop(range) => {
1199 ret.remove(&range);
1200 }
1201 Action::Push(range, parts) => {
1202 ret.insert(range, parts);
1203 }
1204 }
1205 }
1206
1207 for range in untouched {
1209 ret.insert(range, vec![idx]);
1210 }
1211 }
1212 ret
1213}
1214
1215fn get_timestamp_from_idx(
1217 array: &ArrayRef,
1218 offset: usize,
1219) -> datafusion_common::Result<Option<Timestamp>> {
1220 let time_unit = if let DataType::Timestamp(unit, _) = array.data_type() {
1221 unit
1222 } else {
1223 return Err(DataFusionError::Internal(format!(
1224 "Unsupported sort column type: {}",
1225 array.data_type()
1226 )));
1227 };
1228 let ty = array.data_type();
1229 let array = array.slice(offset, 1);
1230 let mut iter = downcast_ts_array!(
1231 array.data_type() => (array_iter_helper, array),
1232 _ => internal_err!("Unsupported sort column type: {ty}")?
1233 );
1234 let (_idx, val) = iter.next().ok_or_else(|| {
1235 DataFusionError::Internal("Empty array in get_timestamp_from".to_string())
1236 })?;
1237 let val = if let Some(val) = val {
1238 val
1239 } else {
1240 return Ok(None);
1241 };
1242 let gt_timestamp = Timestamp::new(val, time_unit.into());
1243 Ok(Some(gt_timestamp))
1244}
1245
1246#[cfg(test)]
1247mod test {
1248 use std::io::Write;
1249 use std::sync::Arc;
1250
1251 use arrow::array::{ArrayRef, TimestampMillisecondArray};
1252 use arrow::compute::concat_batches;
1253 use arrow::json::ArrayWriter;
1254 use arrow_schema::{Field, Schema, TimeUnit};
1255 use futures::StreamExt;
1256 use pretty_assertions::assert_eq;
1257 use serde_json::json;
1258
1259 use super::*;
1260 use crate::test_util::{MockInputExec, new_ts_array};
1261
1262 #[test]
1263 fn test_overlapping() {
1264 let testcases = [
1265 (
1266 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1267 (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1268 false,
1269 ),
1270 (
1271 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1272 (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1273 true,
1274 ),
1275 (
1276 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1277 (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1278 true,
1279 ),
1280 (
1281 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1282 (
1283 Timestamp::new_millisecond(1000),
1284 Timestamp::new_millisecond(1002),
1285 ),
1286 true,
1287 ),
1288 (
1289 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1290 (
1291 Timestamp::new_millisecond(1001),
1292 Timestamp::new_millisecond(1002),
1293 ),
1294 false,
1295 ),
1296 (
1297 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1298 (
1299 Timestamp::new_millisecond(1002),
1300 Timestamp::new_millisecond(1003),
1301 ),
1302 false,
1303 ),
1304 ];
1305
1306 for (range1, range2, expected) in testcases.iter() {
1307 assert_eq!(
1308 TimeRange::from(range1).is_overlapping(&range2.into()),
1309 *expected,
1310 "range1: {:?}, range2: {:?}",
1311 range1,
1312 range2
1313 );
1314 }
1315 }
1316
1317 #[test]
1318 fn test_split() {
1319 let testcases = [
1320 (
1322 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1323 vec![0],
1324 (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1325 1,
1326 vec![],
1327 ),
1328 (
1330 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1331 vec![0],
1332 (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1333 1,
1334 vec![
1335 Action::Pop(
1336 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1337 ),
1338 Action::Push(
1339 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1340 vec![0, 1],
1341 ),
1342 ],
1343 ),
1344 (
1345 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1346 vec![0],
1347 (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1348 1,
1349 vec![
1350 Action::Pop(
1351 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1352 ),
1353 Action::Push(
1354 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1355 vec![0, 1],
1356 ),
1357 ],
1358 ),
1359 (
1360 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1361 vec![0],
1362 (
1363 Timestamp::new_millisecond(1000),
1364 Timestamp::new_millisecond(1002),
1365 ),
1366 1,
1367 vec![
1368 Action::Pop(
1369 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1370 ),
1371 Action::Push(
1372 (
1373 Timestamp::new_millisecond(1000),
1374 Timestamp::new_millisecond(1001),
1375 )
1376 .into(),
1377 vec![0, 1],
1378 ),
1379 ],
1380 ),
1381 (
1383 (Timestamp::new_second(1), Timestamp::new_millisecond(1002)),
1384 vec![0],
1385 (
1386 Timestamp::new_millisecond(1001),
1387 Timestamp::new_millisecond(1002),
1388 ),
1389 1,
1390 vec![
1391 Action::Pop(
1392 (Timestamp::new_second(1), Timestamp::new_millisecond(1002)).into(),
1393 ),
1394 Action::Push(
1395 (
1396 Timestamp::new_millisecond(1001),
1397 Timestamp::new_millisecond(1002),
1398 )
1399 .into(),
1400 vec![0, 1],
1401 ),
1402 Action::Push(
1403 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1404 vec![0],
1405 ),
1406 ],
1407 ),
1408 (
1410 (Timestamp::new_second(1), Timestamp::new_millisecond(1004)),
1411 vec![0],
1412 (
1413 Timestamp::new_millisecond(1001),
1414 Timestamp::new_millisecond(1002),
1415 ),
1416 1,
1417 vec![
1418 Action::Pop(
1419 (Timestamp::new_second(1), Timestamp::new_millisecond(1004)).into(),
1420 ),
1421 Action::Push(
1422 (
1423 Timestamp::new_millisecond(1001),
1424 Timestamp::new_millisecond(1002),
1425 )
1426 .into(),
1427 vec![0, 1],
1428 ),
1429 Action::Push(
1430 (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1431 vec![0],
1432 ),
1433 Action::Push(
1434 (
1435 Timestamp::new_millisecond(1002),
1436 Timestamp::new_millisecond(1004),
1437 )
1438 .into(),
1439 vec![0],
1440 ),
1441 ],
1442 ),
1443 ];
1444
1445 for (range, parts, split_by, split_idx, expected) in testcases.iter() {
1446 assert_eq!(
1447 split_range_by(&(*range).into(), parts, &split_by.into(), *split_idx),
1448 *expected,
1449 "range: {:?}, parts: {:?}, split_by: {:?}, split_idx: {}",
1450 range,
1451 parts,
1452 split_by,
1453 split_idx
1454 );
1455 }
1456 }
1457
1458 #[test]
1459 fn test_compute_working_ranges_rev() {
1460 let testcases = vec![
1461 (
1462 BTreeMap::from([(
1463 (Timestamp::new_second(1), Timestamp::new_second(2)),
1464 vec![0],
1465 )]),
1466 vec![(
1467 (Timestamp::new_second(1), Timestamp::new_second(2)),
1468 BTreeSet::from([0]),
1469 )],
1470 ),
1471 (
1472 BTreeMap::from([(
1473 (Timestamp::new_second(1), Timestamp::new_second(2)),
1474 vec![0, 1],
1475 )]),
1476 vec![(
1477 (Timestamp::new_second(1), Timestamp::new_second(2)),
1478 BTreeSet::from([0, 1]),
1479 )],
1480 ),
1481 (
1482 BTreeMap::from([
1483 (
1484 (Timestamp::new_second(2), Timestamp::new_second(3)),
1485 vec![0],
1486 ),
1487 (
1488 (Timestamp::new_second(1), Timestamp::new_second(2)),
1489 vec![0, 1],
1490 ),
1491 ]),
1492 vec![
1493 (
1494 (Timestamp::new_second(2), Timestamp::new_second(3)),
1495 BTreeSet::from([0]),
1496 ),
1497 (
1498 (Timestamp::new_second(1), Timestamp::new_second(2)),
1499 BTreeSet::from([0, 1]),
1500 ),
1501 ],
1502 ),
1503 (
1504 BTreeMap::from([
1505 (
1506 (Timestamp::new_second(2), Timestamp::new_second(3)),
1507 vec![0, 1],
1508 ),
1509 (
1510 (Timestamp::new_second(1), Timestamp::new_second(2)),
1511 vec![1],
1512 ),
1513 ]),
1514 vec![
1515 (
1516 (Timestamp::new_second(2), Timestamp::new_second(3)),
1517 BTreeSet::from([0, 1]),
1518 ),
1519 (
1520 (Timestamp::new_second(1), Timestamp::new_second(2)),
1521 BTreeSet::from([1]),
1522 ),
1523 ],
1524 ),
1525 (
1526 BTreeMap::from([
1527 (
1528 (Timestamp::new_second(3), Timestamp::new_second(4)),
1529 vec![0],
1530 ),
1531 (
1532 (Timestamp::new_second(2), Timestamp::new_second(3)),
1533 vec![0, 1],
1534 ),
1535 (
1536 (Timestamp::new_second(1), Timestamp::new_second(2)),
1537 vec![1],
1538 ),
1539 ]),
1540 vec![
1541 (
1542 (Timestamp::new_second(3), Timestamp::new_second(4)),
1543 BTreeSet::from([0]),
1544 ),
1545 (
1546 (Timestamp::new_second(2), Timestamp::new_second(3)),
1547 BTreeSet::from([0, 1]),
1548 ),
1549 (
1550 (Timestamp::new_second(1), Timestamp::new_second(2)),
1551 BTreeSet::from([1]),
1552 ),
1553 ],
1554 ),
1555 (
1556 BTreeMap::from([
1557 (
1558 (Timestamp::new_second(3), Timestamp::new_second(4)),
1559 vec![0, 2],
1560 ),
1561 (
1562 (Timestamp::new_second(2), Timestamp::new_second(3)),
1563 vec![0, 1, 2],
1564 ),
1565 (
1566 (Timestamp::new_second(1), Timestamp::new_second(2)),
1567 vec![1, 2],
1568 ),
1569 ]),
1570 vec![(
1571 (Timestamp::new_second(1), Timestamp::new_second(4)),
1572 BTreeSet::from([0, 1, 2]),
1573 )],
1574 ),
1575 (
1576 BTreeMap::from([
1577 (
1578 (Timestamp::new_second(2), Timestamp::new_second(3)),
1579 vec![0, 2],
1580 ),
1581 (
1582 (Timestamp::new_second(1), Timestamp::new_second(2)),
1583 vec![1, 2],
1584 ),
1585 ]),
1586 vec![(
1587 (Timestamp::new_second(1), Timestamp::new_second(3)),
1588 BTreeSet::from([0, 1, 2]),
1589 )],
1590 ),
1591 (
1592 BTreeMap::from([
1593 (
1594 (Timestamp::new_second(3), Timestamp::new_second(4)),
1595 vec![0, 1],
1596 ),
1597 (
1598 (Timestamp::new_second(2), Timestamp::new_second(3)),
1599 vec![0, 1, 2],
1600 ),
1601 (
1602 (Timestamp::new_second(1), Timestamp::new_second(2)),
1603 vec![1, 2],
1604 ),
1605 ]),
1606 vec![(
1607 (Timestamp::new_second(1), Timestamp::new_second(4)),
1608 BTreeSet::from([0, 1, 2]),
1609 )],
1610 ),
1611 (
1612 BTreeMap::from([
1613 (
1614 (Timestamp::new_second(2), Timestamp::new_second(3)),
1615 vec![0, 1],
1616 ),
1617 (
1618 (Timestamp::new_second(1), Timestamp::new_second(2)),
1619 vec![1, 2],
1620 ),
1621 ]),
1622 vec![
1623 (
1624 (Timestamp::new_second(2), Timestamp::new_second(3)),
1625 BTreeSet::from([0, 1]),
1626 ),
1627 (
1628 (Timestamp::new_second(1), Timestamp::new_second(2)),
1629 BTreeSet::from([1, 2]),
1630 ),
1631 ],
1632 ),
1633 (
1635 BTreeMap::from([
1636 (
1637 (Timestamp::new_second(2), Timestamp::new_second(3)),
1638 vec![0],
1639 ),
1640 (
1641 (Timestamp::new_second(1), Timestamp::new_second(2)),
1642 vec![1, 2],
1643 ),
1644 ]),
1645 vec![
1646 (
1647 (Timestamp::new_second(2), Timestamp::new_second(3)),
1648 BTreeSet::from([0]),
1649 ),
1650 (
1651 (Timestamp::new_second(1), Timestamp::new_second(2)),
1652 BTreeSet::from([1, 2]),
1653 ),
1654 ],
1655 ),
1656 ];
1657
1658 for (input, expected) in testcases {
1659 let expected = expected
1660 .into_iter()
1661 .map(|(r, s)| (r.into(), s))
1662 .collect_vec();
1663 let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1664 assert_eq!(
1665 compute_all_working_ranges(&input, true),
1666 expected,
1667 "input: {:?}",
1668 input
1669 );
1670 }
1671 }
1672
1673 #[test]
1674 fn test_compute_working_ranges() {
1675 let testcases = vec![
1676 (
1677 BTreeMap::from([(
1678 (Timestamp::new_second(1), Timestamp::new_second(2)),
1679 vec![0],
1680 )]),
1681 vec![(
1682 (Timestamp::new_second(1), Timestamp::new_second(2)),
1683 BTreeSet::from([0]),
1684 )],
1685 ),
1686 (
1687 BTreeMap::from([(
1688 (Timestamp::new_second(1), Timestamp::new_second(2)),
1689 vec![0, 1],
1690 )]),
1691 vec![(
1692 (Timestamp::new_second(1), Timestamp::new_second(2)),
1693 BTreeSet::from([0, 1]),
1694 )],
1695 ),
1696 (
1697 BTreeMap::from([
1698 (
1699 (Timestamp::new_second(1), Timestamp::new_second(2)),
1700 vec![0, 1],
1701 ),
1702 (
1703 (Timestamp::new_second(2), Timestamp::new_second(3)),
1704 vec![1],
1705 ),
1706 ]),
1707 vec![
1708 (
1709 (Timestamp::new_second(1), Timestamp::new_second(2)),
1710 BTreeSet::from([0, 1]),
1711 ),
1712 (
1713 (Timestamp::new_second(2), Timestamp::new_second(3)),
1714 BTreeSet::from([1]),
1715 ),
1716 ],
1717 ),
1718 (
1719 BTreeMap::from([
1720 (
1721 (Timestamp::new_second(1), Timestamp::new_second(2)),
1722 vec![0],
1723 ),
1724 (
1725 (Timestamp::new_second(2), Timestamp::new_second(3)),
1726 vec![0, 1],
1727 ),
1728 ]),
1729 vec![
1730 (
1731 (Timestamp::new_second(1), Timestamp::new_second(2)),
1732 BTreeSet::from([0]),
1733 ),
1734 (
1735 (Timestamp::new_second(2), Timestamp::new_second(3)),
1736 BTreeSet::from([0, 1]),
1737 ),
1738 ],
1739 ),
1740 (
1742 BTreeMap::from([
1743 (
1744 (Timestamp::new_second(1), Timestamp::new_second(2)),
1745 vec![0],
1746 ),
1747 (
1748 (Timestamp::new_second(2), Timestamp::new_second(3)),
1749 vec![0, 1],
1750 ),
1751 (
1752 (Timestamp::new_second(3), Timestamp::new_second(4)),
1753 vec![1],
1754 ),
1755 ]),
1756 vec![
1757 (
1758 (Timestamp::new_second(1), Timestamp::new_second(2)),
1759 BTreeSet::from([0]),
1760 ),
1761 (
1762 (Timestamp::new_second(2), Timestamp::new_second(3)),
1763 BTreeSet::from([0, 1]),
1764 ),
1765 (
1766 (Timestamp::new_second(3), Timestamp::new_second(4)),
1767 BTreeSet::from([1]),
1768 ),
1769 ],
1770 ),
1771 (
1772 BTreeMap::from([
1773 (
1774 (Timestamp::new_second(1), Timestamp::new_second(2)),
1775 vec![0, 2],
1776 ),
1777 (
1778 (Timestamp::new_second(2), Timestamp::new_second(3)),
1779 vec![0, 1, 2],
1780 ),
1781 (
1782 (Timestamp::new_second(3), Timestamp::new_second(4)),
1783 vec![1, 2],
1784 ),
1785 ]),
1786 vec![(
1787 (Timestamp::new_second(1), Timestamp::new_second(4)),
1788 BTreeSet::from([0, 1, 2]),
1789 )],
1790 ),
1791 (
1792 BTreeMap::from([
1793 (
1794 (Timestamp::new_second(1), Timestamp::new_second(2)),
1795 vec![0, 2],
1796 ),
1797 (
1798 (Timestamp::new_second(2), Timestamp::new_second(3)),
1799 vec![1, 2],
1800 ),
1801 ]),
1802 vec![(
1803 (Timestamp::new_second(1), Timestamp::new_second(3)),
1804 BTreeSet::from([0, 1, 2]),
1805 )],
1806 ),
1807 (
1808 BTreeMap::from([
1809 (
1810 (Timestamp::new_second(1), Timestamp::new_second(2)),
1811 vec![0, 1],
1812 ),
1813 (
1814 (Timestamp::new_second(2), Timestamp::new_second(3)),
1815 vec![0, 1, 2],
1816 ),
1817 (
1818 (Timestamp::new_second(3), Timestamp::new_second(4)),
1819 vec![1, 2],
1820 ),
1821 ]),
1822 vec![(
1823 (Timestamp::new_second(1), Timestamp::new_second(4)),
1824 BTreeSet::from([0, 1, 2]),
1825 )],
1826 ),
1827 (
1828 BTreeMap::from([
1829 (
1830 (Timestamp::new_second(1), Timestamp::new_second(2)),
1831 vec![0, 1],
1832 ),
1833 (
1834 (Timestamp::new_second(2), Timestamp::new_second(3)),
1835 vec![1, 2],
1836 ),
1837 ]),
1838 vec![
1839 (
1840 (Timestamp::new_second(1), Timestamp::new_second(2)),
1841 BTreeSet::from([0, 1]),
1842 ),
1843 (
1844 (Timestamp::new_second(2), Timestamp::new_second(3)),
1845 BTreeSet::from([1, 2]),
1846 ),
1847 ],
1848 ),
1849 (
1851 BTreeMap::from([
1852 (
1853 (Timestamp::new_second(1), Timestamp::new_second(2)),
1854 vec![0, 1],
1855 ),
1856 (
1857 (Timestamp::new_second(2), Timestamp::new_second(3)),
1858 vec![2],
1859 ),
1860 ]),
1861 vec![
1862 (
1863 (Timestamp::new_second(1), Timestamp::new_second(2)),
1864 BTreeSet::from([0, 1]),
1865 ),
1866 (
1867 (Timestamp::new_second(2), Timestamp::new_second(3)),
1868 BTreeSet::from([2]),
1869 ),
1870 ],
1871 ),
1872 ];
1873
1874 for (input, expected) in testcases {
1875 let expected = expected
1876 .into_iter()
1877 .map(|(r, s)| (r.into(), s))
1878 .collect_vec();
1879 let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1880 assert_eq!(
1881 compute_all_working_ranges(&input, false),
1882 expected,
1883 "input: {:?}",
1884 input
1885 );
1886 }
1887 }
1888
1889 #[test]
1890 fn test_split_overlap_range() {
1891 let testcases = vec![
1892 (
1894 vec![PartitionRange {
1895 start: Timestamp::new_second(1),
1896 end: Timestamp::new_second(2),
1897 num_rows: 2,
1898 identifier: 0,
1899 }],
1900 BTreeMap::from_iter(
1901 vec![(
1902 (Timestamp::new_second(1), Timestamp::new_second(2)),
1903 vec![0],
1904 )]
1905 .into_iter(),
1906 ),
1907 ),
1908 (
1910 vec![
1911 PartitionRange {
1912 start: Timestamp::new_second(1),
1913 end: Timestamp::new_second(2),
1914 num_rows: 2,
1915 identifier: 0,
1916 },
1917 PartitionRange {
1918 start: Timestamp::new_second(1),
1919 end: Timestamp::new_second(2),
1920 num_rows: 2,
1921 identifier: 1,
1922 },
1923 ],
1924 BTreeMap::from_iter(
1925 vec![(
1926 (Timestamp::new_second(1), Timestamp::new_second(2)),
1927 vec![0, 1],
1928 )]
1929 .into_iter(),
1930 ),
1931 ),
1932 (
1933 vec![
1934 PartitionRange {
1935 start: Timestamp::new_second(1),
1936 end: Timestamp::new_second(3),
1937 num_rows: 2,
1938 identifier: 0,
1939 },
1940 PartitionRange {
1941 start: Timestamp::new_second(2),
1942 end: Timestamp::new_second(4),
1943 num_rows: 2,
1944 identifier: 1,
1945 },
1946 ],
1947 BTreeMap::from_iter(
1948 vec![
1949 (
1950 (Timestamp::new_second(1), Timestamp::new_second(2)),
1951 vec![0],
1952 ),
1953 (
1954 (Timestamp::new_second(2), Timestamp::new_second(3)),
1955 vec![0, 1],
1956 ),
1957 (
1958 (Timestamp::new_second(3), Timestamp::new_second(4)),
1959 vec![1],
1960 ),
1961 ]
1962 .into_iter(),
1963 ),
1964 ),
1965 (
1967 vec![
1968 PartitionRange {
1969 start: Timestamp::new_second(1),
1970 end: Timestamp::new_second(3),
1971 num_rows: 2,
1972 identifier: 0,
1973 },
1974 PartitionRange {
1975 start: Timestamp::new_second(2),
1976 end: Timestamp::new_second(4),
1977 num_rows: 2,
1978 identifier: 1,
1979 },
1980 PartitionRange {
1981 start: Timestamp::new_second(1),
1982 end: Timestamp::new_second(4),
1983 num_rows: 2,
1984 identifier: 2,
1985 },
1986 ],
1987 BTreeMap::from_iter(
1988 vec![
1989 (
1990 (Timestamp::new_second(1), Timestamp::new_second(2)),
1991 vec![0, 2],
1992 ),
1993 (
1994 (Timestamp::new_second(2), Timestamp::new_second(3)),
1995 vec![0, 1, 2],
1996 ),
1997 (
1998 (Timestamp::new_second(3), Timestamp::new_second(4)),
1999 vec![1, 2],
2000 ),
2001 ]
2002 .into_iter(),
2003 ),
2004 ),
2005 (
2006 vec![
2007 PartitionRange {
2008 start: Timestamp::new_second(1),
2009 end: Timestamp::new_second(3),
2010 num_rows: 2,
2011 identifier: 0,
2012 },
2013 PartitionRange {
2014 start: Timestamp::new_second(1),
2015 end: Timestamp::new_second(4),
2016 num_rows: 2,
2017 identifier: 1,
2018 },
2019 PartitionRange {
2020 start: Timestamp::new_second(2),
2021 end: Timestamp::new_second(4),
2022 num_rows: 2,
2023 identifier: 2,
2024 },
2025 ],
2026 BTreeMap::from_iter(
2027 vec![
2028 (
2029 (Timestamp::new_second(1), Timestamp::new_second(2)),
2030 vec![0, 1],
2031 ),
2032 (
2033 (Timestamp::new_second(2), Timestamp::new_second(3)),
2034 vec![0, 1, 2],
2035 ),
2036 (
2037 (Timestamp::new_second(3), Timestamp::new_second(4)),
2038 vec![1, 2],
2039 ),
2040 ]
2041 .into_iter(),
2042 ),
2043 ),
2044 ];
2045
2046 for (input, expected) in testcases {
2047 let expected = expected.into_iter().map(|(r, s)| (r.into(), s)).collect();
2048 assert_eq!(split_overlapping_ranges(&input), expected);
2049 }
2050 }
2051
2052 impl From<(i32, i32, Option<i32>, Option<i32>)> for SucRun<i32> {
2053 fn from((offset, len, min_val, max_val): (i32, i32, Option<i32>, Option<i32>)) -> Self {
2054 Self {
2055 offset: offset as usize,
2056 len: len as usize,
2057 first_val: min_val,
2058 last_val: max_val,
2059 }
2060 }
2061 }
2062
2063 #[test]
2064 fn test_find_successive_runs() {
2065 let testcases = vec![
2066 (
2067 vec![Some(1), Some(1), Some(2), Some(1), Some(3)],
2068 Some(SortOptions {
2069 descending: false,
2070 nulls_first: false,
2071 }),
2072 vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2073 ),
2074 (
2075 vec![Some(1), Some(2), Some(2), Some(1), Some(3)],
2076 Some(SortOptions {
2077 descending: false,
2078 nulls_first: false,
2079 }),
2080 vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2081 ),
2082 (
2083 vec![Some(1), Some(2), None, None, Some(1), Some(3)],
2084 Some(SortOptions {
2085 descending: false,
2086 nulls_first: false,
2087 }),
2088 vec![(0, 4, Some(1), Some(2)), (4, 2, Some(1), Some(3))],
2089 ),
2090 (
2091 vec![Some(1), Some(2), Some(1), Some(3)],
2092 Some(SortOptions {
2093 descending: false,
2094 nulls_first: false,
2095 }),
2096 vec![(0, 2, Some(1), Some(2)), (2, 2, Some(1), Some(3))],
2097 ),
2098 (
2099 vec![Some(1), Some(2), Some(1), Some(3)],
2100 Some(SortOptions {
2101 descending: true,
2102 nulls_first: false,
2103 }),
2104 vec![
2105 (0, 1, Some(1), Some(1)),
2106 (1, 2, Some(2), Some(1)),
2107 (3, 1, Some(3), Some(3)),
2108 ],
2109 ),
2110 (
2111 vec![Some(1), Some(2), None, Some(3)],
2112 Some(SortOptions {
2113 descending: false,
2114 nulls_first: true,
2115 }),
2116 vec![(0, 2, Some(1), Some(2)), (2, 2, Some(3), Some(3))],
2117 ),
2118 (
2119 vec![Some(1), Some(2), None, Some(3)],
2120 Some(SortOptions {
2121 descending: false,
2122 nulls_first: false,
2123 }),
2124 vec![(0, 3, Some(1), Some(2)), (3, 1, Some(3), Some(3))],
2125 ),
2126 (
2127 vec![Some(2), Some(1), None, Some(3)],
2128 Some(SortOptions {
2129 descending: true,
2130 nulls_first: true,
2131 }),
2132 vec![(0, 2, Some(2), Some(1)), (2, 2, Some(3), Some(3))],
2133 ),
2134 (
2135 vec![],
2136 Some(SortOptions {
2137 descending: false,
2138 nulls_first: true,
2139 }),
2140 vec![(0, 0, None, None)],
2141 ),
2142 (
2143 vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2144 Some(SortOptions {
2145 descending: true,
2146 nulls_first: true,
2147 }),
2148 vec![(0, 5, Some(2), Some(1)), (5, 2, Some(5), Some(4))],
2149 ),
2150 (
2151 vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2152 Some(SortOptions {
2153 descending: true,
2154 nulls_first: false,
2155 }),
2156 vec![
2157 (0, 2, None, None),
2158 (2, 3, Some(2), Some(1)),
2159 (5, 2, Some(5), Some(4)),
2160 ],
2161 ),
2162 ];
2163 for (input, sort_opts, expected) in testcases {
2164 let ret = find_successive_runs(input.clone().into_iter().enumerate(), &sort_opts);
2165 let expected = expected.into_iter().map(SucRun::<i32>::from).collect_vec();
2166 assert_eq!(
2167 ret, expected,
2168 "input: {:?}, opt: {:?},expected: {:?}",
2169 input, sort_opts, expected
2170 );
2171 }
2172 }
2173
2174 #[test]
2175 fn test_cmp_with_opts() {
2176 let testcases = vec![
2177 (
2178 Some(1),
2179 Some(2),
2180 Some(SortOptions {
2181 descending: false,
2182 nulls_first: false,
2183 }),
2184 std::cmp::Ordering::Less,
2185 ),
2186 (
2187 Some(1),
2188 Some(2),
2189 Some(SortOptions {
2190 descending: true,
2191 nulls_first: false,
2192 }),
2193 std::cmp::Ordering::Greater,
2194 ),
2195 (
2196 Some(1),
2197 None,
2198 Some(SortOptions {
2199 descending: false,
2200 nulls_first: true,
2201 }),
2202 std::cmp::Ordering::Greater,
2203 ),
2204 (
2205 Some(1),
2206 None,
2207 Some(SortOptions {
2208 descending: true,
2209 nulls_first: true,
2210 }),
2211 std::cmp::Ordering::Greater,
2212 ),
2213 (
2214 Some(1),
2215 None,
2216 Some(SortOptions {
2217 descending: true,
2218 nulls_first: false,
2219 }),
2220 std::cmp::Ordering::Less,
2221 ),
2222 (
2223 Some(1),
2224 None,
2225 Some(SortOptions {
2226 descending: false,
2227 nulls_first: false,
2228 }),
2229 std::cmp::Ordering::Less,
2230 ),
2231 (
2232 None,
2233 None,
2234 Some(SortOptions {
2235 descending: true,
2236 nulls_first: true,
2237 }),
2238 std::cmp::Ordering::Equal,
2239 ),
2240 (
2241 None,
2242 None,
2243 Some(SortOptions {
2244 descending: false,
2245 nulls_first: true,
2246 }),
2247 std::cmp::Ordering::Equal,
2248 ),
2249 (
2250 None,
2251 None,
2252 Some(SortOptions {
2253 descending: true,
2254 nulls_first: false,
2255 }),
2256 std::cmp::Ordering::Equal,
2257 ),
2258 (
2259 None,
2260 None,
2261 Some(SortOptions {
2262 descending: false,
2263 nulls_first: false,
2264 }),
2265 std::cmp::Ordering::Equal,
2266 ),
2267 ];
2268 for (a, b, opts, expected) in testcases {
2269 assert_eq!(
2270 cmp_with_opts(&a, &b, &opts),
2271 expected,
2272 "a: {:?}, b: {:?}, opts: {:?}",
2273 a,
2274 b,
2275 opts
2276 );
2277 }
2278 }
2279
2280 #[test]
2281 fn test_find_slice_from_range() {
2282 let test_cases = vec![
2283 (
2285 Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2286 false,
2287 TimeRange {
2288 start: Timestamp::new_millisecond(2),
2289 end: Timestamp::new_millisecond(4),
2290 },
2291 Ok((1, 2)),
2292 ),
2293 (
2294 Arc::new(TimestampMillisecondArray::from_iter_values([
2295 -2, -1, 0, 1, 2, 3, 4, 5,
2296 ])) as ArrayRef,
2297 false,
2298 TimeRange {
2299 start: Timestamp::new_millisecond(-1),
2300 end: Timestamp::new_millisecond(4),
2301 },
2302 Ok((1, 5)),
2303 ),
2304 (
2305 Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 6])) as ArrayRef,
2306 false,
2307 TimeRange {
2308 start: Timestamp::new_millisecond(2),
2309 end: Timestamp::new_millisecond(5),
2310 },
2311 Ok((1, 2)),
2312 ),
2313 (
2314 Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 6])) as ArrayRef,
2315 false,
2316 TimeRange {
2317 start: Timestamp::new_millisecond(2),
2318 end: Timestamp::new_millisecond(5),
2319 },
2320 Ok((1, 3)),
2321 ),
2322 (
2323 Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 5, 6])) as ArrayRef,
2324 false,
2325 TimeRange {
2326 start: Timestamp::new_millisecond(2),
2327 end: Timestamp::new_millisecond(5),
2328 },
2329 Ok((1, 2)),
2330 ),
2331 (
2332 Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2333 false,
2334 TimeRange {
2335 start: Timestamp::new_millisecond(6),
2336 end: Timestamp::new_millisecond(7),
2337 },
2338 Ok((5, 0)),
2339 ),
2340 (
2342 Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 1])) as ArrayRef,
2343 true,
2344 TimeRange {
2345 end: Timestamp::new_millisecond(4),
2346 start: Timestamp::new_millisecond(1),
2347 },
2348 Ok((1, 3)),
2349 ),
2350 (
2351 Arc::new(TimestampMillisecondArray::from_iter_values([
2352 5, 4, 3, 2, 1, 0,
2353 ])) as ArrayRef,
2354 true,
2355 TimeRange {
2356 end: Timestamp::new_millisecond(4),
2357 start: Timestamp::new_millisecond(1),
2358 },
2359 Ok((2, 3)),
2360 ),
2361 (
2362 Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 0])) as ArrayRef,
2363 true,
2364 TimeRange {
2365 end: Timestamp::new_millisecond(4),
2366 start: Timestamp::new_millisecond(1),
2367 },
2368 Ok((1, 2)),
2369 ),
2370 (
2371 Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 0])) as ArrayRef,
2372 true,
2373 TimeRange {
2374 end: Timestamp::new_millisecond(4),
2375 start: Timestamp::new_millisecond(1),
2376 },
2377 Ok((2, 2)),
2378 ),
2379 (
2380 Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 1])) as ArrayRef,
2381 true,
2382 TimeRange {
2383 end: Timestamp::new_millisecond(5),
2384 start: Timestamp::new_millisecond(2),
2385 },
2386 Ok((1, 3)),
2387 ),
2388 (
2389 Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 1])) as ArrayRef,
2390 true,
2391 TimeRange {
2392 end: Timestamp::new_millisecond(5),
2393 start: Timestamp::new_millisecond(2),
2394 },
2395 Ok((1, 2)),
2396 ),
2397 (
2398 Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 2, 1])) as ArrayRef,
2399 true,
2400 TimeRange {
2401 end: Timestamp::new_millisecond(5),
2402 start: Timestamp::new_millisecond(2),
2403 },
2404 Ok((1, 3)),
2405 ),
2406 (
2407 Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 1])) as ArrayRef,
2408 true,
2409 TimeRange {
2410 end: Timestamp::new_millisecond(5),
2411 start: Timestamp::new_millisecond(2),
2412 },
2413 Ok((1, 2)),
2414 ),
2415 (
2416 Arc::new(TimestampMillisecondArray::from_iter_values([
2417 10, 9, 8, 7, 6,
2418 ])) as ArrayRef,
2419 true,
2420 TimeRange {
2421 end: Timestamp::new_millisecond(5),
2422 start: Timestamp::new_millisecond(2),
2423 },
2424 Ok((5, 0)),
2425 ),
2426 (
2428 Arc::new(TimestampMillisecondArray::from_iter_values([3, 2, 1, 0])) as ArrayRef,
2429 true,
2430 TimeRange {
2431 end: Timestamp::new_millisecond(4),
2432 start: Timestamp::new_millisecond(3),
2433 },
2434 Ok((0, 1)),
2435 ),
2436 (
2437 Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2])) as ArrayRef,
2438 true,
2439 TimeRange {
2440 end: Timestamp::new_millisecond(4),
2441 start: Timestamp::new_millisecond(3),
2442 },
2443 Ok((1, 1)),
2444 ),
2445 (
2446 Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2])) as ArrayRef,
2447 true,
2448 TimeRange {
2449 end: Timestamp::new_millisecond(4),
2450 start: Timestamp::new_millisecond(3),
2451 },
2452 Ok((2, 1)),
2453 ),
2454 ];
2455
2456 for (sort_vals, descending, range, expected) in test_cases {
2457 let sort_column = SortColumn {
2458 values: sort_vals,
2459 options: Some(SortOptions {
2460 descending,
2461 ..Default::default()
2462 }),
2463 };
2464 let ret = find_slice_from_range(&sort_column, &range);
2465 match (ret, expected) {
2466 (Ok(ret), Ok(expected)) => {
2467 assert_eq!(
2468 ret, expected,
2469 "sort_vals: {:?}, range: {:?}",
2470 sort_column, range
2471 )
2472 }
2473 (Err(err), Err(expected)) => {
2474 let expected: &str = expected;
2475 assert!(
2476 err.to_string().contains(expected),
2477 "err: {:?}, expected: {:?}",
2478 err,
2479 expected
2480 );
2481 }
2482 (r, e) => panic!("unexpected result: {:?}, expected: {:?}", r, e),
2483 }
2484 }
2485 }
2486
    /// One `WindowedSortExec` test case: input batches tagged with the partition
    /// range they are read under, plus the batches the stream should produce.
    #[derive(Debug)]
    struct TestStream {
        /// Sort expression forwarded to `WindowedSortExec::try_new`.
        expression: PhysicalSortExpr,
        /// Optional fetch limit forwarded to `WindowedSortExec::try_new`.
        fetch: Option<usize>,
        /// Input batches, each paired with the `PartitionRange` it is reported under.
        input: Vec<(PartitionRange, DfRecordBatch)>,
        /// Batches the stream is expected to produce.
        output: Vec<DfRecordBatch>,
        /// Schema shared by every input and expected batch.
        schema: SchemaRef,
    }
2495 use datafusion::physical_plan::expressions::Column;
2496 impl TestStream {
2497 fn new(
2498 ts_col: Column,
2499 opt: SortOptions,
2500 fetch: Option<usize>,
2501 schema: impl Into<arrow_schema::Fields>,
2502 input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2503 expected: Vec<Vec<ArrayRef>>,
2504 ) -> Self {
2505 let expression = PhysicalSortExpr {
2506 expr: Arc::new(ts_col),
2507 options: opt,
2508 };
2509 let schema = Schema::new(schema.into());
2510 let schema = Arc::new(schema);
2511 let input = input
2512 .into_iter()
2513 .map(|(k, v)| (k, DfRecordBatch::try_new(schema.clone(), v).unwrap()))
2514 .collect_vec();
2515 let output_batchs = expected
2516 .into_iter()
2517 .map(|v| DfRecordBatch::try_new(schema.clone(), v).unwrap())
2518 .collect_vec();
2519 Self {
2520 expression,
2521 fetch,
2522 input,
2523 output: output_batchs,
2524 schema,
2525 }
2526 }
2527
2528 async fn run_test(&self) -> Vec<DfRecordBatch> {
2529 let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip();
2530
2531 let mock_input = MockInputExec::new(batches, self.schema.clone());
2532
2533 let exec = WindowedSortExec::try_new(
2534 self.expression.clone(),
2535 self.fetch,
2536 vec![ranges],
2537 Arc::new(mock_input),
2538 )
2539 .unwrap();
2540
2541 let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
2542
2543 let real_output = exec_stream.collect::<Vec<_>>().await;
2544 let real_output: Vec<_> = real_output.into_iter().try_collect().unwrap();
2545 real_output
2546 }
2547 }
2548
2549 #[tokio::test]
2550 async fn test_window_sort_stream() {
2551 let test_cases = [
2552 TestStream::new(
2553 Column::new("ts", 0),
2554 SortOptions {
2555 descending: false,
2556 nulls_first: true,
2557 },
2558 None,
2559 vec![Field::new(
2560 "ts",
2561 DataType::Timestamp(TimeUnit::Millisecond, None),
2562 false,
2563 )],
2564 vec![],
2565 vec![],
2566 ),
2567 TestStream::new(
2568 Column::new("ts", 0),
2569 SortOptions {
2570 descending: false,
2571 nulls_first: true,
2572 },
2573 None,
2574 vec![Field::new(
2575 "ts",
2576 DataType::Timestamp(TimeUnit::Millisecond, None),
2577 false,
2578 )],
2579 vec![
2580 (
2582 PartitionRange {
2583 start: Timestamp::new_millisecond(1),
2584 end: Timestamp::new_millisecond(2),
2585 num_rows: 1,
2586 identifier: 0,
2587 },
2588 vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2589 ),
2590 (
2591 PartitionRange {
2592 start: Timestamp::new_millisecond(1),
2593 end: Timestamp::new_millisecond(3),
2594 num_rows: 1,
2595 identifier: 0,
2596 },
2597 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2598 ),
2599 ],
2600 vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2601 [2],
2602 ))]],
2603 ),
2604 TestStream::new(
2605 Column::new("ts", 0),
2606 SortOptions {
2607 descending: false,
2608 nulls_first: true,
2609 },
2610 None,
2611 vec![Field::new(
2612 "ts",
2613 DataType::Timestamp(TimeUnit::Millisecond, None),
2614 false,
2615 )],
2616 vec![
2617 (
2619 PartitionRange {
2620 start: Timestamp::new_millisecond(1),
2621 end: Timestamp::new_millisecond(2),
2622 num_rows: 1,
2623 identifier: 0,
2624 },
2625 vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2626 ),
2627 (
2628 PartitionRange {
2629 start: Timestamp::new_millisecond(1),
2630 end: Timestamp::new_millisecond(3),
2631 num_rows: 1,
2632 identifier: 0,
2633 },
2634 vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2635 ),
2636 ],
2637 vec![],
2638 ),
2639 TestStream::new(
2640 Column::new("ts", 0),
2641 SortOptions {
2642 descending: false,
2643 nulls_first: true,
2644 },
2645 None,
2646 vec![Field::new(
2647 "ts",
2648 DataType::Timestamp(TimeUnit::Millisecond, None),
2649 false,
2650 )],
2651 vec![
2652 (
2655 PartitionRange {
2656 start: Timestamp::new_millisecond(1),
2657 end: Timestamp::new_millisecond(2),
2658 num_rows: 1,
2659 identifier: 0,
2660 },
2661 vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2662 ),
2663 (
2664 PartitionRange {
2665 start: Timestamp::new_millisecond(1),
2666 end: Timestamp::new_millisecond(3),
2667 num_rows: 1,
2668 identifier: 0,
2669 },
2670 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2671 ),
2672 ],
2673 vec![
2674 vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2675 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2676 ],
2677 ),
2678 TestStream::new(
2679 Column::new("ts", 0),
2680 SortOptions {
2681 descending: false,
2682 nulls_first: true,
2683 },
2684 None,
2685 vec![Field::new(
2686 "ts",
2687 DataType::Timestamp(TimeUnit::Millisecond, None),
2688 false,
2689 )],
2690 vec![
2691 (
2693 PartitionRange {
2694 start: Timestamp::new_millisecond(1),
2695 end: Timestamp::new_millisecond(3),
2696 num_rows: 1,
2697 identifier: 0,
2698 },
2699 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2700 1, 2,
2701 ]))],
2702 ),
2703 (
2704 PartitionRange {
2705 start: Timestamp::new_millisecond(1),
2706 end: Timestamp::new_millisecond(4),
2707 num_rows: 1,
2708 identifier: 0,
2709 },
2710 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2711 2, 3,
2712 ]))],
2713 ),
2714 ],
2715 vec![
2716 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2717 1, 2,
2718 ]))],
2719 vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2721 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2722 ],
2723 ),
2724 TestStream::new(
2725 Column::new("ts", 0),
2726 SortOptions {
2727 descending: false,
2728 nulls_first: true,
2729 },
2730 None,
2731 vec![Field::new(
2732 "ts",
2733 DataType::Timestamp(TimeUnit::Millisecond, None),
2734 false,
2735 )],
2736 vec![
2737 (
2739 PartitionRange {
2740 start: Timestamp::new_millisecond(1),
2741 end: Timestamp::new_millisecond(3),
2742 num_rows: 1,
2743 identifier: 0,
2744 },
2745 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2746 1, 2,
2747 ]))],
2748 ),
2749 (
2750 PartitionRange {
2751 start: Timestamp::new_millisecond(1),
2752 end: Timestamp::new_millisecond(4),
2753 num_rows: 1,
2754 identifier: 1,
2755 },
2756 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2757 1, 2, 3,
2758 ]))],
2759 ),
2760 ],
2761 vec![
2762 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2763 1, 1, 2, 2,
2764 ]))],
2765 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2766 ],
2767 ),
2768 TestStream::new(
2769 Column::new("ts", 0),
2770 SortOptions {
2771 descending: false,
2772 nulls_first: true,
2773 },
2774 None,
2775 vec![Field::new(
2776 "ts",
2777 DataType::Timestamp(TimeUnit::Millisecond, None),
2778 false,
2779 )],
2780 vec![
2781 (
2783 PartitionRange {
2784 start: Timestamp::new_millisecond(1),
2785 end: Timestamp::new_millisecond(3),
2786 num_rows: 1,
2787 identifier: 0,
2788 },
2789 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2790 1, 2,
2791 ]))],
2792 ),
2793 (
2794 PartitionRange {
2795 start: Timestamp::new_millisecond(1),
2796 end: Timestamp::new_millisecond(4),
2797 num_rows: 1,
2798 identifier: 1,
2799 },
2800 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2801 1, 2, 3,
2802 ]))],
2803 ),
2804 (
2805 PartitionRange {
2806 start: Timestamp::new_millisecond(4),
2807 end: Timestamp::new_millisecond(6),
2808 num_rows: 1,
2809 identifier: 1,
2810 },
2811 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2812 4, 5,
2813 ]))],
2814 ),
2815 ],
2816 vec![
2817 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2818 1, 1, 2, 2,
2819 ]))],
2820 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2821 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2822 4, 5,
2823 ]))],
2824 ],
2825 ),
2826 TestStream::new(
2828 Column::new("ts", 0),
2829 SortOptions {
2830 descending: false,
2831 nulls_first: true,
2832 },
2833 Some(6),
2834 vec![Field::new(
2835 "ts",
2836 DataType::Timestamp(TimeUnit::Millisecond, None),
2837 false,
2838 )],
2839 vec![
2840 (
2842 PartitionRange {
2843 start: Timestamp::new_millisecond(1),
2844 end: Timestamp::new_millisecond(3),
2845 num_rows: 1,
2846 identifier: 0,
2847 },
2848 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2849 1, 2,
2850 ]))],
2851 ),
2852 (
2853 PartitionRange {
2854 start: Timestamp::new_millisecond(1),
2855 end: Timestamp::new_millisecond(4),
2856 num_rows: 1,
2857 identifier: 1,
2858 },
2859 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2860 1, 2, 3,
2861 ]))],
2862 ),
2863 (
2864 PartitionRange {
2865 start: Timestamp::new_millisecond(3),
2866 end: Timestamp::new_millisecond(6),
2867 num_rows: 1,
2868 identifier: 1,
2869 },
2870 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2871 4, 5,
2872 ]))],
2873 ),
2874 ],
2875 vec![
2876 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2877 1, 1, 2, 2,
2878 ]))],
2879 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2880 vec![Arc::new(TimestampMillisecondArray::from_iter_values([4]))],
2881 ],
2882 ),
2883 TestStream::new(
2884 Column::new("ts", 0),
2885 SortOptions {
2886 descending: false,
2887 nulls_first: true,
2888 },
2889 Some(3),
2890 vec![Field::new(
2891 "ts",
2892 DataType::Timestamp(TimeUnit::Millisecond, None),
2893 false,
2894 )],
2895 vec![
2896 (
2898 PartitionRange {
2899 start: Timestamp::new_millisecond(1),
2900 end: Timestamp::new_millisecond(3),
2901 num_rows: 1,
2902 identifier: 0,
2903 },
2904 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2905 1, 2,
2906 ]))],
2907 ),
2908 (
2909 PartitionRange {
2910 start: Timestamp::new_millisecond(1),
2911 end: Timestamp::new_millisecond(4),
2912 num_rows: 1,
2913 identifier: 1,
2914 },
2915 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2916 1, 2, 3,
2917 ]))],
2918 ),
2919 (
2920 PartitionRange {
2921 start: Timestamp::new_millisecond(3),
2922 end: Timestamp::new_millisecond(6),
2923 num_rows: 1,
2924 identifier: 1,
2925 },
2926 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2927 4, 5,
2928 ]))],
2929 ),
2930 ],
2931 vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2932 [1, 1, 2],
2933 ))]],
2934 ),
2935 TestStream::new(
2937 Column::new("ts", 0),
2938 SortOptions {
2939 descending: true,
2940 nulls_first: true,
2941 },
2942 None,
2943 vec![Field::new(
2944 "ts",
2945 DataType::Timestamp(TimeUnit::Millisecond, None),
2946 false,
2947 )],
2948 vec![
2949 (
2951 PartitionRange {
2952 start: Timestamp::new_millisecond(3),
2953 end: Timestamp::new_millisecond(6),
2954 num_rows: 1,
2955 identifier: 1,
2956 },
2957 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2958 5, 4,
2959 ]))],
2960 ),
2961 (
2962 PartitionRange {
2963 start: Timestamp::new_millisecond(1),
2964 end: Timestamp::new_millisecond(4),
2965 num_rows: 1,
2966 identifier: 1,
2967 },
2968 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2969 3, 2, 1,
2970 ]))],
2971 ),
2972 (
2973 PartitionRange {
2974 start: Timestamp::new_millisecond(1),
2975 end: Timestamp::new_millisecond(3),
2976 num_rows: 1,
2977 identifier: 0,
2978 },
2979 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2980 2, 1,
2981 ]))],
2982 ),
2983 ],
2984 vec![
2985 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2986 5, 4,
2987 ]))],
2988 vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2989 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2990 2, 2, 1, 1,
2991 ]))],
2992 ],
2993 ),
2994 TestStream::new(
2995 Column::new("ts", 0),
2996 SortOptions {
2997 descending: false,
2998 nulls_first: true,
2999 },
3000 None,
3001 vec![Field::new(
3002 "ts",
3003 DataType::Timestamp(TimeUnit::Millisecond, None),
3004 false,
3005 )],
3006 vec![
3007 (
3009 PartitionRange {
3010 start: Timestamp::new_millisecond(1),
3011 end: Timestamp::new_millisecond(10),
3012 num_rows: 1,
3013 identifier: 0,
3014 },
3015 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3016 1, 5, 9,
3017 ]))],
3018 ),
3019 (
3020 PartitionRange {
3021 start: Timestamp::new_millisecond(3),
3022 end: Timestamp::new_millisecond(7),
3023 num_rows: 1,
3024 identifier: 1,
3025 },
3026 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3027 3, 4, 5, 6,
3028 ]))],
3029 ),
3030 ],
3031 vec![
3032 vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
3033 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3034 3, 4, 5, 5, 6, 9,
3035 ]))],
3036 ],
3037 ),
3038 TestStream::new(
3039 Column::new("ts", 0),
3040 SortOptions {
3041 descending: false,
3042 nulls_first: true,
3043 },
3044 None,
3045 vec![Field::new(
3046 "ts",
3047 DataType::Timestamp(TimeUnit::Millisecond, None),
3048 false,
3049 )],
3050 vec![
3051 (
3053 PartitionRange {
3054 start: Timestamp::new_millisecond(1),
3055 end: Timestamp::new_millisecond(3),
3056 num_rows: 1,
3057 identifier: 0,
3058 },
3059 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3060 1, 2,
3061 ]))],
3062 ),
3063 (
3064 PartitionRange {
3065 start: Timestamp::new_millisecond(1),
3066 end: Timestamp::new_millisecond(10),
3067 num_rows: 1,
3068 identifier: 1,
3069 },
3070 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3071 1, 3, 4, 5, 6, 8,
3072 ]))],
3073 ),
3074 (
3075 PartitionRange {
3076 start: Timestamp::new_millisecond(7),
3077 end: Timestamp::new_millisecond(10),
3078 num_rows: 1,
3079 identifier: 1,
3080 },
3081 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3082 7, 8, 9,
3083 ]))],
3084 ),
3085 ],
3086 vec![
3087 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3088 1, 1, 2,
3089 ]))],
3090 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3091 3, 4, 5, 6,
3092 ]))],
3093 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3094 7, 8, 8, 9,
3095 ]))],
3096 ],
3097 ),
3098 TestStream::new(
3099 Column::new("ts", 0),
3100 SortOptions {
3101 descending: false,
3102 nulls_first: true,
3103 },
3104 None,
3105 vec![Field::new(
3106 "ts",
3107 DataType::Timestamp(TimeUnit::Millisecond, None),
3108 false,
3109 )],
3110 vec![
3111 (
3113 PartitionRange {
3114 start: Timestamp::new_millisecond(1),
3115 end: Timestamp::new_millisecond(11),
3116 num_rows: 1,
3117 identifier: 0,
3118 },
3119 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3120 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
3121 ]))],
3122 ),
3123 (
3124 PartitionRange {
3125 start: Timestamp::new_millisecond(5),
3126 end: Timestamp::new_millisecond(7),
3127 num_rows: 1,
3128 identifier: 1,
3129 },
3130 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3131 5, 6,
3132 ]))],
3133 ),
3134 ],
3135 vec![
3136 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3137 1, 2, 3, 4,
3138 ]))],
3139 vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3140 5, 5, 6, 6, 7, 8, 9, 10,
3141 ]))],
3142 ],
3143 ),
3144 ];
3145
3146 let indexed_test_cases = test_cases.iter().enumerate().collect_vec();
3147
3148 for (idx, testcase) in &indexed_test_cases {
3149 let output = testcase.run_test().await;
3150 assert_eq!(output, testcase.output, "case {idx} failed.");
3151 }
3152 }
3153
3154 #[tokio::test]
3155 async fn fuzzy_ish_test_window_sort_stream() {
3156 let test_cnt = 100;
3157 let part_cnt_bound = 100;
3158 let range_size_bound = 100;
3159 let range_offset_bound = 100;
3160 let in_range_datapoint_cnt_bound = 100;
3161 let fetch_bound = 100;
3162
3163 let mut rng = fastrand::Rng::new();
3164 let rng_seed = rng.u64(..);
3165 rng.seed(rng_seed);
3166 let mut bound_val = None;
3167 type CmpFn<T> = Box<dyn FnMut(&T, &T) -> std::cmp::Ordering>;
3169 let mut full_testcase_list = Vec::new();
3170 for _case_id in 0..test_cnt {
3171 let descending = rng.bool();
3172 fn ret_cmp_fn<T: Ord>(descending: bool) -> CmpFn<T> {
3173 if descending {
3174 return Box::new(|a: &T, b: &T| b.cmp(a));
3175 }
3176 Box::new(|a: &T, b: &T| a.cmp(b))
3177 }
3178 let unit = match rng.u8(0..3) {
3179 0 => TimeUnit::Second,
3180 1 => TimeUnit::Millisecond,
3181 2 => TimeUnit::Microsecond,
3182 _ => TimeUnit::Nanosecond,
3183 };
3184 let fetch = if rng.bool() {
3185 Some(rng.usize(0..fetch_bound))
3186 } else {
3187 None
3188 };
3189
3190 let mut input_ranged_data = vec![];
3191 let mut output_data: Vec<i64> = vec![];
3192 for part_id in 0..rng.usize(0..part_cnt_bound) {
3194 let (start, end) = if descending {
3195 let end = bound_val
3196 .map(|i| i - rng.i64(0..range_offset_bound))
3197 .unwrap_or_else(|| rng.i64(..));
3198 bound_val = Some(end);
3199 let start = end - rng.i64(1..range_size_bound);
3200 let start = Timestamp::new(start, unit.into());
3201 let end = Timestamp::new(end, unit.into());
3202 (start, end)
3203 } else {
3204 let start = bound_val
3205 .map(|i| i + rng.i64(0..range_offset_bound))
3206 .unwrap_or_else(|| rng.i64(..));
3207 bound_val = Some(start);
3208 let end = start + rng.i64(1..range_size_bound);
3209 let start = Timestamp::new(start, unit.into());
3210 let end = Timestamp::new(end, unit.into());
3211 (start, end)
3212 };
3213
3214 let iter = 0..rng.usize(0..in_range_datapoint_cnt_bound);
3215 let data_gen = iter
3216 .map(|_| rng.i64(start.value()..end.value()))
3217 .sorted_by(ret_cmp_fn(descending))
3218 .collect_vec();
3219 output_data.extend(data_gen.clone());
3220 let arr = new_ts_array(unit, data_gen);
3221 let range = PartitionRange {
3222 start,
3223 end,
3224 num_rows: arr.len(),
3225 identifier: part_id,
3226 };
3227 input_ranged_data.push((range, vec![arr]));
3228 }
3229
3230 output_data.sort_by(ret_cmp_fn(descending));
3231 if let Some(fetch) = fetch {
3232 output_data.truncate(fetch);
3233 }
3234 let output_arr = new_ts_array(unit, output_data);
3235
3236 let test_stream = TestStream::new(
3237 Column::new("ts", 0),
3238 SortOptions {
3239 descending,
3240 nulls_first: true,
3241 },
3242 fetch,
3243 vec![Field::new("ts", DataType::Timestamp(unit, None), false)],
3244 input_ranged_data.clone(),
3245 vec![vec![output_arr]],
3246 );
3247 full_testcase_list.push(test_stream);
3248 }
3249
3250 for (case_id, test_stream) in full_testcase_list.into_iter().enumerate() {
3251 let res = test_stream.run_test().await;
3252 let res_concat = concat_batches(&test_stream.schema, &res).unwrap();
3253 let expected = test_stream.output;
3254 let expected_concat = concat_batches(&test_stream.schema, &expected).unwrap();
3255
3256 if res_concat != expected_concat {
3257 {
3258 let mut f_input = std::io::stderr();
3259 f_input.write_all(b"[").unwrap();
3260 for (input_range, input_arr) in test_stream.input {
3261 let range_json = json!({
3262 "start": input_range.start.to_chrono_datetime().unwrap().to_string(),
3263 "end": input_range.end.to_chrono_datetime().unwrap().to_string(),
3264 "num_rows": input_range.num_rows,
3265 "identifier": input_range.identifier,
3266 });
3267 let buf = Vec::new();
3268 let mut input_writer = ArrayWriter::new(buf);
3269 input_writer.write(&input_arr).unwrap();
3270 input_writer.finish().unwrap();
3271 let res_str =
3272 String::from_utf8_lossy(&input_writer.into_inner()).to_string();
3273 let whole_json =
3274 format!(r#"{{"range": {}, "data": {}}},"#, range_json, res_str);
3275 f_input.write_all(whole_json.as_bytes()).unwrap();
3276 }
3277 f_input.write_all(b"]").unwrap();
3278 }
3279 {
3280 let mut f_res = std::io::stderr();
3281 f_res.write_all(b"[").unwrap();
3282 for batch in &res {
3283 let mut res_writer = ArrayWriter::new(f_res);
3284 res_writer.write(batch).unwrap();
3285 res_writer.finish().unwrap();
3286 f_res = res_writer.into_inner();
3287 f_res.write_all(b",").unwrap();
3288 }
3289 f_res.write_all(b"]").unwrap();
3290
3291 let f_res_concat = std::io::stderr();
3292 let mut res_writer = ArrayWriter::new(f_res_concat);
3293 res_writer.write(&res_concat).unwrap();
3294 res_writer.finish().unwrap();
3295
3296 let f_expected = std::io::stderr();
3297 let mut expected_writer = ArrayWriter::new(f_expected);
3298 expected_writer.write(&expected_concat).unwrap();
3299 expected_writer.finish().unwrap();
3300 }
3301 panic!(
3302 "case failed, case id: {0}, output and expected output to stderr",
3303 case_id
3304 );
3305 }
3306 assert_eq!(
3307 res_concat, expected_concat,
3308 "case failed, case id: {}, rng seed: {}",
3309 case_id, rng_seed
3310 );
3311 }
3312 }
3313}