use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::array::{
    ArrayRef, AsArray, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray,
};
use arrow::compute::{concat, concat_batches, take_record_batch};
use arrow_schema::{Schema, SchemaRef};
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
use common_telemetry::warn;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datafusion::common::arrow::compute::sort_to_indices;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion::execution::{RecordBatchStream, TaskContext};
use datafusion::physical_plan::execution_plan::CardinalityEffect;
use datafusion::physical_plan::filter_pushdown::{
    ChildFilterDescription, FilterDescription, FilterPushdownPhase,
};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, TopK,
    TopKDynamicFilters,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, internal_err};
use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Stream, StreamExt};
use itertools::Itertools;
use parking_lot::RwLock;
use snafu::location;
use store_api::region_engine::PartitionRange;

use crate::error::Result;
use crate::window_sort::check_partition_range_monotonicity;
use crate::{array_iter_helper, downcast_ts_array};

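/// Returns the boundary of `range` that the requested sort order reaches
/// first: `end` for descending output, `start` for ascending.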
fn get_primary_end(range: &PartitionRange, descending: bool) -> Timestamp {
    if descending { range.end } else { range.start }
}

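/// Groups consecutive `ranges` that share the same primary end.
///
/// Returns `(primary_end, start_idx, end_idx)` triples indexing into
/// `ranges`. For example, descending ranges with ends `[100, 100, 95]`
/// yield `[(100, 0, 2), (95, 2, 3)]`.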
fn group_ranges_by_primary_end(
    ranges: &[PartitionRange],
    descending: bool,
) -> Vec<(Timestamp, usize, usize)> {
    if ranges.is_empty() {
        return vec![];
    }

    let mut groups = Vec::new();
    let mut group_start = 0;
    let mut current_primary_end = get_primary_end(&ranges[0], descending);

    for (idx, range) in ranges.iter().enumerate().skip(1) {
        let primary_end = get_primary_end(range, descending);
        if primary_end != current_primary_end {
            groups.push((current_primary_end, group_start, idx));
            group_start = idx;
            current_primary_end = primary_end;
        }
    }
    groups.push((current_primary_end, group_start, ranges.len()));

    groups
}

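/// Sort executor that sorts rows within each `PartitionRange` group instead
/// of globally, relying on the ranges' monotonic ordering. When a `limit` is
/// set it switches to TopK sorting and maintains a dynamic filter that can be
/// pushed down to the input.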
#[derive(Debug, Clone)]
pub struct PartSortExec {
    /// Sort expression; must evaluate to a timestamp column.
    expression: PhysicalSortExpr,
    /// Optional row limit; enables TopK sorting when set.
    limit: Option<usize>,
    input: Arc<dyn ExecutionPlan>,
    metrics: ExecutionPlanMetricsSet,
    /// `PartitionRange`s for each output partition of `input`.
    partition_ranges: Vec<Vec<PartitionRange>>,
    properties: PlanProperties,
    /// Dynamic TopK filter, present only when `limit` is set; shared with the
    /// input via filter pushdown.
    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}

impl PartSortExec {
    pub fn try_new(
        expression: PhysicalSortExpr,
        limit: Option<usize>,
        partition_ranges: Vec<Vec<PartitionRange>>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        check_partition_range_monotonicity(&partition_ranges, expression.options.descending)?;

        let metrics = ExecutionPlanMetricsSet::new();
        let properties = input.properties();
        let properties = PlanProperties::new(
            input.equivalence_properties().clone(),
            input.output_partitioning().clone(),
            properties.emission_type,
            properties.boundedness,
        );

        // The dynamic filter only exists when a limit enables TopK sorting.
        let filter = limit
            .is_some()
            .then(|| Self::create_filter(expression.expr.clone()));

        Ok(Self {
            expression,
            limit,
            input,
            metrics,
            partition_ranges,
            properties,
            filter,
        })
    }

    fn create_filter(expr: Arc<dyn PhysicalExpr>) -> Arc<RwLock<TopKDynamicFilters>> {
        Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
            DynamicFilterPhysicalExpr::new(vec![expr], lit(true)),
        ))))
    }

    pub fn to_stream(
        &self,
        context: Arc<TaskContext>,
        partition: usize,
    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
        let input_stream: DfSendableRecordBatchStream =
            self.input.execute(partition, context.clone())?;

        if partition >= self.partition_ranges.len() {
            internal_err!(
                "Partition index out of range: {} >= {} at {}",
                partition,
                self.partition_ranges.len(),
                snafu::location!()
            )?;
        }

        let df_stream = Box::pin(PartSortStream::new(
            context,
            self,
            self.limit,
            input_stream,
            self.partition_ranges[partition].clone(),
            partition,
            self.filter.clone(),
        )?) as _;

        Ok(df_stream)
    }
}

impl DisplayAs for PartSortExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "PartSortExec: expr={} num_ranges={}",
            self.expression,
            self.partition_ranges.len(),
        )?;
        if let Some(limit) = self.limit {
            write!(f, " limit={}", limit)?;
        }
        Ok(())
    }
}

impl ExecutionPlan for PartSortExec {
    fn name(&self) -> &str {
        "PartSortExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        let new_input = if let Some(first) = children.first() {
            first
        } else {
            internal_err!("No children found")?
        };
        let new = Self::try_new(
            self.expression.clone(),
            self.limit,
            self.partition_ranges.clone(),
            new_input.clone(),
        )?;
        Ok(Arc::new(new))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
        self.to_stream(context, partition)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        // Partitioning is dictated by `partition_ranges`; repartitioning the
        // input would not help and could break the per-range ordering.
        vec![false]
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        if self.limit.is_none() {
            CardinalityEffect::Equal
        } else {
            CardinalityEffect::LowerEqual
        }
    }

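    /// In the post-optimization phase, pushes the TopK dynamic filter down to
    /// the input alongside any parent filters.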
    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &datafusion::config::ConfigOptions,
    ) -> datafusion_common::Result<FilterDescription> {
        if !matches!(phase, FilterPushdownPhase::Post) {
            return FilterDescription::from_children(parent_filters, &self.children());
        }

        let mut child = ChildFilterDescription::from_child(&parent_filters, &self.input)?;

        if let Some(filter) = &self.filter {
            child = child.with_self_filter(filter.read().expr());
        }

        Ok(FilterDescription::new().with_child(child))
    }

    fn reset_state(self: Arc<Self>) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // Recreate the dynamic filter so a re-executed plan starts fresh.
        let new_filter = self
            .limit
            .is_some()
            .then(|| Self::create_filter(self.expression.expr.clone()));

        Ok(Arc::new(Self {
            expression: self.expression.clone(),
            limit: self.limit,
            input: self.input.clone(),
            metrics: self.metrics.clone(),
            partition_ranges: self.partition_ranges.clone(),
            properties: self.properties.clone(),
            filter: new_filter,
        }))
    }
}

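/// Buffer for rows of the range group currently being read.
///
/// `All` accumulates whole batches for a full sort, while `Top` maintains a
/// TopK heap when a limit is present.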
enum PartSortBuffer {
    All(Vec<DfRecordBatch>),
    /// TopK buffer and the number of rows inserted into it so far.
    Top(TopK, usize),
}

impl PartSortBuffer {
    pub fn is_empty(&self) -> bool {
        match self {
            PartSortBuffer::All(v) => v.is_empty(),
            PartSortBuffer::Top(_, cnt) => *cnt == 0,
        }
    }
}

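/// The stream that drives `PartSortExec`: it splits input batches at
/// `PartitionRange` boundaries, buffers each range group, and emits the
/// group's rows in sorted order.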
struct PartSortStream {
    /// Memory reservation accounting for buffered batches.
    reservation: MemoryReservation,
    buffer: PartSortBuffer,
    expression: PhysicalSortExpr,
    limit: Option<usize>,
    input: DfSendableRecordBatchStream,
    input_complete: bool,
    schema: SchemaRef,
    partition_ranges: Vec<PartitionRange>,
    #[allow(dead_code)]
    partition: usize,
    /// Index of the `PartitionRange` currently being consumed.
    cur_part_idx: usize,
    /// Remainder of an input batch that crossed a range boundary and still
    /// needs further splitting.
    evaluating_batch: Option<DfRecordBatch>,
    metrics: BaselineMetrics,
    context: Arc<TaskContext>,
    /// Parent plan's metrics set, used to build replacement TopK buffers.
    root_metrics: ExecutionPlanMetricsSet,
    /// Ranges grouped by primary end as `(primary_end, start_idx, end_idx)`.
    range_groups: Vec<(Timestamp, usize, usize)>,
    /// Index of the range group currently being consumed.
    cur_group_idx: usize,
    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}

impl PartSortStream {
    fn new(
        context: Arc<TaskContext>,
        sort: &PartSortExec,
        limit: Option<usize>,
        input: DfSendableRecordBatchStream,
        partition_ranges: Vec<PartitionRange>,
        partition: usize,
        filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
    ) -> datafusion_common::Result<Self> {
        let buffer = if let Some(limit) = limit {
            let Some(filter) = filter.clone() else {
                return internal_err!(
                    "TopKDynamicFilters must be provided when limit is set at {}",
                    snafu::location!()
                );
            };

            PartSortBuffer::Top(
                TopK::try_new(
                    partition,
                    sort.schema().clone(),
                    vec![],
                    [sort.expression.clone()].into(),
                    limit,
                    context.session_config().batch_size(),
                    context.runtime_env(),
                    &sort.metrics,
                    filter.clone(),
                )?,
                0,
            )
        } else {
            PartSortBuffer::All(Vec::new())
        };

        let descending = sort.expression.options.descending;
        let range_groups = group_ranges_by_primary_end(&partition_ranges, descending);

        Ok(Self {
            reservation: MemoryConsumer::new("PartSortStream".to_string())
                .register(&context.runtime_env().memory_pool),
            buffer,
            expression: sort.expression.clone(),
            limit,
            input,
            input_complete: false,
            schema: sort.input.schema(),
            partition_ranges,
            partition,
            cur_part_idx: 0,
            evaluating_batch: None,
            metrics: BaselineMetrics::new(&sort.metrics, partition),
            context,
            root_metrics: sort.metrics.clone(),
            range_groups,
            cur_group_idx: 0,
            filter,
        })
    }
}

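/// Checks that a timestamp sort column's min/max values stay inside the
/// current `PartitionRange` (start inclusive, end exclusive), after first
/// verifying that the time units match.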
macro_rules! array_check_helper {
    ($t:ty, $unit:expr, $arr:expr, $cur_range:expr, $min_max_idx:expr) => {{
        if $cur_range.start.unit().as_arrow_time_unit() != $unit
            || $cur_range.end.unit().as_arrow_time_unit() != $unit
        {
            internal_err!(
                "PartitionRange unit mismatch, expect {:?}, found {:?}",
                $cur_range.start.unit(),
                $unit
            )?;
        }
        let arr = $arr
            .as_any()
            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
            .unwrap();

        let min = arr.value($min_max_idx.0);
        let max = arr.value($min_max_idx.1);
        let (min, max) = if min < max { (min, max) } else { (max, min) };
        let cur_min = $cur_range.start.value();
        let cur_max = $cur_range.end.value();
        // `PartitionRange` is start-inclusive and end-exclusive.
        if !(min >= cur_min && max < cur_max) {
            internal_err!(
                "Sort column min/max value out of partition range: sort_column.min_max=[{:?}, {:?}] not in PartitionRange=[{:?}, {:?}]",
                min,
                max,
                cur_min,
                cur_max
            )?;
        }
    }};
}

impl PartSortStream {
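    /// Sanity check: verifies the sorted column's min/max (given by
    /// `min_max_idx`) fall within the current group's effective range.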
    fn check_in_range(
        &self,
        sort_column: &ArrayRef,
        min_max_idx: (usize, usize),
    ) -> datafusion_common::Result<()> {
        let Some(cur_range) = self.get_current_group_effective_range() else {
            internal_err!(
                "No effective range for current group {} at {}",
                self.cur_group_idx,
                snafu::location!()
            )?
        };

        downcast_ts_array!(
            sort_column.data_type() => (array_check_helper, sort_column, cur_range, min_max_idx),
            _ => internal_err!(
                "Unsupported data type for sort column: {:?}",
                sort_column.data_type()
            )?,
        );

        Ok(())
    }

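    /// Finds the index of the first value in `sort_column` lying outside the
    /// current `PartitionRange`, or `None` if the whole column fits.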
    fn try_find_next_range(
        &self,
        sort_column: &ArrayRef,
    ) -> datafusion_common::Result<Option<usize>> {
        if sort_column.is_empty() {
            return Ok(None);
        }

        if self.cur_part_idx >= self.partition_ranges.len() {
            internal_err!(
                "Partition index out of range: {} >= {} at {}",
                self.cur_part_idx,
                self.partition_ranges.len(),
                snafu::location!()
            )?;
        }
        let cur_range = self.partition_ranges[self.cur_part_idx];

        let sort_column_iter = downcast_ts_array!(
            sort_column.data_type() => (array_iter_helper, sort_column),
            _ => internal_err!(
                "Unsupported data type for sort column: {:?}",
                sort_column.data_type()
            )?,
        );

        for (idx, val) in sort_column_iter {
            // Null values are treated as belonging to the current range.
            if let Some(val) = val
                && (val >= cur_range.end.value() || val < cur_range.start.value())
            {
                return Ok(Some(idx));
            }
        }

        Ok(None)
    }

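    /// Appends a batch to the active buffer, tracking the inserted row count
    /// for the TopK variant.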
    fn push_buffer(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> {
        match &mut self.buffer {
            PartSortBuffer::All(v) => v.push(batch),
            PartSortBuffer::Top(top, cnt) => {
                *cnt += batch.num_rows();
                top.insert_batch(batch)?;
            }
        }

        Ok(())
    }

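    /// Returns `true` when the stream can stop pulling input early: the TopK
    /// buffer already holds at least `limit` rows and the dynamic filter
    /// proves the next range group's primary end cannot beat the current
    /// threshold.
    ///
    /// The dynamic filter is rewritten to reference column 0 of a one-row
    /// probe batch holding the next group's primary end; an all-`false`
    /// result means no overlap.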
    fn can_stop_early(&mut self, schema: &Arc<Schema>) -> datafusion_common::Result<bool> {
        let topk_cnt = match &self.buffer {
            PartSortBuffer::Top(_, cnt) => *cnt,
            _ => return Ok(false),
        };
        // `limit` is always `Some` for a `Top` buffer, so this compares the
        // inserted row count against the limit.
        if Some(topk_cnt) < self.limit {
            return Ok(false);
        }
        let next_group_primary_end = if self.cur_group_idx + 1 < self.range_groups.len() {
            self.range_groups[self.cur_group_idx + 1].0
        } else {
            return Ok(false);
        };

        let filter = self
            .filter
            .as_ref()
            .expect("TopKDynamicFilters must be provided when limit is set");
        let filter = filter.read().expr().current()?;
        // Rewrite the filter so the sort column maps to index 0 of the
        // single-column probe batch built below.
        let mut ts_index = None;
        let filter = filter
            .transform_down(|c| {
                if let Some(column) = c.as_any().downcast_ref::<Column>() {
                    ts_index = Some(column.index());
                    Ok(Transformed::yes(
                        Arc::new(Column::new(column.name(), 0)) as Arc<dyn PhysicalExpr>
                    ))
                } else {
                    Ok(Transformed::no(c))
                }
            })?
            .data;
        let Some(ts_index) = ts_index else {
            return Ok(false);
        };
        let field = if schema.fields().len() <= ts_index {
            warn!(
                "Schema mismatch when evaluating dynamic filter for PartSortExec at {}, schema: {:?}, ts_index: {}",
                self.partition, schema, ts_index
            );
            return Ok(false);
        } else {
            schema.field(ts_index)
        };
        let schema = Arc::new(Schema::new(vec![field.clone()]));
        let primary_end_array = match next_group_primary_end.unit() {
            TimeUnit::Second => Arc::new(TimestampSecondArray::from(vec![
                next_group_primary_end.value(),
            ])) as ArrayRef,
            TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(vec![
                next_group_primary_end.value(),
            ])) as ArrayRef,
            TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(vec![
                next_group_primary_end.value(),
            ])) as ArrayRef,
            TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(vec![
                next_group_primary_end.value(),
            ])) as ArrayRef,
        };
        let primary_end_batch = DfRecordBatch::try_new(schema, vec![primary_end_array])?;
        let res = filter.evaluate(&primary_end_batch)?;
        let array = res.into_array(primary_end_batch.num_rows())?;
        let filter = array.as_boolean().clone();
        // `Some(false)` means the next group's primary end is filtered out,
        // i.e. it cannot beat the current TopK threshold.
        let overlap = filter.iter().next().flatten();
        Ok(matches!(overlap, Some(false)))
    }

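    /// Whether `part_idx` falls inside the current range group.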
    fn is_in_current_group(&self, part_idx: usize) -> bool {
        if self.cur_group_idx >= self.range_groups.len() {
            return false;
        }
        let (_, start, end) = self.range_groups[self.cur_group_idx];
        part_idx >= start && part_idx < end
    }

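    /// Moves to the next range group, returning `false` when all groups are
    /// exhausted.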
    fn advance_to_next_group(&mut self) -> bool {
        self.cur_group_idx += 1;
        self.cur_group_idx < self.range_groups.len()
    }

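    /// Computes the union (min start, max end) of all ranges in the current
    /// group, or `None` if the group is empty or exhausted.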
    fn get_current_group_effective_range(&self) -> Option<PartitionRange> {
        if self.cur_group_idx >= self.range_groups.len() {
            return None;
        }
        let (_, start_idx, end_idx) = self.range_groups[self.cur_group_idx];
        if start_idx >= end_idx || start_idx >= self.partition_ranges.len() {
            return None;
        }

        let ranges_in_group =
            &self.partition_ranges[start_idx..end_idx.min(self.partition_ranges.len())];
        if ranges_in_group.is_empty() {
            return None;
        }

        let mut min_start = ranges_in_group[0].start;
        let mut max_end = ranges_in_group[0].end;
        for range in ranges_in_group.iter().skip(1) {
            if range.start < min_start {
                min_start = range.start;
            }
            if range.end > max_end {
                max_end = range.end;
            }
        }

        Some(PartitionRange {
            start: min_start,
            end: max_end,
            // Not meaningful for a merged range.
            num_rows: 0,
            identifier: 0,
        })
    }

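    /// Sorts and drains the current buffer, dispatching on its variant.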
    fn sort_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
        match &mut self.buffer {
            PartSortBuffer::All(_) => self.sort_all_buffer(),
            PartSortBuffer::Top(_, _) => self.sort_top_buffer(),
        }
    }

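    /// Fully sorts the buffered batches: evaluates the sort column for each
    /// batch, concatenates them, computes sorted indices, then takes rows
    /// from the concatenated input.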
    fn sort_all_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
        let PartSortBuffer::All(buffer) =
            std::mem::replace(&mut self.buffer, PartSortBuffer::All(Vec::new()))
        else {
            unreachable!("buffer type is checked before and should be All variant")
        };

        if buffer.is_empty() {
            return Ok(DfRecordBatch::new_empty(self.schema.clone()));
        }
        let mut sort_columns = Vec::with_capacity(buffer.len());
        let mut opt = None;
        for batch in buffer.iter() {
            let sort_column = self.expression.evaluate_to_sort_column(batch)?;
            opt = opt.or(sort_column.options);
            sort_columns.push(sort_column.values);
        }

        let sort_column =
            concat(&sort_columns.iter().map(|a| a.as_ref()).collect_vec()).map_err(|e| {
                DataFusionError::ArrowError(
                    Box::new(e),
                    Some(format!("Fail to concat sort columns at {}", location!())),
                )
            })?;

        let indices = sort_to_indices(&sort_column, opt, self.limit).map_err(|e| {
            DataFusionError::ArrowError(
                Box::new(e),
                Some(format!("Fail to sort to indices at {}", location!())),
            )
        })?;
        if indices.is_empty() {
            return Ok(DfRecordBatch::new_empty(self.schema.clone()));
        }

        self.check_in_range(
            &sort_column,
            (
                indices.value(0) as usize,
                indices.value(indices.len() - 1) as usize,
            ),
        )
        .inspect_err(|_e| {
            #[cfg(debug_assertions)]
            common_telemetry::error!(
                "Fail to check sort column in range at {}, current_idx: {}, num_rows: {}, err: {}",
                self.partition,
                self.cur_part_idx,
                sort_column.len(),
                _e
            );
        })?;

        // Reserve roughly twice the buffered size: one copy for the
        // concatenated input and one for the taken output.
        let total_mem: usize = buffer.iter().map(|r| r.get_array_memory_size()).sum();
        self.reservation.try_grow(total_mem * 2)?;

        let full_input = concat_batches(&self.schema, &buffer).map_err(|e| {
            DataFusionError::ArrowError(
                Box::new(e),
                Some(format!(
                    "Fail to concat input batches when sorting at {}",
                    location!()
                )),
            )
        })?;

        let sorted = take_record_batch(&full_input, &indices).map_err(|e| {
            DataFusionError::ArrowError(
                Box::new(e),
                Some(format!(
                    "Fail to take result record batch when sorting at {}",
                    location!()
                )),
            )
        })?;

        drop(full_input);
        self.reservation.shrink(2 * total_mem);
        Ok(sorted)
    }

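    /// Drains the TopK buffer by swapping in a fresh one and concatenating
    /// the emitted results into a single batch.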
    fn sort_top_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
        let Some(filter) = self.filter.clone() else {
            return internal_err!(
                "TopKDynamicFilters must be provided when sorting with limit at {}",
                snafu::location!()
            );
        };

        let new_top_buffer = TopK::try_new(
            self.partition,
            self.schema().clone(),
            vec![],
            [self.expression.clone()].into(),
            self.limit.unwrap(),
            self.context.session_config().batch_size(),
            self.context.runtime_env(),
            &self.root_metrics,
            filter,
        )?;
        let PartSortBuffer::Top(top_k, _) =
            std::mem::replace(&mut self.buffer, PartSortBuffer::Top(new_top_buffer, 0))
        else {
            unreachable!("buffer type is checked before and should be Top variant")
        };

        // The emitted stream is backed by in-memory batches, so polling with
        // a noop waker is expected to complete without returning `Pending`.
        let mut result_stream = top_k.emit()?;
        let mut placeholder_ctx = std::task::Context::from_waker(futures::task::noop_waker_ref());
        let mut results = vec![];
        loop {
            match result_stream.poll_next_unpin(&mut placeholder_ctx) {
                Poll::Ready(Some(batch)) => {
                    let batch = batch?;
                    results.push(batch);
                }
                Poll::Pending => {
                    #[cfg(debug_assertions)]
                    unreachable!("TopK result stream should always be ready")
                }
                Poll::Ready(None) => {
                    break;
                }
            }
        }

        let concat_batch = concat_batches(&self.schema, &results).map_err(|e| {
            DataFusionError::ArrowError(
                Box::new(e),
                Some(format!(
                    "Fail to concat top k result record batch when sorting at {}",
                    location!()
                )),
            )
        })?;

        Ok(concat_batch)
    }

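    /// Sorts the buffer and returns the result, or `None` if the buffer is
    /// empty or sorts to zero rows.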
    fn sorted_buffer_if_non_empty(&mut self) -> datafusion_common::Result<Option<DfRecordBatch>> {
        if self.buffer.is_empty() {
            return Ok(None);
        }

        let sorted = self.sort_buffer()?;
        if sorted.num_rows() == 0 {
            Ok(None)
        } else {
            Ok(Some(sorted))
        }
    }

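    /// Splits an incoming batch at `PartitionRange` boundaries and feeds the
    /// pieces into the buffer. With a limit (TopK buffer) nothing is emitted
    /// from here; without one, the sorted batch of a finished group is
    /// returned.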
    fn split_batch(
        &mut self,
        batch: DfRecordBatch,
    ) -> datafusion_common::Result<Option<DfRecordBatch>> {
        if matches!(self.buffer, PartSortBuffer::Top(_, _)) {
            self.split_batch_topk(batch)?;
            return Ok(None);
        }

        self.split_batch_all(batch)
    }

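    /// TopK variant of batch splitting: rows go into the TopK heap, and
    /// crossing a group boundary may trigger early termination.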
    fn split_batch_topk(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let sort_column = self
            .expression
            .expr
            .evaluate(&batch)?
            .into_array(batch.num_rows())?;

        let next_range_idx = self.try_find_next_range(&sort_column)?;
        let Some(idx) = next_range_idx else {
            // The whole batch belongs to the current range.
            self.push_buffer(batch)?;
            return Ok(());
        };

        let this_range = batch.slice(0, idx);
        let remaining_range = batch.slice(idx, batch.num_rows() - idx);
        if this_range.num_rows() != 0 {
            self.push_buffer(this_range)?;
        }

        // Step to the next partition range.
        self.cur_part_idx += 1;

        if self.cur_part_idx >= self.partition_ranges.len() {
            // All ranges are consumed; any leftover rows would be out of range.
            debug_assert!(remaining_range.num_rows() == 0);
            self.input_complete = true;
            return Ok(());
        }

        let in_same_group = self.is_in_current_group(self.cur_part_idx);

        // Crossing a group boundary: check whether the TopK threshold already
        // rules out the next group before pulling more input.
        if !in_same_group && self.can_stop_early(&batch.schema())? {
            self.input_complete = true;
            self.evaluating_batch = None;
            return Ok(());
        }

        if !in_same_group {
            self.advance_to_next_group();
        }

        let next_sort_column = sort_column.slice(idx, batch.num_rows() - idx);
        if self.try_find_next_range(&next_sort_column)?.is_some() {
            // The remainder spans yet another boundary; keep splitting it on
            // the next call to `poll_next_inner`.
            self.evaluating_batch = Some(remaining_range);
        } else if remaining_range.num_rows() != 0 {
            self.push_buffer(remaining_range)?;
        }

        Ok(())
    }

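    /// Full-sort variant of batch splitting: when a range-group boundary is
    /// crossed, the buffered group is sorted and returned for emission.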
    fn split_batch_all(
        &mut self,
        batch: DfRecordBatch,
    ) -> datafusion_common::Result<Option<DfRecordBatch>> {
        if batch.num_rows() == 0 {
            return Ok(None);
        }

        let sort_column = self
            .expression
            .expr
            .evaluate(&batch)?
            .into_array(batch.num_rows())?;

        let next_range_idx = self.try_find_next_range(&sort_column)?;
        let Some(idx) = next_range_idx else {
            // The whole batch belongs to the current range.
            self.push_buffer(batch)?;
            return Ok(None);
        };

        let this_range = batch.slice(0, idx);
        let remaining_range = batch.slice(idx, batch.num_rows() - idx);
        if this_range.num_rows() != 0 {
            self.push_buffer(this_range)?;
        }

        // Step to the next partition range.
        self.cur_part_idx += 1;

        if self.cur_part_idx >= self.partition_ranges.len() {
            // All ranges are consumed; emit whatever is buffered.
            debug_assert!(remaining_range.num_rows() == 0);

            return self.sorted_buffer_if_non_empty();
        }

        if self.is_in_current_group(self.cur_part_idx) {
            // Still inside the same group: keep buffering, splitting further
            // if the remainder crosses another boundary.
            let next_sort_column = sort_column.slice(idx, batch.num_rows() - idx);
            if self.try_find_next_range(&next_sort_column)?.is_some() {
                self.evaluating_batch = Some(remaining_range);
            } else if remaining_range.num_rows() != 0 {
                self.push_buffer(remaining_range)?;
            }
            return Ok(None);
        }

        // Group boundary crossed: sort and emit the finished group.
        let sorted_batch = self.sorted_buffer_if_non_empty()?;
        self.advance_to_next_group();

        let next_sort_column = sort_column.slice(idx, batch.num_rows() - idx);
        if self.try_find_next_range(&next_sort_column)?.is_some() {
            self.evaluating_batch = Some(remaining_range);
        } else if remaining_range.num_rows() != 0 {
            self.push_buffer(remaining_range)?;
        }

        Ok(sorted_batch)
    }

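    /// Drives the stream: first finishes splitting any leftover batch, then
    /// polls the input, splitting each new batch at range boundaries and
    /// emitting sorted groups as they complete.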
    pub fn poll_next_inner(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
        loop {
            if self.input_complete {
                if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? {
                    return Poll::Ready(Some(Ok(sorted_batch)));
                }
                return Poll::Ready(None);
            }

            // Finish splitting a batch left over from the previous poll
            // before pulling new input.
            if let Some(evaluating_batch) = self.evaluating_batch.take()
                && evaluating_batch.num_rows() != 0
            {
                if self.cur_part_idx >= self.partition_ranges.len() {
                    // Ranges are exhausted; flush the buffer and finish.
                    if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? {
                        return Poll::Ready(Some(Ok(sorted_batch)));
                    }
                    return Poll::Ready(None);
                }

                if let Some(sorted_batch) = self.split_batch(evaluating_batch)? {
                    return Poll::Ready(Some(Ok(sorted_batch)));
                }
                continue;
            }

            let res = self.input.as_mut().poll_next(cx);
            match res {
                Poll::Ready(Some(Ok(batch))) => {
                    if let Some(sorted_batch) = self.split_batch(batch)? {
                        return Poll::Ready(Some(Ok(sorted_batch)));
                    }
                }
                Poll::Ready(None) => {
                    self.input_complete = true;
                }
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

impl Stream for PartSortStream {
    type Item = datafusion_common::Result<DfRecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
        let result = self.as_mut().poll_next_inner(cx);
        self.metrics.record_poll(result)
    }
}

impl RecordBatchStream for PartSortStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow::array::{
        TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
        TimestampSecondArray,
    };
    use arrow::json::ArrayWriter;
    use arrow_schema::{DataType, Field, Schema, SortOptions, TimeUnit};
    use common_time::Timestamp;
    use datafusion_physical_expr::expressions::Column;
    use futures::StreamExt;
    use store_api::region_engine::PartitionRange;

    use super::*;
    use crate::test_util::{MockInputExec, new_ts_array};

    #[tokio::test]
    async fn test_can_stop_early_with_empty_topk_buffer() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone()));
        let exec = PartSortExec::try_new(
            PhysicalSortExpr {
                expr: Arc::new(Column::new("ts", 0)),
                options: SortOptions {
                    descending: true,
                    ..Default::default()
                },
            },
            Some(3),
            vec![vec![]],
            mock_input.clone(),
        )
        .unwrap();

        let filter = Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
            DynamicFilterPhysicalExpr::new(vec![], lit(false)),
        ))));

        let input_stream = mock_input
            .execute(0, Arc::new(TaskContext::default()))
            .unwrap();
        let mut stream = PartSortStream::new(
            Arc::new(TaskContext::default()),
            &exec,
            Some(3),
            input_stream,
            vec![],
            0,
            Some(filter),
        )
        .unwrap();

        let batch = DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2, 3])])
            .unwrap();
        stream.push_buffer(batch).unwrap();

        // Without a next range group, early termination must not trigger.
        assert!(!stream.can_stop_early(&schema).unwrap());
    }

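    // A minimal, self-contained check of `group_ranges_by_primary_end`
    // (a sketch added alongside the existing tests): descending ranges
    // sharing the same `end` collapse into one group, while ascending
    // grouping keys on `start` instead.
    #[test]
    fn test_group_ranges_by_primary_end_grouping() {
        let unit = TimeUnit::Millisecond;
        let range = |start: i64, end: i64| PartitionRange {
            start: Timestamp::new(start, unit.into()),
            end: Timestamp::new(end, unit.into()),
            num_rows: 0,
            identifier: 0,
        };

        // Descending: primary end is `end`; the first two ranges share it.
        let ranges = [range(70, 100), range(50, 100), range(40, 95)];
        let groups = group_ranges_by_primary_end(&ranges, true);
        assert_eq!(
            groups,
            vec![
                (Timestamp::new(100, unit.into()), 0, 2),
                (Timestamp::new(95, unit.into()), 2, 3),
            ]
        );

        // Ascending: primary end is `start`, and all three starts differ.
        let groups = group_ranges_by_primary_end(&ranges, false);
        assert_eq!(groups.len(), 3);
    }
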
    #[ignore = "hard to gen expected data correctly here, TODO(discord9): fix it later"]
    #[tokio::test]
    async fn fuzzy_test() {
        let test_cnt = 100;
        let part_cnt_bound = 100;
        let range_size_bound = 100;
        let range_offset_bound = 100;
        let batch_cnt_bound = 20;
        let batch_size_bound = 100;

        let mut rng = fastrand::Rng::new();
        rng.seed(1337);

        let mut test_cases = Vec::new();

        for case_id in 0..test_cnt {
            let mut bound_val: Option<i64> = None;
            let descending = rng.bool();
            let nulls_first = rng.bool();
            let opt = SortOptions {
                descending,
                nulls_first,
            };
            let limit = if rng.bool() {
                Some(rng.usize(1..batch_cnt_bound * batch_size_bound))
            } else {
                None
            };
            // Pick one of the four timestamp units (the upper bound was
            // previously `0..3`, which made the nanosecond arm unreachable).
            let unit = match rng.u8(0..4) {
                0 => TimeUnit::Second,
                1 => TimeUnit::Millisecond,
                2 => TimeUnit::Microsecond,
                _ => TimeUnit::Nanosecond,
            };

            let schema = Schema::new(vec![Field::new(
                "ts",
                DataType::Timestamp(unit, None),
                false,
            )]);
            let schema = Arc::new(schema);

            let mut input_ranged_data = vec![];
            let mut output_ranges = vec![];
            let mut output_data = vec![];
            for part_id in 0..rng.usize(0..part_cnt_bound) {
                // Generate ranges whose primary ends move monotonically in
                // sort order.
                let (start, end) = if descending {
                    let end = bound_val
                        .map(|i| {
                            i.checked_sub(rng.i64(1..=range_offset_bound)).expect(
                                "Bad luck, fuzzy test generate data that will overflow, change seed and try again",
                            )
                        })
                        .unwrap_or_else(|| rng.i64(-100000000..100000000));
                    bound_val = Some(end);
                    let start = end - rng.i64(1..range_size_bound);
                    let start = Timestamp::new(start, unit.into());
                    let end = Timestamp::new(end, unit.into());
                    (start, end)
                } else {
                    let start = bound_val
                        .map(|i| i + rng.i64(1..=range_offset_bound))
                        .unwrap_or_else(|| rng.i64(..));
                    bound_val = Some(start);
                    let end = start + rng.i64(1..range_size_bound);
                    let start = Timestamp::new(start, unit.into());
                    let end = Timestamp::new(end, unit.into());
                    (start, end)
                };
                assert!(start < end);

                let mut per_part_sort_data = vec![];
                let mut batches = vec![];
                for _batch_idx in 0..rng.usize(1..batch_cnt_bound) {
                    let cnt = rng.usize(0..batch_size_bound) + 1;
                    let iter = 0..rng.usize(0..cnt);
                    let mut data_gen = iter
                        .map(|_| rng.i64(start.value()..end.value()))
                        .collect_vec();
                    if data_gen.is_empty() {
                        continue;
                    }
                    // Data within a batch is sorted ascending.
                    data_gen.sort();
                    per_part_sort_data.extend(data_gen.clone());
                    let arr = new_ts_array(unit, data_gen.clone());
                    let batch = DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap();
                    batches.push(batch);
                }

                let range = PartitionRange {
                    start,
                    end,
                    num_rows: batches.iter().map(|b| b.num_rows()).sum(),
                    identifier: part_id,
                };
                input_ranged_data.push((range, batches));

                output_ranges.push(range);
                if per_part_sort_data.is_empty() {
                    continue;
                }
                output_data.extend_from_slice(&per_part_sort_data);
            }

            // Bucket the expected output by range, then sort each bucket.
            let mut output_data_iter = output_data.iter().peekable();
            let mut output_data = vec![];
            for range in output_ranges.clone() {
                let mut cur_data = vec![];
                while let Some(val) = output_data_iter.peek() {
                    if **val < range.start.value() || **val >= range.end.value() {
                        break;
                    }
                    cur_data.push(*output_data_iter.next().unwrap());
                }

                if cur_data.is_empty() {
                    continue;
                }

                if descending {
                    cur_data.sort_by(|a, b| b.cmp(a));
                } else {
                    cur_data.sort();
                }
                output_data.push(cur_data);
            }

            let expected_output = if let Some(limit) = limit {
                let mut accumulated = Vec::new();
                let mut seen = 0usize;
                for mut range_values in output_data {
                    seen += range_values.len();
                    accumulated.append(&mut range_values);
                    if seen >= limit {
                        break;
                    }
                }

                if accumulated.is_empty() {
                    None
                } else {
                    if descending {
                        accumulated.sort_by(|a, b| b.cmp(a));
                    } else {
                        accumulated.sort();
                    }
                    accumulated.truncate(limit.min(accumulated.len()));

                    Some(
                        DfRecordBatch::try_new(
                            schema.clone(),
                            vec![new_ts_array(unit, accumulated)],
                        )
                        .unwrap(),
                    )
                }
            } else {
                let batches = output_data
                    .into_iter()
                    .map(|a| {
                        DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, a)]).unwrap()
                    })
                    .collect_vec();
                if batches.is_empty() {
                    None
                } else {
                    Some(concat_batches(&schema, &batches).unwrap())
                }
            };

            test_cases.push((
                case_id,
                unit,
                input_ranged_data,
                schema,
                opt,
                limit,
                expected_output,
            ));
        }

        for (case_id, _unit, input_ranged_data, schema, opt, limit, expected_output) in test_cases {
            run_test(
                case_id,
                input_ranged_data,
                schema,
                opt,
                limit,
                expected_output,
                None,
            )
            .await;
        }
    }

    #[tokio::test]
    async fn simple_cases() {
        let testcases = vec![
            (
                TimeUnit::Millisecond,
                vec![
                    ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]),
                    ((5, 10), vec![vec![5, 6], vec![7, 8]]),
                ],
                false,
                None,
                vec![vec![1, 2, 3, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9]],
            ),
            (
                TimeUnit::Millisecond,
                vec![
                    ((5, 10), vec![vec![5, 6], vec![7, 8, 9]]),
                    ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
                ],
                true,
                None,
                vec![vec![9, 8, 8, 7, 7, 6, 6, 5, 5, 4, 3, 2, 1]],
            ),
            (
                TimeUnit::Millisecond,
                vec![
                    ((5, 10), vec![]),
                    ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
                ],
                true,
                None,
                vec![vec![8, 7, 6, 5, 4, 3, 2, 1]],
            ),
            (
                TimeUnit::Millisecond,
                vec![
                    ((15, 20), vec![vec![17, 18, 19]]),
                    ((10, 15), vec![]),
                    ((5, 10), vec![]),
                    ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
                ],
                true,
                None,
                vec![vec![19, 18, 17], vec![8, 7, 6, 5, 4, 3, 2, 1]],
            ),
            (
                TimeUnit::Millisecond,
                vec![
                    ((15, 20), vec![]),
                    ((10, 15), vec![]),
                    ((5, 10), vec![]),
                    ((0, 10), vec![]),
                ],
                true,
                None,
                vec![],
            ),
            (
                TimeUnit::Millisecond,
                vec![
                    (
                        (15, 20),
                        vec![vec![15, 17, 19, 10, 11, 12, 5, 6, 7, 8, 9, 1, 2, 3, 4]],
                    ),
                    ((10, 15), vec![]),
                    ((5, 10), vec![]),
                    ((0, 10), vec![]),
                ],
                true,
                None,
                vec![
                    vec![19, 17, 15],
                    vec![12, 11, 10],
                    vec![9, 8, 7, 6, 5, 4, 3, 2, 1],
                ],
            ),
            (
                TimeUnit::Millisecond,
                vec![
                    (
                        (15, 20),
                        vec![vec![15, 17, 19, 10, 11, 12, 5, 6, 7, 8, 9, 1, 2, 3, 4]],
                    ),
                    ((10, 15), vec![]),
                    ((5, 10), vec![]),
                    ((0, 10), vec![]),
                ],
                true,
                Some(2),
                vec![vec![19, 17]],
            ),
        ];

        for (identifier, (unit, input_ranged_data, descending, limit, expected_output)) in
            testcases.into_iter().enumerate()
        {
            let schema = Schema::new(vec![Field::new(
                "ts",
                DataType::Timestamp(unit, None),
                false,
            )]);
            let schema = Arc::new(schema);
            let opt = SortOptions {
                descending,
                ..Default::default()
            };

            let input_ranged_data = input_ranged_data
                .into_iter()
                .map(|(range, data)| {
                    let part = PartitionRange {
                        start: Timestamp::new(range.0, unit.into()),
                        end: Timestamp::new(range.1, unit.into()),
                        num_rows: data.iter().map(|b| b.len()).sum(),
                        identifier,
                    };

                    let batches = data
                        .into_iter()
                        .map(|b| {
                            let arr = new_ts_array(unit, b);
                            DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap()
                        })
                        .collect_vec();
                    (part, batches)
                })
                .collect_vec();

            let expected_output = expected_output
                .into_iter()
                .map(|a| {
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, a)]).unwrap()
                })
                .collect_vec();
            let expected_output = if expected_output.is_empty() {
                None
            } else {
                Some(concat_batches(&schema, &expected_output).unwrap())
            };

            run_test(
                identifier,
                input_ranged_data,
                schema.clone(),
                opt,
                limit,
                expected_output,
                None,
            )
            .await;
        }
    }

    #[allow(clippy::print_stdout)]
    async fn run_test(
        case_id: usize,
        input_ranged_data: Vec<(PartitionRange, Vec<DfRecordBatch>)>,
        schema: SchemaRef,
        opt: SortOptions,
        limit: Option<usize>,
        expected_output: Option<DfRecordBatch>,
        expected_polled_rows: Option<usize>,
    ) {
        if let (Some(limit), Some(rb)) = (limit, &expected_output) {
            assert!(
                rb.num_rows() <= limit,
                "Expect row count in expected output({}) <= limit({})",
                rb.num_rows(),
                limit
            );
        }

        let mut data_partition = Vec::with_capacity(input_ranged_data.len());
        let mut ranges = Vec::with_capacity(input_ranged_data.len());
        for (part_range, batches) in input_ranged_data {
            data_partition.push(batches);
            ranges.push(part_range);
        }

        let mock_input = Arc::new(MockInputExec::new(data_partition, schema.clone()));

        let exec = PartSortExec::try_new(
            PhysicalSortExpr {
                expr: Arc::new(Column::new("ts", 0)),
                options: opt,
            },
            limit,
            vec![ranges.clone()],
            mock_input.clone(),
        )
        .unwrap();

        let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();

        let real_output = exec_stream.map(|r| r.unwrap()).collect::<Vec<_>>().await;
        if limit.is_some() {
            assert!(
                real_output.len() <= 1,
                "case_{case_id} expects a single output batch when limit is set, got {}",
                real_output.len()
            );
        }

        let actual_output = if real_output.is_empty() {
            None
        } else {
            Some(concat_batches(&schema, &real_output).unwrap())
        };

        if let Some(expected_polled_rows) = expected_polled_rows {
            let input_pulled_rows = mock_input.metrics().unwrap().output_rows().unwrap();
            assert_eq!(input_pulled_rows, expected_polled_rows);
        }

        match (actual_output, expected_output) {
            (None, None) => {}
            (Some(actual), Some(expected)) => {
                if actual != expected {
                    let mut actual_json: Vec<u8> = Vec::new();
                    let mut writer = ArrayWriter::new(&mut actual_json);
                    writer.write(&actual).unwrap();
                    writer.finish().unwrap();

                    let mut expected_json: Vec<u8> = Vec::new();
                    let mut writer = ArrayWriter::new(&mut expected_json);
                    writer.write(&expected).unwrap();
                    writer.finish().unwrap();

                    panic!(
                        "case_{} failed (limit {limit:?}), opt: {:?},\nreal_output: {}\nexpected: {}",
                        case_id,
                        opt,
                        String::from_utf8_lossy(&actual_json),
                        String::from_utf8_lossy(&expected_json),
                    );
                }
            }
            (None, Some(expected)) => panic!(
                "case_{} failed (limit {limit:?}), opt: {:?},\nreal output is empty, expected {} rows",
                case_id,
                opt,
                expected.num_rows()
            ),
            (Some(actual), None) => panic!(
                "case_{} failed (limit {limit:?}), opt: {:?},\nreal output has {} rows, expected empty",
                case_id,
                opt,
                actual.num_rows()
            ),
        }
    }

    #[tokio::test]
    async fn test_limit_with_multiple_batches_per_partition() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        // Single partition, three batches, descending limit 3.
        let input_ranged_data = vec![(
            PartitionRange {
                start: Timestamp::new(0, unit.into()),
                end: Timestamp::new(10, unit.into()),
                num_rows: 9,
                identifier: 0,
            },
            vec![
                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2, 3])])
                    .unwrap(),
                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![4, 5, 6])])
                    .unwrap(),
                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![7, 8, 9])])
                    .unwrap(),
            ],
        )];

        let expected_output = Some(
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![9, 8, 7])])
                .unwrap(),
        );

        run_test(
            1000,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                ..Default::default()
            },
            Some(3),
            expected_output,
            None,
        )
        .await;

        // Two partitions, descending limit 2: the top values all come from
        // the first range.
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(10, unit.into()),
                    end: Timestamp::new(20, unit.into()),
                    num_rows: 6,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![10, 11, 12])],
                    )
                    .unwrap(),
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![13, 14, 15])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(0, unit.into()),
                    end: Timestamp::new(10, unit.into()),
                    num_rows: 5,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2, 3])])
                        .unwrap(),
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![4, 5])])
                        .unwrap(),
                ],
            ),
        ];

        let expected_output = Some(
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![15, 14])]).unwrap(),
        );

        run_test(
            1001,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                ..Default::default()
            },
            Some(2),
            expected_output,
            None,
        )
        .await;

        // Ascending limit 2 over batches arriving in reverse order.
        let input_ranged_data = vec![(
            PartitionRange {
                start: Timestamp::new(0, unit.into()),
                end: Timestamp::new(10, unit.into()),
                num_rows: 9,
                identifier: 0,
            },
            vec![
                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![7, 8, 9])])
                    .unwrap(),
                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![4, 5, 6])])
                    .unwrap(),
                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2, 3])])
                    .unwrap(),
            ],
        )];

        let expected_output = Some(
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2])]).unwrap(),
        );

        run_test(
            1002,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: false,
                ..Default::default()
            },
            Some(2),
            expected_output,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn test_early_termination() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        // Three disjoint descending ranges with limit 2: only the first
        // partition (10 rows) should be pulled before terminating.
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(20, unit.into()),
                    end: Timestamp::new(30, unit.into()),
                    num_rows: 10,
                    identifier: 2,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![21, 22, 23, 24, 25])],
                    )
                    .unwrap(),
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![26, 27, 28, 29, 30])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(10, unit.into()),
                    end: Timestamp::new(20, unit.into()),
                    num_rows: 10,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![11, 12, 13, 14, 15])],
                    )
                    .unwrap(),
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![16, 17, 18, 19, 20])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(0, unit.into()),
                    end: Timestamp::new(10, unit.into()),
                    num_rows: 10,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![1, 2, 3, 4, 5])],
                    )
                    .unwrap(),
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![6, 7, 8, 9, 10])],
                    )
                    .unwrap(),
                ],
            ),
        ];

        // 30 lies outside its range's exclusive end and never enters the
        // buffer, so the top 2 are 29 and 28.
        let expected_output = Some(
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![29, 28])]).unwrap(),
        );

        run_test(
            1003,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                ..Default::default()
            },
            Some(2),
            expected_output,
            Some(10),
        )
        .await;
    }

    #[tokio::test]
    async fn test_primary_end_grouping_with_limit() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        // Both overlapping ranges share primary end 100 and therefore form a
        // single group; the top 4 are drawn across both partitions.
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(70, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 3,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![80, 90, 95])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(50, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 5,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![55, 65, 75, 85, 95])],
                    )
                    .unwrap(),
                ],
            ),
        ];

        // Top 4 across the whole group: 95, 95, 90, 85.
        let expected_output = Some(
            DfRecordBatch::try_new(
                schema.clone(),
                vec![new_ts_array(unit, vec![95, 95, 90, 85])],
            )
            .unwrap(),
        );

        run_test(
            2000,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                ..Default::default()
            },
            Some(4),
            expected_output,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn test_three_ranges_keep_pulling() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        // The first two ranges share primary end 100; the third range (end 95)
        // still overlaps the TopK threshold, so input keeps being pulled.
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(70, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 3,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![80, 90, 95])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(50, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 3,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![55, 75, 85])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(40, unit.into()),
                    end: Timestamp::new(95, unit.into()),
                    num_rows: 3,
                    identifier: 2,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![45, 65, 94])],
                    )
                    .unwrap(),
                ],
            ),
        ];

        // Top 4 overall: 95, 94, 90, 85.
        let expected_output = Some(
            DfRecordBatch::try_new(
                schema.clone(),
                vec![new_ts_array(unit, vec![95, 94, 90, 85])],
            )
            .unwrap(),
        );

        run_test(
            2001,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                ..Default::default()
            },
            Some(4),
            expected_output,
            None,
        )
        .await;
    }

    #[tokio::test]
    async fn test_threshold_based_early_termination() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        // Descending with limit 4: the first partition already holds the
        // global top 4 (96..=99); the second partition's rows cannot displace
        // them.
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(70, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 6,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![94, 95, 96, 97, 98, 99])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(50, unit.into()),
                    end: Timestamp::new(90, unit.into()),
                    num_rows: 3,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![85, 86, 87])],
                    )
                    .unwrap(),
                ],
            ),
        ];

        let expected_output = Some(
            DfRecordBatch::try_new(
                schema.clone(),
                vec![new_ts_array(unit, vec![99, 98, 97, 96])],
            )
            .unwrap(),
        );

        run_test(
            2002,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                ..Default::default()
            },
            Some(4),
            expected_output,
            Some(9), // all 9 input rows are pulled
        )
        .await;
    }

    #[tokio::test]
    async fn test_continue_when_threshold_in_next_group_range() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        // The next group's primary end (98) is above the TopK threshold (96),
        // so early termination must not trigger and all input is pulled.
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(90, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 6,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![94, 95, 96, 97, 98, 99])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(50, unit.into()),
                    end: Timestamp::new(98, unit.into()),
                    num_rows: 3,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![55, 60, 65])],
                    )
                    .unwrap(),
                ],
            ),
        ];

        let expected_output = Some(
            DfRecordBatch::try_new(
                schema.clone(),
                vec![new_ts_array(unit, vec![99, 98, 97, 96])],
            )
            .unwrap(),
        );

        run_test(
            2003,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                ..Default::default()
            },
            Some(4),
            expected_output,
            Some(9), // all 9 input rows are pulled
        )
        .await;
    }

    #[tokio::test]
    async fn test_ascending_threshold_early_termination() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        // Ascending with limit 4: once 11 rows are pulled, the TopK threshold
        // (13) rules out every later group, so the final partition is skipped.
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(10, unit.into()),
                    end: Timestamp::new(50, unit.into()),
                    num_rows: 6,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![10, 11, 12, 13, 14, 15])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(20, unit.into()),
                    end: Timestamp::new(60, unit.into()),
                    num_rows: 3,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![25, 30, 35])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(60, unit.into()),
                    end: Timestamp::new(70, unit.into()),
                    num_rows: 2,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![60, 61])])
                        .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(61, unit.into()),
                    end: Timestamp::new(70, unit.into()),
                    num_rows: 2,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![71, 72])])
                        .unwrap(),
                ],
            ),
        ];

        let expected_output = Some(
            DfRecordBatch::try_new(
                schema.clone(),
                vec![new_ts_array(unit, vec![10, 11, 12, 13])],
            )
            .unwrap(),
        );

        run_test(
            2004,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: false,
                ..Default::default()
            },
            Some(4),
            expected_output,
            Some(11), // the final partition (2 rows) is never pulled
        )
        .await;
    }

    #[tokio::test]
    async fn test_ascending_threshold_early_termination_case_two() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        // Ascending with limit 4 over overlapping groups: the global top 4
        // (5..=8) arrive in the third partition, and the threshold then rules
        // out the final partition entirely.
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(0, unit.into()),
                    end: Timestamp::new(20, unit.into()),
                    num_rows: 4,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![9, 10, 11, 12])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(4, unit.into()),
                    end: Timestamp::new(25, unit.into()),
                    num_rows: 1,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![21])])
                        .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(5, unit.into()),
                    end: Timestamp::new(25, unit.into()),
                    num_rows: 4,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![5, 6, 7, 8])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(42, unit.into()),
                    end: Timestamp::new(52, unit.into()),
                    num_rows: 2,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![42, 51])])
                        .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(48, unit.into()),
                    end: Timestamp::new(53, unit.into()),
                    num_rows: 2,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![48, 51])])
                        .unwrap(),
                ],
            ),
        ];

        let expected_output = Some(
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![5, 6, 7, 8])])
                .unwrap(),
        );

        run_test(
            2005,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: false,
                ..Default::default()
            },
            Some(4),
            expected_output,
            Some(11), // the final partition (2 rows) is never pulled
        )
        .await;
    }

2400 #[tokio::test]
2403 async fn test_early_stop_with_nulls() {
2404 let unit = TimeUnit::Millisecond;
2405 let schema = Arc::new(Schema::new(vec![Field::new(
2406 "ts",
2407 DataType::Timestamp(unit, None),
2408 true, )]));
2410
2411 let new_nullable_ts_array = |unit: TimeUnit, arr: Vec<Option<i64>>| -> ArrayRef {
2413 match unit {
2414 TimeUnit::Second => Arc::new(TimestampSecondArray::from(arr)) as ArrayRef,
2415 TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(arr)) as ArrayRef,
2416 TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(arr)) as ArrayRef,
2417 TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(arr)) as ArrayRef,
2418 }
2419 };
2420
2421 let input_ranged_data = vec![
2425 (
2426 PartitionRange {
2427 start: Timestamp::new(70, unit.into()),
2428 end: Timestamp::new(100, unit.into()),
2429 num_rows: 5,
2430 identifier: 0,
2431 },
2432 vec![
2433 DfRecordBatch::try_new(
2434 schema.clone(),
2435 vec![new_nullable_ts_array(
2436 unit,
2437 vec![Some(99), Some(98), None, Some(97), None],
2438 )],
2439 )
2440 .unwrap(),
2441 ],
2442 ),
2443 (
2444 PartitionRange {
2445 start: Timestamp::new(50, unit.into()),
2446 end: Timestamp::new(90, unit.into()),
2447 num_rows: 3,
2448 identifier: 1,
2449 },
2450 vec![
2451 DfRecordBatch::try_new(
2452 schema.clone(),
2453 vec![new_nullable_ts_array(
2454 unit,
2455 vec![Some(89), Some(88), Some(87)],
2456 )],
2457 )
2458 .unwrap(),
2459 ],
2460 ),
2461 ];
2462
2463 let expected_output = Some(
2467 DfRecordBatch::try_new(
2468 schema.clone(),
2469 vec![new_nullable_ts_array(unit, vec![None, None, Some(99)])],
2470 )
2471 .unwrap(),
2472 );
2473
2474 run_test(
2475 3000,
2476 input_ranged_data,
2477 schema.clone(),
2478 SortOptions {
2479 descending: true,
2480 nulls_first: true,
2481 },
2482 Some(3),
2483 expected_output,
2484 Some(8), )
2486 .await;
2487
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(70, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 5,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_nullable_ts_array(
                            unit,
                            vec![Some(99), Some(98), Some(97), None, None],
                        )],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(50, unit.into()),
                    end: Timestamp::new(90, unit.into()),
                    num_rows: 3,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_nullable_ts_array(
                            unit,
                            vec![Some(89), Some(88), Some(87)],
                        )],
                    )
                    .unwrap(),
                ],
            ),
        ];

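        // With NULLs sorted last, the three largest non-NULL values win.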
        let expected_output = Some(
            DfRecordBatch::try_new(
                schema.clone(),
                vec![new_nullable_ts_array(
                    unit,
                    vec![Some(99), Some(98), Some(97)],
                )],
            )
            .unwrap(),
        );

        run_test(
            3001,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                nulls_first: false,
            },
            Some(3),
            expected_output,
            Some(8),
        )
        .await;
    }

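    // Both ranges in this test share the same primary end (100 when
    // descending), so group_ranges_by_primary_end collapses them into a
    // single group for early-stop purposes.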
    #[tokio::test]
    async fn test_early_stop_single_group() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(70, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 6,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![94, 95, 96, 97, 98, 99])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(50, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 3,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![85, 86, 87])],
                    )
                    .unwrap(),
                ],
            ),
        ];

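        // Descending with a limit of 4: the four largest values all come
        // from the first range.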
        let expected_output = Some(
            DfRecordBatch::try_new(
                schema.clone(),
                vec![new_ts_array(unit, vec![99, 98, 97, 96])],
            )
            .unwrap(),
        );

        run_test(
            3002,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                ..Default::default()
            },
            Some(4),
            expected_output,
            Some(9),
        )
        .await;
    }

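    // Exact boundary equality: the k-th best value coincides with the next
    // group's primary boundary, exercising the comparison used by the early
    // stop check in both sort directions.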
    #[tokio::test]
    async fn test_early_stop_exact_boundary_equality() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        // Case 1 (descending): the third-largest value (90) equals the next
        // range's primary end exactly.
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(70, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 4,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![92, 91, 90, 89])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(50, unit.into()),
                    end: Timestamp::new(90, unit.into()),
                    num_rows: 3,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![88, 87, 86])],
                    )
                    .unwrap(),
                ],
            ),
        ];

        let expected_output = Some(
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![92, 91, 90])])
                .unwrap(),
        );

        run_test(
            3003,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                ..Default::default()
            },
            Some(3),
            expected_output,
            Some(7),
        )
        .await;

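        // Case 2 (ascending): the third-smallest value (20) equals the next
        // range's primary start, mirroring case 1.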
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(10, unit.into()),
                    end: Timestamp::new(50, unit.into()),
                    num_rows: 4,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![10, 15, 20, 25])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(20, unit.into()),
                    end: Timestamp::new(60, unit.into()),
                    num_rows: 3,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![21, 22, 23])],
                    )
                    .unwrap(),
                ],
            ),
        ];

        let expected_output = Some(
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![10, 15, 20])])
                .unwrap(),
        );

        run_test(
            3004,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: false,
                ..Default::default()
            },
            Some(3),
            expected_output,
            Some(7),
        )
        .await;
    }

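    // Ranges that report zero rows still take part in primary-end grouping;
    // early stop must work whether the empty ranges lead the input or sit
    // between non-empty ones.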
    #[tokio::test]
    async fn test_early_stop_with_empty_partitions() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        // Case 1: the two leading (largest) ranges are empty; the top-2 must
        // come from the first non-empty range.
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(70, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 0,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
                        .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(50, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 0,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
                        .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(30, unit.into()),
                    end: Timestamp::new(80, unit.into()),
                    num_rows: 4,
                    identifier: 2,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![74, 75, 76, 77])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(10, unit.into()),
                    end: Timestamp::new(60, unit.into()),
                    num_rows: 3,
                    identifier: 3,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![58, 59, 60])],
                    )
                    .unwrap(),
                ],
            ),
        ];

        // Descending top-2 comes from range 2: 77 and 76.
        let expected_output = Some(
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![77, 76])]).unwrap(),
        );

        run_test(
            3005,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                ..Default::default()
            },
            Some(2),
            expected_output,
            Some(7),
        )
        .await;

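        // Case 2: the empty ranges sit between non-empty ones; the top-2 is
        // already satisfied by the first range.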
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(70, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 4,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![96, 97, 98, 99])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(50, unit.into()),
                    end: Timestamp::new(90, unit.into()),
                    num_rows: 0,
                    identifier: 1,
                },
                vec![
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
                        .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(30, unit.into()),
                    end: Timestamp::new(70, unit.into()),
                    num_rows: 0,
                    identifier: 2,
                },
                vec![
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
                        .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(10, unit.into()),
                    end: Timestamp::new(50, unit.into()),
                    num_rows: 3,
                    identifier: 3,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![48, 49, 50])],
                    )
                    .unwrap(),
                ],
            ),
        ];

        // Descending top-2 comes from the first range: 99 and 98.
        let expected_output = Some(
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![99, 98])]).unwrap(),
        );

        run_test(
            3006,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                ..Default::default()
            },
            Some(2),
            expected_output,
            Some(7),
        )
        .await;
    }

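    // Drives PartSortStream by hand to verify the TopK dynamic filter's
    // bookkeeping: the snapshot generation should advance only when a pushed
    // batch can actually tighten the filter threshold.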
    #[tokio::test]
    async fn test_early_stop_check_update_dyn_filter() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone()));
        let exec = PartSortExec::try_new(
            PhysicalSortExpr {
                expr: Arc::new(Column::new("ts", 0)),
                options: SortOptions {
                    descending: false,
                    ..Default::default()
                },
            },
            Some(3),
            vec![vec![
                PartitionRange {
                    start: Timestamp::new(0, unit.into()),
                    end: Timestamp::new(20, unit.into()),
                    num_rows: 3,
                    identifier: 1,
                },
                PartitionRange {
                    start: Timestamp::new(10, unit.into()),
                    end: Timestamp::new(30, unit.into()),
                    num_rows: 3,
                    identifier: 1,
                },
            ]],
            mock_input.clone(),
        )
        .unwrap();

        // Build the stream directly so the test can push buffers and trigger
        // top-k sorting by hand.
        let filter = exec.filter.clone().unwrap();
        let input_stream = mock_input
            .execute(0, Arc::new(TaskContext::default()))
            .unwrap();
        let mut stream = PartSortStream::new(
            Arc::new(TaskContext::default()),
            &exec,
            Some(3),
            input_stream,
            vec![],
            0,
            Some(filter.clone()),
        )
        .unwrap();

        // The dynamic filter snapshot is generated once when the stream is
        // created.
        assert_eq!(filter.read().expr().snapshot_generation(), 1);
        // Push a batch whose rows all enter the top-3; this should update the
        // filter and bump its generation.
        let batch =
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![0, 5, 15])])
                .unwrap();
        stream.push_buffer(batch).unwrap();

        assert_eq!(filter.read().expr().snapshot_generation(), 2);
        // The early-stop check is read-only: it must not mutate the filter.
        assert!(!stream.can_stop_early(&schema).unwrap());
        assert_eq!(filter.read().expr().snapshot_generation(), 2);

        let _ = stream.sort_top_buffer().unwrap();

        // Every value in this batch is worse than the current top-3
        // (ascending 0, 5, 15), so neither pushing nor sorting should touch
        // the filter.
        let batch =
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![21, 25, 29])])
                .unwrap();
        stream.push_buffer(batch).unwrap();
        assert_eq!(filter.read().expr().snapshot_generation(), 2);
        let new = stream.sort_top_buffer().unwrap();
        assert_eq!(filter.read().expr().snapshot_generation(), 2);

        // All pushed rows were pruned by the top-k, so nothing is emitted.
        assert_eq!(new.num_rows(), 0);
    }
}