query/part_sort.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Module for sorting input data within each [`PartitionRange`].
//!
//! This module defines the [`PartSortExec`] execution plan, which sorts each
//! partition ([`PartitionRange`]) independently based on the provided physical
//! sort expressions.

use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use arrow::array::{
    ArrayRef, AsArray, TimestampMicrosecondArray, TimestampMillisecondArray,
    TimestampNanosecondArray, TimestampSecondArray,
};
use arrow::compute::{concat, concat_batches, take_record_batch};
use arrow_schema::{Schema, SchemaRef};
use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
use common_telemetry::warn;
use common_time::Timestamp;
use common_time::timestamp::TimeUnit;
use datafusion::common::arrow::compute::sort_to_indices;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion::execution::{RecordBatchStream, TaskContext};
use datafusion::physical_plan::execution_plan::CardinalityEffect;
use datafusion::physical_plan::filter_pushdown::{
    ChildFilterDescription, FilterDescription, FilterPushdownPhase,
};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, TopK,
    TopKDynamicFilters,
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DataFusionError, internal_err};
use datafusion_physical_expr::expressions::{Column, DynamicFilterPhysicalExpr, lit};
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use futures::{Stream, StreamExt};
use itertools::Itertools;
use parking_lot::RwLock;
use snafu::location;
use store_api::region_engine::PartitionRange;

use crate::error::Result;
use crate::window_sort::check_partition_range_monotonicity;
use crate::{array_iter_helper, downcast_ts_array};

/// Get the primary end of a `PartitionRange` based on sort direction.
///
/// - Descending: primary end is `end` (we process highest values first)
/// - Ascending: primary end is `start` (we process lowest values first)
fn get_primary_end(range: &PartitionRange, descending: bool) -> Timestamp {
    if descending { range.end } else { range.start }
}

/// Group consecutive ranges by their primary end value.
///
/// Returns a vector of (primary_end, start_idx_inclusive, end_idx_exclusive) tuples.
/// Ranges with the same primary end MUST be processed together because they may
/// overlap and contain values that belong to the same "top-k" result.
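///
/// For example (illustrative): with `descending = true` the primary end is
/// `end`, so the ranges `[5, 10)`, `[0, 10)`, `[0, 5)` are grouped as
/// `[(10, 0, 2), (5, 2, 3)]`.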
fn group_ranges_by_primary_end(
    ranges: &[PartitionRange],
    descending: bool,
) -> Vec<(Timestamp, usize, usize)> {
    if ranges.is_empty() {
        return vec![];
    }

    let mut groups = Vec::new();
    let mut group_start = 0;
    let mut current_primary_end = get_primary_end(&ranges[0], descending);

    for (idx, range) in ranges.iter().enumerate().skip(1) {
        let primary_end = get_primary_end(range, descending);
        if primary_end != current_primary_end {
            // End current group
            groups.push((current_primary_end, group_start, idx));
            // Start new group
            group_start = idx;
            current_primary_end = primary_end;
        }
    }
    // Push the last group
    groups.push((current_primary_end, group_start, ranges.len()));

    groups
}

/// Sort input within the given `PartitionRange`s.
///
/// Input is assumed to be segmented by empty `RecordBatch`es, each of which
/// indicates that a new `PartitionRange` is starting, and this operator sorts
/// the data within each partition independently.
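///
/// For example, an input sequence `[b1, b2, (empty), b3]` means that `b1` and
/// `b2` belong to the first `PartitionRange` and `b3` to the second.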
#[derive(Debug, Clone)]
pub struct PartSortExec {
    /// Physical sort expression (that is, sort by timestamp)
    expression: PhysicalSortExpr,
    limit: Option<usize>,
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    partition_ranges: Vec<Vec<PartitionRange>>,
    properties: PlanProperties,
    /// Filter matching the state of the sort for dynamic filter pushdown.
    /// If `limit` is `Some`, this will also be set and a TopK operator may be used.
    /// If `limit` is `None`, this will be `None`.
    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}

impl PartSortExec {
    pub fn try_new(
        expression: PhysicalSortExpr,
        limit: Option<usize>,
        partition_ranges: Vec<Vec<PartitionRange>>,
        input: Arc<dyn ExecutionPlan>,
    ) -> Result<Self> {
        check_partition_range_monotonicity(&partition_ranges, expression.options.descending)?;

        let metrics = ExecutionPlanMetricsSet::new();
        let properties = input.properties();
        let properties = PlanProperties::new(
            input.equivalence_properties().clone(),
            input.output_partitioning().clone(),
            properties.emission_type,
            properties.boundedness,
        );

        let filter = limit
            .is_some()
            .then(|| Self::create_filter(expression.expr.clone()));

        Ok(Self {
            expression,
            limit,
            input,
            metrics,
            partition_ranges,
            properties,
            filter,
        })
    }

    /// Add or reset `self.filter` to a new `TopKDynamicFilters`.
    fn create_filter(expr: Arc<dyn PhysicalExpr>) -> Arc<RwLock<TopKDynamicFilters>> {
        Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
            DynamicFilterPhysicalExpr::new(vec![expr], lit(true)),
        ))))
    }

    pub fn to_stream(
        &self,
        context: Arc<TaskContext>,
        partition: usize,
    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
        let input_stream: DfSendableRecordBatchStream =
            self.input.execute(partition, context.clone())?;

        if partition >= self.partition_ranges.len() {
            internal_err!(
                "Partition index out of range: {} >= {} at {}",
                partition,
                self.partition_ranges.len(),
                snafu::location!()
            )?;
        }

        let df_stream = Box::pin(PartSortStream::new(
            context,
            self,
            self.limit,
            input_stream,
            self.partition_ranges[partition].clone(),
            partition,
            self.filter.clone(),
        )?) as _;

        Ok(df_stream)
    }
}

impl DisplayAs for PartSortExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "PartSortExec: expr={} num_ranges={}",
            self.expression,
            self.partition_ranges.len(),
        )?;
        if let Some(limit) = self.limit {
            write!(f, " limit={}", limit)?;
        }
        Ok(())
    }
}

impl ExecutionPlan for PartSortExec {
    fn name(&self) -> &str {
        "PartSortExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        let new_input = if let Some(first) = children.first() {
            first
        } else {
            internal_err!("No children found")?
        };
        let new = Self::try_new(
            self.expression.clone(),
            self.limit,
            self.partition_ranges.clone(),
            new_input.clone(),
        )?;
        Ok(Arc::new(new))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
        self.to_stream(context, partition)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }

    /// # Explain
    ///
    /// This plan needs to be executed on each partition independently,
    /// and is expected to run directly on the storage engine's output
    /// distribution / partition.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn cardinality_effect(&self) -> CardinalityEffect {
        if self.limit.is_none() {
            CardinalityEffect::Equal
        } else {
            CardinalityEffect::LowerEqual
        }
    }

    fn gather_filters_for_pushdown(
        &self,
        phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &datafusion::config::ConfigOptions,
    ) -> datafusion_common::Result<FilterDescription> {
        if !matches!(phase, FilterPushdownPhase::Post) {
            return FilterDescription::from_children(parent_filters, &self.children());
        }

        let mut child = ChildFilterDescription::from_child(&parent_filters, &self.input)?;

        if let Some(filter) = &self.filter {
            child = child.with_self_filter(filter.read().expr());
        }

        Ok(FilterDescription::new().with_child(child))
    }

    fn reset_state(self: Arc<Self>) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
        // the shared dynamic filter needs to be reset
        let new_filter = self
            .limit
            .is_some()
            .then(|| Self::create_filter(self.expression.expr.clone()));

        Ok(Arc::new(Self {
            expression: self.expression.clone(),
            limit: self.limit,
            input: self.input.clone(),
            metrics: self.metrics.clone(),
            partition_ranges: self.partition_ranges.clone(),
            properties: self.properties.clone(),
            filter: new_filter,
        }))
    }
}

enum PartSortBuffer {
    All(Vec<DfRecordBatch>),
    /// TopK buffer with row count.
    ///
    /// Given that the heap only keeps k elements, the row count of this buffer
    /// is not accurate and is only used for emptiness checks.
    Top(TopK, usize),
}

impl PartSortBuffer {
    pub fn is_empty(&self) -> bool {
        match self {
            PartSortBuffer::All(v) => v.is_empty(),
            PartSortBuffer::Top(_, cnt) => *cnt == 0,
        }
    }
}

struct PartSortStream {
    /// Memory pool for this stream
    reservation: MemoryReservation,
    buffer: PartSortBuffer,
    expression: PhysicalSortExpr,
    limit: Option<usize>,
    input: DfSendableRecordBatchStream,
    input_complete: bool,
    schema: SchemaRef,
    partition_ranges: Vec<PartitionRange>,
    #[allow(dead_code)] // this is used under `#[cfg(debug_assertions)]`
    partition: usize,
    cur_part_idx: usize,
    evaluating_batch: Option<DfRecordBatch>,
    metrics: BaselineMetrics,
    context: Arc<TaskContext>,
    root_metrics: ExecutionPlanMetricsSet,
    /// Groups of ranges by primary end: (primary_end, start_idx_inclusive, end_idx_exclusive).
    /// Ranges in the same group must be processed together before outputting results.
    range_groups: Vec<(Timestamp, usize, usize)>,
    /// Current group being processed (index into range_groups).
    cur_group_idx: usize,
    /// Dynamic filter shared by all TopK instances. Note that `PartSortExec`/`PartSortStream`/`TopK`
    /// must share the same filter so that updates from each `TopK` can be seen by the others
    /// (and by the table scan operator).
    filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
}

impl PartSortStream {
    fn new(
        context: Arc<TaskContext>,
        sort: &PartSortExec,
        limit: Option<usize>,
        input: DfSendableRecordBatchStream,
        partition_ranges: Vec<PartitionRange>,
        partition: usize,
        filter: Option<Arc<RwLock<TopKDynamicFilters>>>,
    ) -> datafusion_common::Result<Self> {
        let buffer = if let Some(limit) = limit {
            let Some(filter) = filter.clone() else {
                return internal_err!(
                    "TopKDynamicFilters must be provided when limit is set at {}",
                    snafu::location!()
                );
            };

            PartSortBuffer::Top(
                TopK::try_new(
                    partition,
                    sort.schema().clone(),
                    vec![],
                    [sort.expression.clone()].into(),
                    limit,
                    context.session_config().batch_size(),
                    context.runtime_env(),
                    &sort.metrics,
                    filter.clone(),
                )?,
                0,
            )
        } else {
            PartSortBuffer::All(Vec::new())
        };

        // Compute range groups by primary end
        let descending = sort.expression.options.descending;
        let range_groups = group_ranges_by_primary_end(&partition_ranges, descending);

        Ok(Self {
            reservation: MemoryConsumer::new("PartSortStream".to_string())
                .register(&context.runtime_env().memory_pool),
            buffer,
            expression: sort.expression.clone(),
            limit,
            input,
            input_complete: false,
            schema: sort.input.schema(),
            partition_ranges,
            partition,
            cur_part_idx: 0,
            evaluating_batch: None,
            metrics: BaselineMetrics::new(&sort.metrics, partition),
            context,
            root_metrics: sort.metrics.clone(),
            range_groups,
            cur_group_idx: 0,
            filter,
        })
    }
}

macro_rules! array_check_helper {
    ($t:ty, $unit:expr, $arr:expr, $cur_range:expr, $min_max_idx:expr) => {{
        if $cur_range.start.unit().as_arrow_time_unit() != $unit
            || $cur_range.end.unit().as_arrow_time_unit() != $unit
        {
            internal_err!(
                "PartitionRange unit mismatch, expect {:?}, found {:?}",
                $cur_range.start.unit(),
                $unit
            )?;
        }
        let arr = $arr
            .as_any()
            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
            .unwrap();

        let min = arr.value($min_max_idx.0);
        let max = arr.value($min_max_idx.1);
        let (min, max) = if min < max {
            (min, max)
        } else {
            (max, min)
        };
        let cur_min = $cur_range.start.value();
        let cur_max = $cur_range.end.value();
        // note that PartitionRange is left inclusive and right exclusive
        if !(min >= cur_min && max < cur_max) {
            internal_err!(
                "Sort column min/max value out of partition range: sort_column.min_max=[{:?}, {:?}] not in PartitionRange=[{:?}, {:?}]",
                min,
                max,
                cur_min,
                cur_max
            )?;
        }
    }};
}

impl PartSortStream {
    /// Check whether the sort column's min/max values are within the current group's effective range.
    /// For group-based processing, data from multiple ranges with the same primary end
    /// is accumulated together, so we check against the union of all ranges in the group.
    fn check_in_range(
        &self,
        sort_column: &ArrayRef,
        min_max_idx: (usize, usize),
    ) -> datafusion_common::Result<()> {
        // Use the group's effective range instead of the current partition range
        let Some(cur_range) = self.get_current_group_effective_range() else {
            internal_err!(
                "No effective range for current group {} at {}",
                self.cur_group_idx,
                snafu::location!()
            )?
        };

        downcast_ts_array!(
            sort_column.data_type() => (array_check_helper, sort_column, cur_range, min_max_idx),
            _ => internal_err!(
                "Unsupported data type for sort column: {:?}",
                sort_column.data_type()
            )?,
        );

        Ok(())
    }

    /// Try to find data whose value exceeds the current partition range.
    ///
    /// Returns `None` if no such data is found, and `Some(idx)` where idx points to
    /// the first data that exceeds the current partition range.
    fn try_find_next_range(
        &self,
        sort_column: &ArrayRef,
    ) -> datafusion_common::Result<Option<usize>> {
        if sort_column.is_empty() {
            return Ok(None);
        }

        // check if the current partition index is out of range
        if self.cur_part_idx >= self.partition_ranges.len() {
            internal_err!(
                "Partition index out of range: {} >= {} at {}",
                self.cur_part_idx,
                self.partition_ranges.len(),
                snafu::location!()
            )?;
        }
        let cur_range = self.partition_ranges[self.cur_part_idx];

        let sort_column_iter = downcast_ts_array!(
            sort_column.data_type() => (array_iter_helper, sort_column),
            _ => internal_err!(
                "Unsupported data type for sort column: {:?}",
                sort_column.data_type()
            )?,
        );

        for (idx, val) in sort_column_iter {
            // ignore vacant time index data
            if let Some(val) = val
                && (val >= cur_range.end.value() || val < cur_range.start.value())
            {
                return Ok(Some(idx));
            }
        }

        Ok(None)
    }

    fn push_buffer(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> {
        match &mut self.buffer {
            PartSortBuffer::All(v) => v.push(batch),
            PartSortBuffer::Top(top, cnt) => {
                *cnt += batch.num_rows();
                top.insert_batch(batch)?;
            }
        }

        Ok(())
    }

    /// Stop reading early when the current group does not overlap with any later group.
    /// If there is no overlap, we can stop reading further input because the current
    /// top-k result is already final. The dynamic filter is used to evaluate the next
    /// group's primary end.
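    ///
    /// Worked example (illustrative): for a descending top-k whose heap is already
    /// full, the dynamic filter is roughly `ts > threshold`. If the next group's
    /// primary end is `10` while the threshold is `15`, evaluating the filter on
    /// `10` yields `false`, so no row of the next group can enter the top-k and we
    /// can stop reading.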
    fn can_stop_early(&mut self, schema: &Arc<Schema>) -> datafusion_common::Result<bool> {
        let topk_cnt = match &self.buffer {
            PartSortBuffer::Top(_, cnt) => *cnt,
            _ => return Ok(false),
        };
        // the top-k is not fulfilled yet
        if Some(topk_cnt) < self.limit {
            return Ok(false);
        }
        let next_group_primary_end = if self.cur_group_idx + 1 < self.range_groups.len() {
            self.range_groups[self.cur_group_idx + 1].0
        } else {
            // no next group
            return Ok(false);
        };

        // the dynamic filter is updated based on the last value of the top-k heap (the "threshold");
        // e.g. it's a max-heap for an ASC TopK operator,
        // so the dynamic filter can be used to prune the data range
        let filter = self
            .filter
            .as_ref()
            .expect("TopKDynamicFilters must be provided when limit is set");
        let filter = filter.read().expr().current()?;
        let mut ts_index = None;
        // invariant: the filter must reference only the time index column
        let filter = filter
            .transform_down(|c| {
                // rewrite every column's index as 0
                if let Some(column) = c.as_any().downcast_ref::<Column>() {
                    ts_index = Some(column.index());
                    Ok(Transformed::yes(
                        Arc::new(Column::new(column.name(), 0)) as Arc<dyn PhysicalExpr>
                    ))
                } else {
                    Ok(Transformed::no(c))
                }
            })?
            .data;
        let Some(ts_index) = ts_index else {
            return Ok(false); // the dynamic filter is still `true`, cannot decide, continue reading
        };
        let field = if schema.fields().len() <= ts_index {
            warn!(
                "Schema mismatch when evaluating dynamic filter for PartSortExec at {}, schema: {:?}, ts_index: {}",
                self.partition, schema, ts_index
            );
            return Ok(false); // schema mismatch, cannot decide, continue reading
        } else {
            schema.field(ts_index)
        };
        let schema = Arc::new(Schema::new(vec![field.clone()]));
        // convert `next_group_primary_end` to an array and evaluate the filter on it;
        // if it evaluates to false, the next group cannot overlap and we can stop early
        let primary_end_array = match next_group_primary_end.unit() {
            TimeUnit::Second => Arc::new(TimestampSecondArray::from(vec![
                next_group_primary_end.value(),
            ])) as ArrayRef,
            TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(vec![
                next_group_primary_end.value(),
            ])) as ArrayRef,
            TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(vec![
                next_group_primary_end.value(),
            ])) as ArrayRef,
            TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(vec![
                next_group_primary_end.value(),
            ])) as ArrayRef,
        };
        let primary_end_batch = DfRecordBatch::try_new(schema, vec![primary_end_array])?;
        let res = filter.evaluate(&primary_end_batch)?;
        let array = res.into_array(primary_end_batch.num_rows())?;
        let filter = array.as_boolean().clone();
        let overlap = filter.iter().next().flatten();
        if let Some(false) = overlap {
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Check if the given partition index is within the current group.
    fn is_in_current_group(&self, part_idx: usize) -> bool {
        if self.cur_group_idx >= self.range_groups.len() {
            return false;
        }
        let (_, start, end) = self.range_groups[self.cur_group_idx];
        part_idx >= start && part_idx < end
    }

    /// Advance to the next group. Returns true if there is a next group.
    fn advance_to_next_group(&mut self) -> bool {
        self.cur_group_idx += 1;
        self.cur_group_idx < self.range_groups.len()
    }

    /// Get the effective range for the current group.
    /// For a group of ranges with the same primary end, the effective range is
    /// the union of all ranges in the group.
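    ///
    /// E.g., a descending group of `[5, 10)` and `[0, 10)` has the effective
    /// range `[0, 10)`.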
    fn get_current_group_effective_range(&self) -> Option<PartitionRange> {
        if self.cur_group_idx >= self.range_groups.len() {
            return None;
        }
        let (_, start_idx, end_idx) = self.range_groups[self.cur_group_idx];
        if start_idx >= end_idx || start_idx >= self.partition_ranges.len() {
            return None;
        }

        let ranges_in_group =
            &self.partition_ranges[start_idx..end_idx.min(self.partition_ranges.len())];
        if ranges_in_group.is_empty() {
            return None;
        }

        // Compute union of all ranges in the group
        let mut min_start = ranges_in_group[0].start;
        let mut max_end = ranges_in_group[0].end;
        for range in ranges_in_group.iter().skip(1) {
            if range.start < min_start {
                min_start = range.start;
            }
            if range.end > max_end {
                max_end = range.end;
            }
        }

        Some(PartitionRange {
            start: min_start,
            end: max_end,
            num_rows: 0,   // Not used for validation
            identifier: 0, // Not used for validation
        })
    }

    /// Sort and clear the buffer and return the sorted record batch.
    ///
    /// This function returns an empty record batch if the buffer is empty.
    fn sort_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
        match &mut self.buffer {
            PartSortBuffer::All(_) => self.sort_all_buffer(),
            PartSortBuffer::Top(_, _) => self.sort_top_buffer(),
        }
    }

    /// Internal method for sorting `All` buffer (without limit).
    fn sort_all_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
        let PartSortBuffer::All(buffer) =
            std::mem::replace(&mut self.buffer, PartSortBuffer::All(Vec::new()))
        else {
            unreachable!("buffer type is checked before and should be All variant")
        };

        if buffer.is_empty() {
            return Ok(DfRecordBatch::new_empty(self.schema.clone()));
        }
        let mut sort_columns = Vec::with_capacity(buffer.len());
        let mut opt = None;
        for batch in buffer.iter() {
            let sort_column = self.expression.evaluate_to_sort_column(batch)?;
            opt = opt.or(sort_column.options);
            sort_columns.push(sort_column.values);
        }

        let sort_column =
            concat(&sort_columns.iter().map(|a| a.as_ref()).collect_vec()).map_err(|e| {
                DataFusionError::ArrowError(
                    Box::new(e),
                    Some(format!("Fail to concat sort columns at {}", location!())),
                )
            })?;

        let indices = sort_to_indices(&sort_column, opt, self.limit).map_err(|e| {
            DataFusionError::ArrowError(
                Box::new(e),
                Some(format!("Fail to sort to indices at {}", location!())),
            )
        })?;
        if indices.is_empty() {
            return Ok(DfRecordBatch::new_empty(self.schema.clone()));
        }

        self.check_in_range(
            &sort_column,
            (
                indices.value(0) as usize,
                indices.value(indices.len() - 1) as usize,
            ),
        )
        .inspect_err(|_e| {
            #[cfg(debug_assertions)]
            common_telemetry::error!(
                "Fail to check sort column in range at {}, current_idx: {}, num_rows: {}, err: {}",
                self.partition,
                self.cur_part_idx,
                sort_column.len(),
                _e
            );
        })?;

        // reserve memory for the concat input and sorted output
        let total_mem: usize = buffer.iter().map(|r| r.get_array_memory_size()).sum();
        self.reservation.try_grow(total_mem * 2)?;

        let full_input = concat_batches(&self.schema, &buffer).map_err(|e| {
            DataFusionError::ArrowError(
                Box::new(e),
                Some(format!(
                    "Fail to concat input batches when sorting at {}",
                    location!()
                )),
            )
        })?;

        let sorted = take_record_batch(&full_input, &indices).map_err(|e| {
            DataFusionError::ArrowError(
                Box::new(e),
                Some(format!(
                    "Fail to take result record batch when sorting at {}",
                    location!()
                )),
            )
        })?;

        drop(full_input);
        // release the reservation for both the buffer and `full_input`
        self.reservation.shrink(2 * total_mem);
        Ok(sorted)
    }

    /// Internal method for sorting `Top` buffer (with limit).
    fn sort_top_buffer(&mut self) -> datafusion_common::Result<DfRecordBatch> {
        let Some(filter) = self.filter.clone() else {
            return internal_err!(
                "TopKDynamicFilters must be provided when sorting with limit at {}",
                snafu::location!()
            );
        };

        let new_top_buffer = TopK::try_new(
            self.partition,
            self.schema().clone(),
            vec![],
            [self.expression.clone()].into(),
            self.limit.unwrap(),
            self.context.session_config().batch_size(),
            self.context.runtime_env(),
            &self.root_metrics,
            filter,
        )?;
        let PartSortBuffer::Top(top_k, _) =
            std::mem::replace(&mut self.buffer, PartSortBuffer::Top(new_top_buffer, 0))
        else {
            unreachable!("buffer type is checked before and should be Top variant")
        };

        let mut result_stream = top_k.emit()?;
        let mut placeholder_ctx = std::task::Context::from_waker(futures::task::noop_waker_ref());
        let mut results = vec![];
        // according to the current implementation of `TopK`, the result stream will always be ready
        loop {
            match result_stream.poll_next_unpin(&mut placeholder_ctx) {
                Poll::Ready(Some(batch)) => {
                    let batch = batch?;
                    results.push(batch);
                }
                Poll::Pending => {
                    #[cfg(debug_assertions)]
                    unreachable!("TopK result stream should always be ready")
                }
                Poll::Ready(None) => {
                    break;
                }
            }
        }

        let concat_batch = concat_batches(&self.schema, &results).map_err(|e| {
            DataFusionError::ArrowError(
                Box::new(e),
                Some(format!(
                    "Fail to concat top k result record batch when sorting at {}",
                    location!()
                )),
            )
        })?;

        Ok(concat_batch)
    }

    /// Sorts the current buffer and returns `None` when there is nothing to emit.
    fn sorted_buffer_if_non_empty(&mut self) -> datafusion_common::Result<Option<DfRecordBatch>> {
        if self.buffer.is_empty() {
            return Ok(None);
        }

        let sorted = self.sort_buffer()?;
        if sorted.num_rows() == 0 {
            Ok(None)
        } else {
            Ok(Some(sorted))
        }
    }

    /// Try to split the input batch if it contains data that exceeds the current partition range.
    ///
    /// When the input batch contains data that exceeds the current partition range, this function
    /// splits it into two parts: the first part, which lies within the current partition range,
    /// is merged and sorted with the previous buffer, while the second part is registered to
    /// `evaluating_batch` for the next polling.
    ///
    /// **Group-based processing**: Ranges with the same primary end are grouped together.
    /// We only sort and output when transitioning to a NEW group, not when moving between
    /// ranges within the same group.
    ///
    /// Returns `None` if the input batch is empty or fully within the current partition range
    /// (or we're still collecting data within the same group), and `Some(batch)` when we've
    /// completed a group and have sorted output. When operating in TopK (limit) mode, this
    /// function will not emit intermediate batches; it only prepares state for a single final
    /// output.
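    ///
    /// Illustrative flow: with descending groups `{[15, 20)}` and `{[5, 10), [0, 10)}`,
    /// a batch whose rows span both groups is split at the first row below `15`; the
    /// prefix is buffered, the finished group is sorted and emitted, and the remainder
    /// is parked in `evaluating_batch` for the next poll.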
    fn split_batch(
        &mut self,
        batch: DfRecordBatch,
    ) -> datafusion_common::Result<Option<DfRecordBatch>> {
        if matches!(self.buffer, PartSortBuffer::Top(_, _)) {
            self.split_batch_topk(batch)?;
            return Ok(None);
        }

        self.split_batch_all(batch)
    }

    /// Specialized splitting logic for TopK (limit) mode.
    ///
    /// We only emit once, when the TopK buffer is fulfilled or the input is fully consumed.
    /// When the buffer is fulfilled and we are about to enter a new group, we stop consuming
    /// further ranges.
    fn split_batch_topk(&mut self, batch: DfRecordBatch) -> datafusion_common::Result<()> {
        if batch.num_rows() == 0 {
            return Ok(());
        }

        let sort_column = self
            .expression
            .expr
            .evaluate(&batch)?
            .into_array(batch.num_rows())?;

        let next_range_idx = self.try_find_next_range(&sort_column)?;
        let Some(idx) = next_range_idx else {
            self.push_buffer(batch)?;
            // keep polling input for next batch
            return Ok(());
        };

        let this_range = batch.slice(0, idx);
        let remaining_range = batch.slice(idx, batch.num_rows() - idx);
        if this_range.num_rows() != 0 {
            self.push_buffer(this_range)?;
        }

        // Step to next proper PartitionRange
        self.cur_part_idx += 1;

        // If we've processed all partitions, mark completion.
        if self.cur_part_idx >= self.partition_ranges.len() {
            debug_assert!(remaining_range.num_rows() == 0);
            self.input_complete = true;
            return Ok(());
        }

        // Check if we're still in the same group
        let in_same_group = self.is_in_current_group(self.cur_part_idx);

        // When TopK is fulfilled and we are switching to a new group, stop consuming further ranges if possible.
        // read from topk heap and determine whether we can stop earlier.
        if !in_same_group && self.can_stop_early(&batch.schema())? {
            self.input_complete = true;
            self.evaluating_batch = None;
            return Ok(());
        }

        // Transition to a new group if needed
        if !in_same_group {
            self.advance_to_next_group();
        }

        let next_sort_column = sort_column.slice(idx, batch.num_rows() - idx);
        if self.try_find_next_range(&next_sort_column)?.is_some() {
            // remaining batch still contains data that exceeds the current partition range
            // register the remaining batch for next polling
            self.evaluating_batch = Some(remaining_range);
        } else if remaining_range.num_rows() != 0 {
            // remaining batch is within the current partition range
            // push to the buffer and continue polling
            self.push_buffer(remaining_range)?;
        }

        Ok(())
    }

    fn split_batch_all(
        &mut self,
        batch: DfRecordBatch,
    ) -> datafusion_common::Result<Option<DfRecordBatch>> {
        if batch.num_rows() == 0 {
            return Ok(None);
        }

        let sort_column = self
            .expression
            .expr
            .evaluate(&batch)?
            .into_array(batch.num_rows())?;

        let next_range_idx = self.try_find_next_range(&sort_column)?;
        let Some(idx) = next_range_idx else {
            self.push_buffer(batch)?;
            // keep polling input for next batch
            return Ok(None);
        };

        let this_range = batch.slice(0, idx);
        let remaining_range = batch.slice(idx, batch.num_rows() - idx);
        if this_range.num_rows() != 0 {
            self.push_buffer(this_range)?;
        }

        // Step to next proper PartitionRange
        self.cur_part_idx += 1;

        // If we've processed all partitions, sort and output
        if self.cur_part_idx >= self.partition_ranges.len() {
            // assert there is no data beyond the last partition range (remaining is empty).
            debug_assert!(remaining_range.num_rows() == 0);

            // Sort and output the final group
            return self.sorted_buffer_if_non_empty();
        }

        // Check if we're still in the same group
        if self.is_in_current_group(self.cur_part_idx) {
            // Same group - don't sort yet, keep collecting
            let next_sort_column = sort_column.slice(idx, batch.num_rows() - idx);
            if self.try_find_next_range(&next_sort_column)?.is_some() {
                // remaining batch still contains data that exceeds the current partition range
                self.evaluating_batch = Some(remaining_range);
            } else {
                // remaining batch is within the current partition range
                if remaining_range.num_rows() != 0 {
                    self.push_buffer(remaining_range)?;
                }
            }
            // Return None to continue collecting within the same group
            return Ok(None);
        }

        // Transitioning to a new group - sort current group and output
        let sorted_batch = self.sorted_buffer_if_non_empty()?;
        self.advance_to_next_group();

        let next_sort_column = sort_column.slice(idx, batch.num_rows() - idx);
        if self.try_find_next_range(&next_sort_column)?.is_some() {
            // remaining batch still contains data that exceeds the current partition range
            // register the remaining batch for next polling
            self.evaluating_batch = Some(remaining_range);
        } else {
            // remaining batch is within the current partition range
            // push to the buffer and continue polling
            if remaining_range.num_rows() != 0 {
                self.push_buffer(remaining_range)?;
            }
        }

        Ok(sorted_batch)
    }

    pub fn poll_next_inner(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
        loop {
            if self.input_complete {
                if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? {
                    return Poll::Ready(Some(Ok(sorted_batch)));
                }
                return Poll::Ready(None);
            }

            // if there is a remaining batch being evaluated from last run,
            // split on it instead of fetching new batch
            if let Some(evaluating_batch) = self.evaluating_batch.take()
                && evaluating_batch.num_rows() != 0
            {
                // Check if we've already processed all partitions
                if self.cur_part_idx >= self.partition_ranges.len() {
                    // All partitions processed, discard remaining data
                    if let Some(sorted_batch) = self.sorted_buffer_if_non_empty()? {
                        return Poll::Ready(Some(Ok(sorted_batch)));
                    }
                    return Poll::Ready(None);
                }

                if let Some(sorted_batch) = self.split_batch(evaluating_batch)? {
                    return Poll::Ready(Some(Ok(sorted_batch)));
                }
                continue;
            }

            // fetch next batch from input
            let res = self.input.as_mut().poll_next(cx);
            match res {
                Poll::Ready(Some(Ok(batch))) => {
                    if let Some(sorted_batch) = self.split_batch(batch)? {
                        return Poll::Ready(Some(Ok(sorted_batch)));
                    }
                }
                // input stream end, mark and continue
                Poll::Ready(None) => {
                    self.input_complete = true;
                }
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}

impl Stream for PartSortStream {
    type Item = datafusion_common::Result<DfRecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
        let result = self.as_mut().poll_next_inner(cx);
        self.metrics.record_poll(result)
    }
}

impl RecordBatchStream for PartSortStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow::array::{
        TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
        TimestampSecondArray,
    };
    use arrow::json::ArrayWriter;
    use arrow_schema::{DataType, Field, Schema, SortOptions, TimeUnit};
    use common_time::Timestamp;
    use datafusion_physical_expr::expressions::Column;
    use futures::StreamExt;
    use store_api::region_engine::PartitionRange;

    use super::*;
    use crate::test_util::{MockInputExec, new_ts_array};

    #[tokio::test]
    async fn test_can_stop_early_with_empty_topk_buffer() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        // Build a minimal PartSortExec and stream, but inject a dynamic filter that
        // always evaluates to false so TopK will filter out all rows internally.
        let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone()));
        let exec = PartSortExec::try_new(
            PhysicalSortExpr {
                expr: Arc::new(Column::new("ts", 0)),
                options: SortOptions {
                    descending: true,
                    ..Default::default()
                },
            },
            Some(3),
            vec![vec![]],
            mock_input.clone(),
        )
        .unwrap();

        let filter = Arc::new(RwLock::new(TopKDynamicFilters::new(Arc::new(
            DynamicFilterPhysicalExpr::new(vec![], lit(false)),
        ))));

        let input_stream = mock_input
            .execute(0, Arc::new(TaskContext::default()))
            .unwrap();
        let mut stream = PartSortStream::new(
            Arc::new(TaskContext::default()),
            &exec,
            Some(3),
            input_stream,
            vec![],
            0,
            Some(filter),
        )
        .unwrap();

        // Push 3 rows so the external counter reaches `limit`, while TopK keeps no rows.
        let batch = DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2, 3])])
            .unwrap();
        stream.push_buffer(batch).unwrap();

        // The TopK result buffer is empty, so we cannot determine early-stop.
        // Ensure this path returns `Ok(false)` (and, importantly, does not panic).
        assert!(!stream.can_stop_early(&schema).unwrap());
    }
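
    /// Illustrative unit test (a minimal sketch) for `group_ranges_by_primary_end`:
    /// grouping is driven purely by the primary end (`end` when descending,
    /// `start` when ascending), so overlapping ranges that share that end land
    /// in the same group.
    #[test]
    fn group_ranges_by_primary_end_basic() {
        let unit = TimeUnit::Millisecond;
        let ts = |v: i64| Timestamp::new(v, unit.into());
        let range = |start: i64, end: i64| PartitionRange {
            start: ts(start),
            end: ts(end),
            num_rows: 0,
            identifier: 0,
        };
        let ranges = [range(5, 10), range(0, 10), range(0, 5)];

        // Descending: primary end is `end`, so [5, 10) and [0, 10) form one group.
        assert_eq!(
            group_ranges_by_primary_end(&ranges, true),
            vec![(ts(10), 0, 2), (ts(5), 2, 3)]
        );
        // Ascending: primary end is `start`, so [0, 10) and [0, 5) form one group.
        assert_eq!(
            group_ranges_by_primary_end(&ranges, false),
            vec![(ts(5), 0, 1), (ts(0), 1, 3)]
        );
    }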

    #[ignore = "hard to gen expected data correctly here, TODO(discord9): fix it later"]
    #[tokio::test]
    async fn fuzzy_test() {
        let test_cnt = 100;
        // bound for total count of PartitionRange
        let part_cnt_bound = 100;
        // bound for timestamp range size and offset for each PartitionRange
        let range_size_bound = 100;
        let range_offset_bound = 100;
        // bound for batch count and size within each PartitionRange
        let batch_cnt_bound = 20;
        let batch_size_bound = 100;

        let mut rng = fastrand::Rng::new();
        rng.seed(1337);

        let mut test_cases = Vec::new();

        for case_id in 0..test_cnt {
            let mut bound_val: Option<i64> = None;
            let descending = rng.bool();
            let nulls_first = rng.bool();
            let opt = SortOptions {
                descending,
                nulls_first,
            };
            let limit = if rng.bool() {
                Some(rng.usize(1..batch_cnt_bound * batch_size_bound))
            } else {
                None
            };
            let unit = match rng.u8(0..3) {
                0 => TimeUnit::Second,
                1 => TimeUnit::Millisecond,
                2 => TimeUnit::Microsecond,
                _ => TimeUnit::Nanosecond,
            };

            let schema = Schema::new(vec![Field::new(
                "ts",
                DataType::Timestamp(unit, None),
                false,
            )]);
            let schema = Arc::new(schema);

            let mut input_ranged_data = vec![];
            let mut output_ranges = vec![];
            let mut output_data = vec![];
            // generate each input `PartitionRange`
            for part_id in 0..rng.usize(0..part_cnt_bound) {
                // generate each `PartitionRange`'s timestamp range
                let (start, end) = if descending {
                    // Use 1..=range_offset_bound to ensure strictly decreasing end values
                    let end = bound_val
                        .map(|i| {
                            i.checked_sub(rng.i64(1..=range_offset_bound)).expect(
                                "Bad luck: the fuzzy test generated data that would overflow; change the seed and try again",
                            )
                        })
                        .unwrap_or_else(|| rng.i64(-100000000..100000000));
                    bound_val = Some(end);
                    let start = end - rng.i64(1..range_size_bound);
                    let start = Timestamp::new(start, unit.into());
                    let end = Timestamp::new(end, unit.into());
                    (start, end)
                } else {
                    // Use 1..=range_offset_bound to ensure strictly increasing start values
                    let start = bound_val
                        .map(|i| i + rng.i64(1..=range_offset_bound))
                        .unwrap_or_else(|| rng.i64(..));
                    bound_val = Some(start);
                    let end = start + rng.i64(1..range_size_bound);
                    let start = Timestamp::new(start, unit.into());
                    let end = Timestamp::new(end, unit.into());
                    (start, end)
                };
                assert!(start < end);

                let mut per_part_sort_data = vec![];
                let mut batches = vec![];
                for _batch_idx in 0..rng.usize(1..batch_cnt_bound) {
                    let cnt = rng.usize(0..batch_size_bound) + 1;
                    let iter = 0..rng.usize(0..cnt);
                    let mut data_gen = iter
                        .map(|_| rng.i64(start.value()..end.value()))
                        .collect_vec();
                    if data_gen.is_empty() {
                        // current batch is empty, skip
                        continue;
                    }
                    // mito always sorts in ASC order
                    data_gen.sort();
                    per_part_sort_data.extend(data_gen.clone());
                    let arr = new_ts_array(unit, data_gen.clone());
                    let batch = DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap();
                    batches.push(batch);
                }

                let range = PartitionRange {
                    start,
                    end,
                    num_rows: batches.iter().map(|b| b.num_rows()).sum(),
                    identifier: part_id,
                };
                input_ranged_data.push((range, batches));

                output_ranges.push(range);
                if per_part_sort_data.is_empty() {
                    continue;
                }
                output_data.extend_from_slice(&per_part_sort_data);
            }

            // adjust output data with adjacent PartitionRanges
            let mut output_data_iter = output_data.iter().peekable();
            let mut output_data = vec![];
            for range in output_ranges.clone() {
                let mut cur_data = vec![];
                while let Some(val) = output_data_iter.peek() {
                    if **val < range.start.value() || **val >= range.end.value() {
                        break;
                    }
                    cur_data.push(*output_data_iter.next().unwrap());
                }

                if cur_data.is_empty() {
                    continue;
                }

                if descending {
                    cur_data.sort_by(|a, b| b.cmp(a));
                } else {
                    cur_data.sort();
                }
                output_data.push(cur_data);
            }

            let expected_output = if let Some(limit) = limit {
                let mut accumulated = Vec::new();
                let mut seen = 0usize;
                for mut range_values in output_data {
                    seen += range_values.len();
                    accumulated.append(&mut range_values);
                    if seen >= limit {
                        break;
                    }
                }

                if accumulated.is_empty() {
                    None
                } else {
                    if descending {
                        accumulated.sort_by(|a, b| b.cmp(a));
                    } else {
                        accumulated.sort();
                    }
                    accumulated.truncate(limit.min(accumulated.len()));

                    Some(
                        DfRecordBatch::try_new(
                            schema.clone(),
                            vec![new_ts_array(unit, accumulated)],
                        )
                        .unwrap(),
                    )
                }
            } else {
                let batches = output_data
                    .into_iter()
                    .map(|a| {
                        DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, a)]).unwrap()
                    })
                    .collect_vec();
                if batches.is_empty() {
                    None
                } else {
                    Some(concat_batches(&schema, &batches).unwrap())
                }
            };

            test_cases.push((
                case_id,
                unit,
                input_ranged_data,
                schema,
                opt,
                limit,
                expected_output,
            ));
        }

        for (case_id, _unit, input_ranged_data, schema, opt, limit, expected_output) in test_cases {
            run_test(
                case_id,
                input_ranged_data,
                schema,
                opt,
                limit,
                expected_output,
                None,
            )
            .await;
        }
    }
1367
1368    #[tokio::test]
1369    async fn simple_cases() {
1370        let testcases = vec![
1371            (
1372                TimeUnit::Millisecond,
1373                vec![
1374                    ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8, 9]]),
1375                    ((5, 10), vec![vec![5, 6], vec![7, 8]]),
1376                ],
1377                false,
1378                None,
1379                vec![vec![1, 2, 3, 4, 5, 5, 6, 6, 7, 7, 8, 8, 9]],
1380            ),
1381            // Case 1: Descending sort with overlapping ranges that have the same primary end (end=10).
1382            // Ranges [5,10) and [0,10) are grouped together, so their data is merged before sorting.
1383            (
1384                TimeUnit::Millisecond,
1385                vec![
1386                    ((5, 10), vec![vec![5, 6], vec![7, 8, 9]]),
1387                    ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
1388                ],
1389                true,
1390                None,
1391                vec![vec![9, 8, 8, 7, 7, 6, 6, 5, 5, 4, 3, 2, 1]],
1392            ),
1393            (
1394                TimeUnit::Millisecond,
1395                vec![
1396                    ((5, 10), vec![]),
1397                    ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
1398                ],
1399                true,
1400                None,
1401                vec![vec![8, 7, 6, 5, 4, 3, 2, 1]],
1402            ),
1403            (
1404                TimeUnit::Millisecond,
1405                vec![
1406                    ((15, 20), vec![vec![17, 18, 19]]),
1407                    ((10, 15), vec![]),
1408                    ((5, 10), vec![]),
1409                    ((0, 10), vec![vec![1, 2, 3], vec![4, 5, 6], vec![7, 8]]),
1410                ],
1411                true,
1412                None,
1413                vec![vec![19, 18, 17], vec![8, 7, 6, 5, 4, 3, 2, 1]],
1414            ),
1415            (
1416                TimeUnit::Millisecond,
1417                vec![
1418                    ((15, 20), vec![]),
1419                    ((10, 15), vec![]),
1420                    ((5, 10), vec![]),
1421                    ((0, 10), vec![]),
1422                ],
1423                true,
1424                None,
1425                vec![],
1426            ),
1427            // Case 5: Data from one batch spans multiple ranges. Ranges with same end are grouped.
1428            // Ranges: [15,20) end=20, [10,15) end=15, [5,10) end=10, [0,10) end=10
1429            // Groups: {[15,20)}, {[10,15)}, {[5,10), [0,10)}
1430            // The last two ranges are merged because they share end=10.
1431            (
1432                TimeUnit::Millisecond,
1433                vec![
1434                    (
1435                        (15, 20),
1436                        vec![vec![15, 17, 19, 10, 11, 12, 5, 6, 7, 8, 9, 1, 2, 3, 4]],
1437                    ),
1438                    ((10, 15), vec![]),
1439                    ((5, 10), vec![]),
1440                    ((0, 10), vec![]),
1441                ],
1442                true,
1443                None,
1444                vec![
1445                    vec![19, 17, 15],
1446                    vec![12, 11, 10],
1447                    vec![9, 8, 7, 6, 5, 4, 3, 2, 1],
1448                ],
1449            ),
1450            (
1451                TimeUnit::Millisecond,
1452                vec![
1453                    (
1454                        (15, 20),
1455                        vec![vec![15, 17, 19, 10, 11, 12, 5, 6, 7, 8, 9, 1, 2, 3, 4]],
1456                    ),
1457                    ((10, 15), vec![]),
1458                    ((5, 10), vec![]),
1459                    ((0, 10), vec![]),
1460                ],
1461                true,
1462                Some(2),
1463                vec![vec![19, 17]],
1464            ),
1465        ];
1466
1467        for (identifier, (unit, input_ranged_data, descending, limit, expected_output)) in
1468            testcases.into_iter().enumerate()
1469        {
1470            let schema = Schema::new(vec![Field::new(
1471                "ts",
1472                DataType::Timestamp(unit, None),
1473                false,
1474            )]);
1475            let schema = Arc::new(schema);
1476            let opt = SortOptions {
1477                descending,
1478                ..Default::default()
1479            };
1480
1481            let input_ranged_data = input_ranged_data
1482                .into_iter()
1483                .map(|(range, data)| {
1484                    let part = PartitionRange {
1485                        start: Timestamp::new(range.0, unit.into()),
1486                        end: Timestamp::new(range.1, unit.into()),
1487                        num_rows: data.iter().map(|b| b.len()).sum(),
1488                        identifier,
1489                    };
1490
1491                    let batches = data
1492                        .into_iter()
1493                        .map(|b| {
1494                            let arr = new_ts_array(unit, b);
1495                            DfRecordBatch::try_new(schema.clone(), vec![arr]).unwrap()
1496                        })
1497                        .collect_vec();
1498                    (part, batches)
1499                })
1500                .collect_vec();
1501
1502            let expected_output = expected_output
1503                .into_iter()
1504                .map(|a| {
1505                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, a)]).unwrap()
1506                })
1507                .collect_vec();
1508            let expected_output = if expected_output.is_empty() {
1509                None
1510            } else {
1511                Some(concat_batches(&schema, &expected_output).unwrap())
1512            };
1513
1514            run_test(
1515                identifier,
1516                input_ranged_data,
1517                schema.clone(),
1518                opt,
1519                limit,
1520                expected_output,
1521                None,
1522            )
1523            .await;
1524        }
1525    }
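
    // Illustrative sketch of the grouping used by case 5 above (assumes the
    // tests module brings `group_ranges_by_primary_end` into scope via
    // `use super::*`; the helper `mk` is ours, not part of the code under
    // test). For descending sort the primary end is `end`, so ends 20, 15,
    // 10, 10 form three groups, the last covering both ranges sharing end=10.
    #[allow(dead_code)]
    fn grouping_sketch_for_case_5() {
        let unit = TimeUnit::Millisecond;
        let mk = |start: i64, end: i64| PartitionRange {
            start: Timestamp::new(start, unit.into()),
            end: Timestamp::new(end, unit.into()),
            num_rows: 0,
            identifier: 0,
        };
        let ranges = [mk(15, 20), mk(10, 15), mk(5, 10), mk(0, 10)];
        let groups = group_ranges_by_primary_end(&ranges, true);
        // Each group is (primary_end, start_idx_inclusive, end_idx_exclusive).
        assert_eq!(groups.len(), 3);
        assert_eq!((groups[2].1, groups[2].2), (2, 4));
    }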
1526
1527    #[allow(clippy::print_stdout)]
1528    async fn run_test(
1529        case_id: usize,
1530        input_ranged_data: Vec<(PartitionRange, Vec<DfRecordBatch>)>,
1531        schema: SchemaRef,
1532        opt: SortOptions,
1533        limit: Option<usize>,
1534        expected_output: Option<DfRecordBatch>,
1535        expected_polled_rows: Option<usize>,
1536    ) {
1537        if let (Some(limit), Some(rb)) = (limit, &expected_output) {
1538            assert!(
1539                rb.num_rows() <= limit,
                "Expected output row count ({}) should be <= limit ({})",
1541                rb.num_rows(),
1542                limit
1543            );
1544        }
1545
1546        let mut data_partition = Vec::with_capacity(input_ranged_data.len());
1547        let mut ranges = Vec::with_capacity(input_ranged_data.len());
1548        for (part_range, batches) in input_ranged_data {
1549            data_partition.push(batches);
1550            ranges.push(part_range);
1551        }
1552
1553        let mock_input = Arc::new(MockInputExec::new(data_partition, schema.clone()));
1554
1555        let exec = PartSortExec::try_new(
1556            PhysicalSortExpr {
1557                expr: Arc::new(Column::new("ts", 0)),
1558                options: opt,
1559            },
1560            limit,
1561            vec![ranges.clone()],
1562            mock_input.clone(),
1563        )
1564        .unwrap();
1565
1566        let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
1567
1568        let real_output = exec_stream.map(|r| r.unwrap()).collect::<Vec<_>>().await;
1569        if limit.is_some() {
1570            assert!(
1571                real_output.len() <= 1,
1572                "case_{case_id} expects a single output batch when limit is set, got {}",
1573                real_output.len()
1574            );
1575        }
1576
1577        let actual_output = if real_output.is_empty() {
1578            None
1579        } else {
1580            Some(concat_batches(&schema, &real_output).unwrap())
1581        };
1582
1583        if let Some(expected_polled_rows) = expected_polled_rows {
            let input_polled_rows = mock_input.metrics().unwrap().output_rows().unwrap();
            assert_eq!(input_polled_rows, expected_polled_rows);
1586        }
1587
1588        match (actual_output, expected_output) {
1589            (None, None) => {}
1590            (Some(actual), Some(expected)) => {
1591                if actual != expected {
1592                    let mut actual_json: Vec<u8> = Vec::new();
1593                    let mut writer = ArrayWriter::new(&mut actual_json);
1594                    writer.write(&actual).unwrap();
1595                    writer.finish().unwrap();
1596
1597                    let mut expected_json: Vec<u8> = Vec::new();
1598                    let mut writer = ArrayWriter::new(&mut expected_json);
1599                    writer.write(&expected).unwrap();
1600                    writer.finish().unwrap();
1601
1602                    panic!(
1603                        "case_{} failed (limit {limit:?}), opt: {:?},\nreal_output: {}\nexpected: {}",
1604                        case_id,
1605                        opt,
1606                        String::from_utf8_lossy(&actual_json),
1607                        String::from_utf8_lossy(&expected_json),
1608                    );
1609                }
1610            }
1611            (None, Some(expected)) => panic!(
1612                "case_{} failed (limit {limit:?}), opt: {:?},\nreal output is empty, expected {} rows",
1613                case_id,
1614                opt,
1615                expected.num_rows()
1616            ),
1617            (Some(actual), None) => panic!(
1618                "case_{} failed (limit {limit:?}), opt: {:?},\nreal output has {} rows, expected empty",
1619                case_id,
1620                opt,
1621                actual.num_rows()
1622            ),
1623        }
1624    }
1625
1626    /// Test that verifies the limit is correctly applied per partition when
1627    /// multiple batches are received for the same partition.
1628    #[tokio::test]
1629    async fn test_limit_with_multiple_batches_per_partition() {
1630        let unit = TimeUnit::Millisecond;
1631        let schema = Arc::new(Schema::new(vec![Field::new(
1632            "ts",
1633            DataType::Timestamp(unit, None),
1634            false,
1635        )]));
1636
1637        // Test case: Multiple batches in a single partition with limit=3
1638        // Input: 3 batches with [1,2,3], [4,5,6], [7,8,9] all in partition (0,10)
1639        // Expected: Only top 3 values [9,8,7] for descending sort
1640        let input_ranged_data = vec![(
1641            PartitionRange {
1642                start: Timestamp::new(0, unit.into()),
1643                end: Timestamp::new(10, unit.into()),
1644                num_rows: 9,
1645                identifier: 0,
1646            },
1647            vec![
1648                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2, 3])])
1649                    .unwrap(),
1650                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![4, 5, 6])])
1651                    .unwrap(),
1652                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![7, 8, 9])])
1653                    .unwrap(),
1654            ],
1655        )];
1656
1657        let expected_output = Some(
1658            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![9, 8, 7])])
1659                .unwrap(),
1660        );
1661
1662        run_test(
1663            1000,
1664            input_ranged_data,
1665            schema.clone(),
1666            SortOptions {
1667                descending: true,
1668                ..Default::default()
1669            },
1670            Some(3),
1671            expected_output,
1672            None,
1673        )
1674        .await;
1675
        // Test case: Multiple batches across multiple partitions with limit=2
        // Partition 0 ([10,20)): batches [10,11,12], [13,14,15] -> top 2 descending = [15,14]
        // Partition 1 ([0,10)): would yield [5,4] on its own, but after partition 0 the
        // threshold (14) is >= the next group's end (10), so the expected output is just [15,14]
1679        let input_ranged_data = vec![
1680            (
1681                PartitionRange {
1682                    start: Timestamp::new(10, unit.into()),
1683                    end: Timestamp::new(20, unit.into()),
1684                    num_rows: 6,
1685                    identifier: 0,
1686                },
1687                vec![
1688                    DfRecordBatch::try_new(
1689                        schema.clone(),
1690                        vec![new_ts_array(unit, vec![10, 11, 12])],
1691                    )
1692                    .unwrap(),
1693                    DfRecordBatch::try_new(
1694                        schema.clone(),
1695                        vec![new_ts_array(unit, vec![13, 14, 15])],
1696                    )
1697                    .unwrap(),
1698                ],
1699            ),
1700            (
1701                PartitionRange {
1702                    start: Timestamp::new(0, unit.into()),
1703                    end: Timestamp::new(10, unit.into()),
1704                    num_rows: 5,
1705                    identifier: 1,
1706                },
1707                vec![
1708                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2, 3])])
1709                        .unwrap(),
1710                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![4, 5])])
1711                        .unwrap(),
1712                ],
1713            ),
1714        ];
1715
1716        let expected_output = Some(
1717            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![15, 14])]).unwrap(),
1718        );
1719
1720        run_test(
1721            1001,
1722            input_ranged_data,
1723            schema.clone(),
1724            SortOptions {
1725                descending: true,
1726                ..Default::default()
1727            },
1728            Some(2),
1729            expected_output,
1730            None,
1731        )
1732        .await;
1733
1734        // Test case: Ascending sort with limit
1735        // Partition: batches [7,8,9], [4,5,6], [1,2,3] -> top 2 ascending = [1,2]
1736        let input_ranged_data = vec![(
1737            PartitionRange {
1738                start: Timestamp::new(0, unit.into()),
1739                end: Timestamp::new(10, unit.into()),
1740                num_rows: 9,
1741                identifier: 0,
1742            },
1743            vec![
1744                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![7, 8, 9])])
1745                    .unwrap(),
1746                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![4, 5, 6])])
1747                    .unwrap(),
1748                DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2, 3])])
1749                    .unwrap(),
1750            ],
1751        )];
1752
1753        let expected_output = Some(
1754            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![1, 2])]).unwrap(),
1755        );
1756
1757        run_test(
1758            1002,
1759            input_ranged_data,
1760            schema.clone(),
1761            SortOptions {
1762                descending: false,
1763                ..Default::default()
1764            },
1765            Some(2),
1766            expected_output,
1767            None,
1768        )
1769        .await;
1770    }
1771
    /// Test that verifies early termination behavior:
    /// once the limit is satisfied and later groups cannot contribute better
    /// values, we should stop pulling from the input stream.
1775    #[tokio::test]
1776    async fn test_early_termination() {
1777        let unit = TimeUnit::Millisecond;
1778        let schema = Arc::new(Schema::new(vec![Field::new(
1779            "ts",
1780            DataType::Timestamp(unit, None),
1781            false,
1782        )]));
1783
        // Create 3 partitions, each with more data than the limit (limit=2)
        // For descending sort, ranges must be ordered by (end DESC, start DESC)
        // Only the first partition should be pulled; see the note before
        // `expected_output` below
1788        let input_ranged_data = vec![
1789            (
1790                PartitionRange {
1791                    start: Timestamp::new(20, unit.into()),
1792                    end: Timestamp::new(30, unit.into()),
1793                    num_rows: 10,
1794                    identifier: 2,
1795                },
1796                vec![
1797                    DfRecordBatch::try_new(
1798                        schema.clone(),
1799                        vec![new_ts_array(unit, vec![21, 22, 23, 24, 25])],
1800                    )
1801                    .unwrap(),
1802                    DfRecordBatch::try_new(
1803                        schema.clone(),
1804                        vec![new_ts_array(unit, vec![26, 27, 28, 29, 30])],
1805                    )
1806                    .unwrap(),
1807                ],
1808            ),
1809            (
1810                PartitionRange {
1811                    start: Timestamp::new(10, unit.into()),
1812                    end: Timestamp::new(20, unit.into()),
1813                    num_rows: 10,
1814                    identifier: 1,
1815                },
1816                vec![
1817                    DfRecordBatch::try_new(
1818                        schema.clone(),
1819                        vec![new_ts_array(unit, vec![11, 12, 13, 14, 15])],
1820                    )
1821                    .unwrap(),
1822                    DfRecordBatch::try_new(
1823                        schema.clone(),
1824                        vec![new_ts_array(unit, vec![16, 17, 18, 19, 20])],
1825                    )
1826                    .unwrap(),
1827                ],
1828            ),
1829            (
1830                PartitionRange {
1831                    start: Timestamp::new(0, unit.into()),
1832                    end: Timestamp::new(10, unit.into()),
1833                    num_rows: 10,
1834                    identifier: 0,
1835                },
1836                vec![
1837                    DfRecordBatch::try_new(
1838                        schema.clone(),
1839                        vec![new_ts_array(unit, vec![1, 2, 3, 4, 5])],
1840                    )
1841                    .unwrap(),
1842                    DfRecordBatch::try_new(
1843                        schema.clone(),
1844                        vec![new_ts_array(unit, vec![6, 7, 8, 9, 10])],
1845                    )
1846                    .unwrap(),
1847                ],
1848            ),
1849        ];
1850
        // PartSort won't reorder `PartitionRange`s (it assumes they are already ordered),
        // so it will not read the other partitions.
        // This case just verifies that early termination works as expected:
        // the first partition [20, 30) produces the top 2 values, 29 and 28.
1854        let expected_output = Some(
1855            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![29, 28])]).unwrap(),
1856        );
1857
1858        run_test(
1859            1003,
1860            input_ranged_data,
1861            schema.clone(),
1862            SortOptions {
1863                descending: true,
1864                ..Default::default()
1865            },
1866            Some(2),
1867            expected_output,
1868            Some(10),
1869        )
1870        .await;
1871    }
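
    // A hedged sketch of the ordering precondition noted in the test above:
    // for descending sort the ranges handed to PartSortExec are assumed to be
    // pre-sorted by (end DESC, start DESC); PartSort itself does not reorder
    // them. Illustrative only, not the actual implementation.
    #[allow(dead_code)]
    fn sort_ranges_for_descending_sketch(ranges: &mut [PartitionRange]) {
        ranges.sort_by(|a, b| b.end.cmp(&a.end).then(b.start.cmp(&a.start)));
    }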
1872
1873    /// Example:
1874    /// - Range [70, 100) has data [80, 90, 95]
1875    /// - Range [50, 100) has data [55, 65, 75, 85, 95]
1876    #[tokio::test]
1877    async fn test_primary_end_grouping_with_limit() {
1878        let unit = TimeUnit::Millisecond;
1879        let schema = Arc::new(Schema::new(vec![Field::new(
1880            "ts",
1881            DataType::Timestamp(unit, None),
1882            false,
1883        )]));
1884
1885        // Two ranges with the same end (100) - they should be grouped together
1886        // For descending, ranges are ordered by (end DESC, start DESC)
1887        // So [70, 100) comes before [50, 100) (70 > 50)
1888        let input_ranged_data = vec![
1889            (
1890                PartitionRange {
1891                    start: Timestamp::new(70, unit.into()),
1892                    end: Timestamp::new(100, unit.into()),
1893                    num_rows: 3,
1894                    identifier: 0,
1895                },
1896                vec![
1897                    DfRecordBatch::try_new(
1898                        schema.clone(),
1899                        vec![new_ts_array(unit, vec![80, 90, 95])],
1900                    )
1901                    .unwrap(),
1902                ],
1903            ),
1904            (
1905                PartitionRange {
1906                    start: Timestamp::new(50, unit.into()),
1907                    end: Timestamp::new(100, unit.into()),
1908                    num_rows: 5,
1909                    identifier: 1,
1910                },
1911                vec![
1912                    DfRecordBatch::try_new(
1913                        schema.clone(),
1914                        vec![new_ts_array(unit, vec![55, 65, 75, 85, 95])],
1915                    )
1916                    .unwrap(),
1917                ],
1918            ),
1919        ];
1920
1921        // With limit=4, descending: top 4 values from combined data
1922        // Combined: [80, 90, 95, 55, 65, 75, 85, 95] -> sorted desc: [95, 95, 90, 85, 80, 75, 65, 55]
1923        // Top 4: [95, 95, 90, 85]
1924        let expected_output = Some(
1925            DfRecordBatch::try_new(
1926                schema.clone(),
1927                vec![new_ts_array(unit, vec![95, 95, 90, 85])],
1928            )
1929            .unwrap(),
1930        );
1931
1932        run_test(
1933            2000,
1934            input_ranged_data,
1935            schema.clone(),
1936            SortOptions {
1937                descending: true,
1938                ..Default::default()
1939            },
1940            Some(4),
1941            expected_output,
1942            None,
1943        )
1944        .await;
1945    }
1946
1947    /// Test case with three ranges demonstrating the "keep pulling" behavior.
1948    /// After processing ranges with end=100, the smallest value in top-k might still
1949    /// be reachable by the next group.
1950    ///
1951    /// Ranges: [70, 100), [50, 100), [40, 95)
1952    /// With descending sort and limit=4:
1953    /// - Group 1 (end=100): [70, 100) and [50, 100) merged
1954    /// - Group 2 (end=95): [40, 95)
    /// After group 1, the top-4 is [95, 90, 85, 80], so the threshold is 80. Range [40, 95)
    /// could still contain values >= 80, so we continue to group 2.
1957    #[tokio::test]
1958    async fn test_three_ranges_keep_pulling() {
1959        let unit = TimeUnit::Millisecond;
1960        let schema = Arc::new(Schema::new(vec![Field::new(
1961            "ts",
1962            DataType::Timestamp(unit, None),
1963            false,
1964        )]));
1965
1966        // Three ranges, two with same end (100), one with different end (95)
1967        let input_ranged_data = vec![
1968            (
1969                PartitionRange {
1970                    start: Timestamp::new(70, unit.into()),
1971                    end: Timestamp::new(100, unit.into()),
1972                    num_rows: 3,
1973                    identifier: 0,
1974                },
1975                vec![
1976                    DfRecordBatch::try_new(
1977                        schema.clone(),
1978                        vec![new_ts_array(unit, vec![80, 90, 95])],
1979                    )
1980                    .unwrap(),
1981                ],
1982            ),
1983            (
1984                PartitionRange {
1985                    start: Timestamp::new(50, unit.into()),
1986                    end: Timestamp::new(100, unit.into()),
1987                    num_rows: 3,
1988                    identifier: 1,
1989                },
1990                vec![
1991                    DfRecordBatch::try_new(
1992                        schema.clone(),
1993                        vec![new_ts_array(unit, vec![55, 75, 85])],
1994                    )
1995                    .unwrap(),
1996                ],
1997            ),
1998            (
1999                PartitionRange {
2000                    start: Timestamp::new(40, unit.into()),
2001                    end: Timestamp::new(95, unit.into()),
2002                    num_rows: 3,
2003                    identifier: 2,
2004                },
2005                vec![
2006                    DfRecordBatch::try_new(
2007                        schema.clone(),
2008                        vec![new_ts_array(unit, vec![45, 65, 94])],
2009                    )
2010                    .unwrap(),
2011                ],
2012            ),
2013        ];
2014
2015        // All data: [80, 90, 95, 55, 75, 85, 45, 65, 94]
2016        // Sorted descending: [95, 94, 90, 85, 80, 75, 65, 55, 45]
2017        // With limit=4: should be top 4 largest values across all ranges: [95, 94, 90, 85]
2018        let expected_output = Some(
2019            DfRecordBatch::try_new(
2020                schema.clone(),
2021                vec![new_ts_array(unit, vec![95, 94, 90, 85])],
2022            )
2023            .unwrap(),
2024        );
2025
2026        run_test(
2027            2001,
2028            input_ranged_data,
2029            schema.clone(),
2030            SortOptions {
2031                descending: true,
2032                ..Default::default()
2033            },
2034            Some(4),
2035            expected_output,
2036            None,
2037        )
2038        .await;
2039    }
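
    // Worked numbers for the test above (a sketch; `can_stop_early_sketch` is
    // defined after `test_threshold_based_early_termination` below): after
    // group 1 the top-4 is [95, 90, 85, 80], so the threshold is 80. Since
    // 80 >= 95 does not hold for group 2's end, pulling must continue, and
    // group 2's 94 then evicts 80 from the top-4.
    #[allow(dead_code)]
    fn three_ranges_keep_pulling_numbers() {
        let unit = TimeUnit::Millisecond;
        let threshold = Timestamp::new(80, unit.into());
        let next_primary_end = Timestamp::new(95, unit.into());
        assert!(!can_stop_early_sketch(true, threshold, next_primary_end));
    }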
2040
2041    /// Test early termination based on threshold comparison with next group.
2042    /// When the threshold (smallest value for descending) is >= next group's primary end,
2043    /// we can stop early because the next group cannot have better values.
2044    #[tokio::test]
2045    async fn test_threshold_based_early_termination() {
2046        let unit = TimeUnit::Millisecond;
2047        let schema = Arc::new(Schema::new(vec![Field::new(
2048            "ts",
2049            DataType::Timestamp(unit, None),
2050            false,
2051        )]));
2052
2053        // Group 1 (end=100) has 6 rows, TopK will keep top 4
        // Group 2 (end=90) has 3 rows - its values should NOT reach the output because
        // threshold (96) >= next_primary_end (90); its batch is pulled only to detect the boundary
2056        let input_ranged_data = vec![
2057            (
2058                PartitionRange {
2059                    start: Timestamp::new(70, unit.into()),
2060                    end: Timestamp::new(100, unit.into()),
2061                    num_rows: 6,
2062                    identifier: 0,
2063                },
2064                vec![
2065                    DfRecordBatch::try_new(
2066                        schema.clone(),
2067                        vec![new_ts_array(unit, vec![94, 95, 96, 97, 98, 99])],
2068                    )
2069                    .unwrap(),
2070                ],
2071            ),
2072            (
2073                PartitionRange {
2074                    start: Timestamp::new(50, unit.into()),
2075                    end: Timestamp::new(90, unit.into()),
2076                    num_rows: 3,
2077                    identifier: 1,
2078                },
2079                vec![
2080                    DfRecordBatch::try_new(
2081                        schema.clone(),
2082                        vec![new_ts_array(unit, vec![85, 86, 87])],
2083                    )
2084                    .unwrap(),
2085                ],
2086            ),
2087        ];
2088
2089        // With limit=4, descending: top 4 from group 1 are [99, 98, 97, 96]
2090        // Threshold is 96, next group's primary_end is 90
2091        // Since 96 >= 90, we stop after group 1
2092        let expected_output = Some(
2093            DfRecordBatch::try_new(
2094                schema.clone(),
2095                vec![new_ts_array(unit, vec![99, 98, 97, 96])],
2096            )
2097            .unwrap(),
2098        );
2099
2100        run_test(
2101            2002,
2102            input_ranged_data,
2103            schema.clone(),
2104            SortOptions {
2105                descending: true,
2106                ..Default::default()
2107            },
2108            Some(4),
2109            expected_output,
            Some(9), // Both batches are pulled; the second must be read to detect the group boundary
2111        )
2112        .await;
2113    }
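
    // A condensed sketch of the early-stop rule the tests in this module
    // exercise (illustrative only, not the actual implementation): for
    // descending sort we may stop once the current top-k threshold is >= the
    // next group's primary end (its `end`); for ascending, once the threshold
    // is < the next group's primary end (its `start`).
    #[allow(dead_code)]
    fn can_stop_early_sketch(
        descending: bool,
        threshold: Timestamp,
        next_primary_end: Timestamp,
    ) -> bool {
        if descending {
            threshold >= next_primary_end
        } else {
            threshold < next_primary_end
        }
    }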
2114
    /// Test that we continue to the next group when the threshold is within the next group's range.
2116    /// Even after fulfilling limit, if threshold < next_primary_end (descending),
2117    /// we would need to continue... but limit exhaustion stops us first.
2118    #[tokio::test]
2119    async fn test_continue_when_threshold_in_next_group_range() {
2120        let unit = TimeUnit::Millisecond;
2121        let schema = Arc::new(Schema::new(vec![Field::new(
2122            "ts",
2123            DataType::Timestamp(unit, None),
2124            false,
2125        )]));
2126
2127        // Group 1 (end=100) has 6 rows, TopK will keep top 4
2128        // Group 2 (end=98) has 3 rows - threshold (96) < 98, so next group
2129        // could theoretically have better values. Continue reading.
2130        let input_ranged_data = vec![
2131            (
2132                PartitionRange {
2133                    start: Timestamp::new(90, unit.into()),
2134                    end: Timestamp::new(100, unit.into()),
2135                    num_rows: 6,
2136                    identifier: 0,
2137                },
2138                vec![
2139                    DfRecordBatch::try_new(
2140                        schema.clone(),
2141                        vec![new_ts_array(unit, vec![94, 95, 96, 97, 98, 99])],
2142                    )
2143                    .unwrap(),
2144                ],
2145            ),
2146            (
2147                PartitionRange {
2148                    start: Timestamp::new(50, unit.into()),
2149                    end: Timestamp::new(98, unit.into()),
2150                    num_rows: 3,
2151                    identifier: 1,
2152                },
2153                vec![
                    // Values are kept well below group 1's range ([90, 100)) to avoid ambiguity
2155                    DfRecordBatch::try_new(
2156                        schema.clone(),
2157                        vec![new_ts_array(unit, vec![55, 60, 65])],
2158                    )
2159                    .unwrap(),
2160                ],
2161            ),
2162        ];
2163
2164        // With limit=4, we get [99, 98, 97, 96] from group 1
2165        // Threshold is 96, next group's primary_end is 98
2166        // 96 < 98, so threshold check says "could continue"
2167        // But limit is exhausted (0), so we stop anyway
2168        let expected_output = Some(
2169            DfRecordBatch::try_new(
2170                schema.clone(),
2171                vec![new_ts_array(unit, vec![99, 98, 97, 96])],
2172            )
2173            .unwrap(),
2174        );
2175
2176        // Note: We pull 9 rows (both batches) because we need to read batch 2
2177        // to detect the group boundary, even though we stop after outputting group 1.
2178        run_test(
2179            2003,
2180            input_ranged_data,
2181            schema.clone(),
2182            SortOptions {
2183                descending: true,
2184                ..Default::default()
2185            },
2186            Some(4),
2187            expected_output,
2188            Some(9), // Pull both batches to detect boundary
2189        )
2190        .await;
2191    }
2192
2193    /// Test ascending sort with threshold-based early termination.
2194    #[tokio::test]
2195    async fn test_ascending_threshold_early_termination() {
2196        let unit = TimeUnit::Millisecond;
2197        let schema = Arc::new(Schema::new(vec![Field::new(
2198            "ts",
2199            DataType::Timestamp(unit, None),
2200            false,
2201        )]));
2202
2203        // For ascending: primary_end is start, ranges sorted by (start ASC, end ASC)
2204        // Group 1 (start=10) has 6 rows
2205        // Group 2 (start=20) has 3 rows - should NOT be processed because
2206        // threshold (13) < next_primary_end (20)
2207        let input_ranged_data = vec![
2208            (
2209                PartitionRange {
2210                    start: Timestamp::new(10, unit.into()),
2211                    end: Timestamp::new(50, unit.into()),
2212                    num_rows: 6,
2213                    identifier: 0,
2214                },
2215                vec![
2216                    DfRecordBatch::try_new(
2217                        schema.clone(),
2218                        vec![new_ts_array(unit, vec![10, 11, 12, 13, 14, 15])],
2219                    )
2220                    .unwrap(),
2221                ],
2222            ),
2223            (
2224                PartitionRange {
2225                    start: Timestamp::new(20, unit.into()),
2226                    end: Timestamp::new(60, unit.into()),
2227                    num_rows: 3,
2228                    identifier: 1,
2229                },
2230                vec![
2231                    DfRecordBatch::try_new(
2232                        schema.clone(),
2233                        vec![new_ts_array(unit, vec![25, 30, 35])],
2234                    )
2235                    .unwrap(),
2236                ],
2237            ),
            // this batch is still read to detect the group boundary
2239            (
2240                PartitionRange {
2241                    start: Timestamp::new(60, unit.into()),
2242                    end: Timestamp::new(70, unit.into()),
2243                    num_rows: 2,
2244                    identifier: 1,
2245                },
2246                vec![
2247                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![60, 61])])
2248                        .unwrap(),
2249                ],
2250            ),
            // once the boundary is detected, this following range should not be read
2252            (
2253                PartitionRange {
2254                    start: Timestamp::new(61, unit.into()),
2255                    end: Timestamp::new(70, unit.into()),
2256                    num_rows: 2,
2257                    identifier: 1,
2258                },
2259                vec![
2260                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![71, 72])])
2261                        .unwrap(),
2262                ],
2263            ),
2264        ];
2265
2266        // With limit=4, ascending: top 4 (smallest) from group 1 are [10, 11, 12, 13]
2267        // Threshold is 13 (largest in top-k), next group's primary_end is 20
2268        // Since 13 < 20, we stop after group 1 (no value in group 2 can be < 13)
2269        let expected_output = Some(
2270            DfRecordBatch::try_new(
2271                schema.clone(),
2272                vec![new_ts_array(unit, vec![10, 11, 12, 13])],
2273            )
2274            .unwrap(),
2275        );
2276
2277        run_test(
2278            2004,
2279            input_ranged_data,
2280            schema.clone(),
2281            SortOptions {
2282                descending: false,
2283                ..Default::default()
2284            },
2285            Some(4),
2286            expected_output,
            Some(11), // Pull the first three batches (6 + 3 + 2 rows) to detect the boundary
2288        )
2289        .await;
2290    }
2291
2292    #[tokio::test]
2293    async fn test_ascending_threshold_early_termination_case_two() {
2294        let unit = TimeUnit::Millisecond;
2295        let schema = Arc::new(Schema::new(vec![Field::new(
2296            "ts",
2297            DataType::Timestamp(unit, None),
2298            false,
2299        )]));
2300
2301        // For ascending: primary_end is start, ranges sorted by (start ASC, end ASC)
2302        // Group 1 (start=0) has 4 rows, Group 2 (start=4) has 1 row, Group 3 (start=5) has 4 rows
        // After reading the first three groups: [9,10,11,12, 21, 5,6,7,8]
2304        // Sorted ascending: [5,6,7,8, 9,10,11,12, 21]
2305        // With limit=4, output should be smallest 4: [5,6,7,8]
2306        // Algorithm continues reading until start=42 > threshold=8, confirming no smaller values exist
2307        let input_ranged_data = vec![
2308            (
2309                PartitionRange {
2310                    start: Timestamp::new(0, unit.into()),
2311                    end: Timestamp::new(20, unit.into()),
2312                    num_rows: 4,
2313                    identifier: 0,
2314                },
2315                vec![
2316                    DfRecordBatch::try_new(
2317                        schema.clone(),
2318                        vec![new_ts_array(unit, vec![9, 10, 11, 12])],
2319                    )
2320                    .unwrap(),
2321                ],
2322            ),
2323            (
2324                PartitionRange {
2325                    start: Timestamp::new(4, unit.into()),
2326                    end: Timestamp::new(25, unit.into()),
2327                    num_rows: 1,
2328                    identifier: 1,
2329                },
2330                vec![
2331                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![21])])
2332                        .unwrap(),
2333                ],
2334            ),
2335            (
2336                PartitionRange {
2337                    start: Timestamp::new(5, unit.into()),
2338                    end: Timestamp::new(25, unit.into()),
2339                    num_rows: 4,
2340                    identifier: 1,
2341                },
2342                vec![
2343                    DfRecordBatch::try_new(
2344                        schema.clone(),
2345                        vec![new_ts_array(unit, vec![5, 6, 7, 8])],
2346                    )
2347                    .unwrap(),
2348                ],
2349            ),
        // This range is still read to detect the boundary, but should not contribute to the output
2351            (
2352                PartitionRange {
2353                    start: Timestamp::new(42, unit.into()),
2354                    end: Timestamp::new(52, unit.into()),
2355                    num_rows: 2,
2356                    identifier: 1,
2357                },
2358                vec![
2359                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![42, 51])])
2360                        .unwrap(),
2361                ],
2362            ),
        // This following range should not be read once the boundary is detected
2364            (
2365                PartitionRange {
2366                    start: Timestamp::new(48, unit.into()),
2367                    end: Timestamp::new(53, unit.into()),
2368                    num_rows: 2,
2369                    identifier: 1,
2370                },
2371                vec![
2372                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![48, 51])])
2373                        .unwrap(),
2374                ],
2375            ),
2376        ];
2377
2378        // With limit=4, ascending: after processing all ranges, smallest 4 are [5, 6, 7, 8]
2379        // Threshold is 8 (4th smallest value), algorithm reads until start=42 > threshold=8
2380        let expected_output = Some(
2381            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![5, 6, 7, 8])])
2382                .unwrap(),
2383        );
2384
2385        run_test(
2386            2005,
2387            input_ranged_data,
2388            schema.clone(),
2389            SortOptions {
2390                descending: false,
2391                ..Default::default()
2392            },
2393            Some(4),
2394            expected_output,
2395            Some(11), // Read first 4 ranges to confirm threshold boundary
2396        )
2397        .await;
2398    }
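
    // Ascending worked numbers for the test above, reusing
    // `can_stop_early_sketch` (a sketch, not the implementation): the top-4
    // threshold is 8 and the next group starts at 42; since 8 < 42 holds, no
    // later range can contain a smaller value, so pulling stops.
    #[allow(dead_code)]
    fn ascending_early_stop_numbers() {
        let unit = TimeUnit::Millisecond;
        let threshold = Timestamp::new(8, unit.into());
        let next_primary_end = Timestamp::new(42, unit.into());
        assert!(can_stop_early_sketch(false, threshold, next_primary_end));
    }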
2399
2400    /// Test early stop behavior with null values in sort column.
2401    /// Verifies that nulls are handled correctly based on nulls_first option.
2402    #[tokio::test]
2403    async fn test_early_stop_with_nulls() {
2404        let unit = TimeUnit::Millisecond;
2405        let schema = Arc::new(Schema::new(vec![Field::new(
2406            "ts",
2407            DataType::Timestamp(unit, None),
2408            true, // nullable
2409        )]));
2410
2411        // Helper function to create nullable timestamp array
2412        let new_nullable_ts_array = |unit: TimeUnit, arr: Vec<Option<i64>>| -> ArrayRef {
2413            match unit {
2414                TimeUnit::Second => Arc::new(TimestampSecondArray::from(arr)) as ArrayRef,
2415                TimeUnit::Millisecond => Arc::new(TimestampMillisecondArray::from(arr)) as ArrayRef,
2416                TimeUnit::Microsecond => Arc::new(TimestampMicrosecondArray::from(arr)) as ArrayRef,
2417                TimeUnit::Nanosecond => Arc::new(TimestampNanosecondArray::from(arr)) as ArrayRef,
2418            }
2419        };
2420
2421        // Test case 1: nulls_first=true, null values should appear first
2422        // Group 1 (end=100): [null, null, 99, 98, 97] -> with limit=3, top 3 are [null, null, 99]
2423        // Threshold is 99, next group end=90, since 99 >= 90, we should stop early
2424        let input_ranged_data = vec![
2425            (
2426                PartitionRange {
2427                    start: Timestamp::new(70, unit.into()),
2428                    end: Timestamp::new(100, unit.into()),
2429                    num_rows: 5,
2430                    identifier: 0,
2431                },
2432                vec![
2433                    DfRecordBatch::try_new(
2434                        schema.clone(),
2435                        vec![new_nullable_ts_array(
2436                            unit,
2437                            vec![Some(99), Some(98), None, Some(97), None],
2438                        )],
2439                    )
2440                    .unwrap(),
2441                ],
2442            ),
2443            (
2444                PartitionRange {
2445                    start: Timestamp::new(50, unit.into()),
2446                    end: Timestamp::new(90, unit.into()),
2447                    num_rows: 3,
2448                    identifier: 1,
2449                },
2450                vec![
2451                    DfRecordBatch::try_new(
2452                        schema.clone(),
2453                        vec![new_nullable_ts_array(
2454                            unit,
2455                            vec![Some(89), Some(88), Some(87)],
2456                        )],
2457                    )
2458                    .unwrap(),
2459                ],
2460            ),
2461        ];
2462
2463        // With nulls_first=true, nulls sort before all values
2464        // For descending, order is: null, null, 99, 98, 97
2465        // With limit=3, we get: null, null, 99
2466        let expected_output = Some(
2467            DfRecordBatch::try_new(
2468                schema.clone(),
2469                vec![new_nullable_ts_array(unit, vec![None, None, Some(99)])],
2470            )
2471            .unwrap(),
2472        );
2473
2474        run_test(
2475            3000,
2476            input_ranged_data,
2477            schema.clone(),
2478            SortOptions {
2479                descending: true,
2480                nulls_first: true,
2481            },
2482            Some(3),
2483            expected_output,
2484            Some(8), // Must read both batches to detect group boundary
2485        )
2486        .await;
2487
        // Test case 2: nulls_first=false, null values should appear last
2489        // Group 1 (end=100): [99, 98, 97, null, null] -> with limit=3, top 3 are [99, 98, 97]
2490        // Threshold is 97, next group end=90, since 97 >= 90, we should stop early
2491        let input_ranged_data = vec![
2492            (
2493                PartitionRange {
2494                    start: Timestamp::new(70, unit.into()),
2495                    end: Timestamp::new(100, unit.into()),
2496                    num_rows: 5,
2497                    identifier: 0,
2498                },
2499                vec![
2500                    DfRecordBatch::try_new(
2501                        schema.clone(),
2502                        vec![new_nullable_ts_array(
2503                            unit,
2504                            vec![Some(99), Some(98), Some(97), None, None],
2505                        )],
2506                    )
2507                    .unwrap(),
2508                ],
2509            ),
2510            (
2511                PartitionRange {
2512                    start: Timestamp::new(50, unit.into()),
2513                    end: Timestamp::new(90, unit.into()),
2514                    num_rows: 3,
2515                    identifier: 1,
2516                },
2517                vec![
2518                    DfRecordBatch::try_new(
2519                        schema.clone(),
2520                        vec![new_nullable_ts_array(
2521                            unit,
2522                            vec![Some(89), Some(88), Some(87)],
2523                        )],
2524                    )
2525                    .unwrap(),
2526                ],
2527            ),
2528        ];
2529
        // With nulls_first=false (i.e. nulls last), values sort before nulls
2531        // For descending, order is: 99, 98, 97, null, null
2532        // With limit=3, we get: 99, 98, 97
2533        let expected_output = Some(
2534            DfRecordBatch::try_new(
2535                schema.clone(),
2536                vec![new_nullable_ts_array(
2537                    unit,
2538                    vec![Some(99), Some(98), Some(97)],
2539                )],
2540            )
2541            .unwrap(),
2542        );
2543
2544        run_test(
2545            3001,
2546            input_ranged_data,
2547            schema.clone(),
2548            SortOptions {
2549                descending: true,
2550                nulls_first: false,
2551            },
2552            Some(3),
2553            expected_output,
2554            Some(8), // Must read both batches to detect group boundary
2555        )
2556        .await;
2557    }
2558
2559    /// Test early stop behavior when there's only one group (no next group).
2560    /// In this case, can_stop_early should return false and we should process all data.
2561    #[tokio::test]
2562    async fn test_early_stop_single_group() {
2563        let unit = TimeUnit::Millisecond;
2564        let schema = Arc::new(Schema::new(vec![Field::new(
2565            "ts",
2566            DataType::Timestamp(unit, None),
2567            false,
2568        )]));
2569
2570        // Only one group (all ranges have the same end), no next group to compare against
2571        let input_ranged_data = vec![
2572            (
2573                PartitionRange {
2574                    start: Timestamp::new(70, unit.into()),
2575                    end: Timestamp::new(100, unit.into()),
2576                    num_rows: 6,
2577                    identifier: 0,
2578                },
2579                vec![
2580                    DfRecordBatch::try_new(
2581                        schema.clone(),
2582                        vec![new_ts_array(unit, vec![94, 95, 96, 97, 98, 99])],
2583                    )
2584                    .unwrap(),
2585                ],
2586            ),
2587            (
2588                PartitionRange {
2589                    start: Timestamp::new(50, unit.into()),
2590                    end: Timestamp::new(100, unit.into()),
2591                    num_rows: 3,
2592                    identifier: 1,
2593                },
2594                vec![
2595                    DfRecordBatch::try_new(
2596                        schema.clone(),
2597                        vec![new_ts_array(unit, vec![85, 86, 87])],
2598                    )
2599                    .unwrap(),
2600                ],
2601            ),
2602        ];
2603
        // Even though we have enough data in the first range, we must process all of it
        // because there's no next group to compare the threshold against
2606        let expected_output = Some(
2607            DfRecordBatch::try_new(
2608                schema.clone(),
2609                vec![new_ts_array(unit, vec![99, 98, 97, 96])],
2610            )
2611            .unwrap(),
2612        );
2613
2614        run_test(
2615            3002,
2616            input_ranged_data,
2617            schema.clone(),
2618            SortOptions {
2619                descending: true,
2620                ..Default::default()
2621            },
2622            Some(4),
2623            expected_output,
2624            Some(9), // Must read all batches since no early stop is possible
2625        )
2626        .await;
2627    }
2628
2629    /// Test early stop behavior when threshold exactly equals next group's boundary.
2630    #[tokio::test]
2631    async fn test_early_stop_exact_boundary_equality() {
2632        let unit = TimeUnit::Millisecond;
2633        let schema = Arc::new(Schema::new(vec![Field::new(
2634            "ts",
2635            DataType::Timestamp(unit, None),
2636            false,
2637        )]));
2638
2639        // Test case 1: Descending sort, threshold == next_group_end
2640        // Group 1 (end=100): data up to 90, threshold = 90, next_group_end = 90
2641        // Since 90 >= 90, we should stop early
2642        let input_ranged_data = vec![
2643            (
2644                PartitionRange {
2645                    start: Timestamp::new(70, unit.into()),
2646                    end: Timestamp::new(100, unit.into()),
2647                    num_rows: 4,
2648                    identifier: 0,
2649                },
2650                vec![
2651                    DfRecordBatch::try_new(
2652                        schema.clone(),
2653                        vec![new_ts_array(unit, vec![92, 91, 90, 89])],
2654                    )
2655                    .unwrap(),
2656                ],
2657            ),
2658            (
2659                PartitionRange {
2660                    start: Timestamp::new(50, unit.into()),
2661                    end: Timestamp::new(90, unit.into()),
2662                    num_rows: 3,
2663                    identifier: 1,
2664                },
2665                vec![
2666                    DfRecordBatch::try_new(
2667                        schema.clone(),
2668                        vec![new_ts_array(unit, vec![88, 87, 86])],
2669                    )
2670                    .unwrap(),
2671                ],
2672            ),
2673        ];
2674
2675        let expected_output = Some(
2676            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![92, 91, 90])])
2677                .unwrap(),
2678        );
2679
2680        run_test(
2681            3003,
2682            input_ranged_data,
2683            schema.clone(),
2684            SortOptions {
2685                descending: true,
2686                ..Default::default()
2687            },
2688            Some(3),
2689            expected_output,
2690            Some(7), // Must read both batches to detect boundary
2691        )
2692        .await;
2693
2694        // Test case 2: Ascending sort, threshold == next_group_start
2695        // Group 1 (start=10): data from 10, threshold = 20, next_group_start = 20
        // Since 20 < 20 is false, early stop is not possible and we keep reading
2697        let input_ranged_data = vec![
2698            (
2699                PartitionRange {
2700                    start: Timestamp::new(10, unit.into()),
2701                    end: Timestamp::new(50, unit.into()),
2702                    num_rows: 4,
2703                    identifier: 0,
2704                },
2705                vec![
2706                    DfRecordBatch::try_new(
2707                        schema.clone(),
2708                        vec![new_ts_array(unit, vec![10, 15, 20, 25])],
2709                    )
2710                    .unwrap(),
2711                ],
2712            ),
2713            (
2714                PartitionRange {
2715                    start: Timestamp::new(20, unit.into()),
2716                    end: Timestamp::new(60, unit.into()),
2717                    num_rows: 3,
2718                    identifier: 1,
2719                },
2720                vec![
2721                    DfRecordBatch::try_new(
2722                        schema.clone(),
2723                        vec![new_ts_array(unit, vec![21, 22, 23])],
2724                    )
2725                    .unwrap(),
2726                ],
2727            ),
2728        ];
2729
2730        let expected_output = Some(
2731            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![10, 15, 20])])
2732                .unwrap(),
2733        );
2734
2735        run_test(
2736            3004,
2737            input_ranged_data,
2738            schema.clone(),
2739            SortOptions {
2740                descending: false,
2741                ..Default::default()
2742            },
2743            Some(3),
2744            expected_output,
2745            Some(7), // Must read both batches since 20 is not < 20
2746        )
2747        .await;
2748    }
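
    // Boundary-equality numbers from the two cases above (a sketch reusing
    // `can_stop_early_sketch`): with descending sort, threshold == next end
    // stops early (90 >= 90 holds), while with ascending sort, threshold ==
    // next start does not (20 < 20 fails), so reading continues.
    #[allow(dead_code)]
    fn boundary_equality_numbers() {
        let unit = TimeUnit::Millisecond;
        let t90 = Timestamp::new(90, unit.into());
        assert!(can_stop_early_sketch(true, t90, t90));
        let t20 = Timestamp::new(20, unit.into());
        assert!(!can_stop_early_sketch(false, t20, t20));
    }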
2749
2750    /// Test early stop behavior with empty partition groups.
2751    #[tokio::test]
2752    async fn test_early_stop_with_empty_partitions() {
2753        let unit = TimeUnit::Millisecond;
2754        let schema = Arc::new(Schema::new(vec![Field::new(
2755            "ts",
2756            DataType::Timestamp(unit, None),
2757            false,
2758        )]));
2759
2760        // Test case 1: First group is empty, second group has data
2761        let input_ranged_data = vec![
2762            (
2763                PartitionRange {
2764                    start: Timestamp::new(70, unit.into()),
2765                    end: Timestamp::new(100, unit.into()),
2766                    num_rows: 0,
2767                    identifier: 0,
2768                },
2769                vec![
2770                    // Empty batch for first range
2771                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
2772                        .unwrap(),
2773                ],
2774            ),
2775            (
2776                PartitionRange {
2777                    start: Timestamp::new(50, unit.into()),
2778                    end: Timestamp::new(100, unit.into()),
2779                    num_rows: 0,
2780                    identifier: 1,
2781                },
2782                vec![
2783                    // Empty batch for second range
2784                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
2785                        .unwrap(),
2786                ],
2787            ),
2788            (
2789                PartitionRange {
2790                    start: Timestamp::new(30, unit.into()),
2791                    end: Timestamp::new(80, unit.into()),
2792                    num_rows: 4,
2793                    identifier: 2,
2794                },
2795                vec![
2796                    DfRecordBatch::try_new(
2797                        schema.clone(),
2798                        vec![new_ts_array(unit, vec![74, 75, 76, 77])],
2799                    )
2800                    .unwrap(),
2801                ],
2802            ),
2803            (
2804                PartitionRange {
2805                    start: Timestamp::new(10, unit.into()),
2806                    end: Timestamp::new(60, unit.into()),
2807                    num_rows: 3,
2808                    identifier: 3,
2809                },
2810                vec![
2811                    DfRecordBatch::try_new(
2812                        schema.clone(),
2813                        vec![new_ts_array(unit, vec![58, 59, 60])],
2814                    )
2815                    .unwrap(),
2816                ],
2817            ),
2818        ];
2819
2820        // Group 1 (end=100) is empty, Group 2 (end=80) has data
2821        // Should continue to Group 2 since Group 1 has no data
2822        let expected_output = Some(
2823            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![77, 76])]).unwrap(),
2824        );
2825
2826        run_test(
2827            3005,
2828            input_ranged_data,
2829            schema.clone(),
2830            SortOptions {
2831                descending: true,
2832                ..Default::default()
2833            },
2834            Some(2),
2835            expected_output,
            Some(7), // Empty groups are skipped; the last batch is still read to detect the boundary
2837        )
2838        .await;

        // Test case 2: empty partitions between data groups.
        let input_ranged_data = vec![
            (
                PartitionRange {
                    start: Timestamp::new(70, unit.into()),
                    end: Timestamp::new(100, unit.into()),
                    num_rows: 4,
                    identifier: 0,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![96, 97, 98, 99])],
                    )
                    .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(50, unit.into()),
                    end: Timestamp::new(90, unit.into()),
                    num_rows: 0,
                    identifier: 1,
                },
                vec![
                    // Empty range - should be skipped
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
                        .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(30, unit.into()),
                    end: Timestamp::new(70, unit.into()),
                    num_rows: 0,
                    identifier: 2,
                },
                vec![
                    // Another empty range
                    DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![])])
                        .unwrap(),
                ],
            ),
            (
                PartitionRange {
                    start: Timestamp::new(10, unit.into()),
                    end: Timestamp::new(50, unit.into()),
                    num_rows: 3,
                    identifier: 3,
                },
                vec![
                    DfRecordBatch::try_new(
                        schema.clone(),
                        vec![new_ts_array(unit, vec![48, 49, 50])],
                    )
                    .unwrap(),
                ],
            ),
        ];

        // With limit=2, group 1 yields [99, 98], so the threshold becomes 98;
        // the next group with data ends at 50. Since 98 >= 50, no later range
        // can contribute to the top-k, and the stream can stop early (see the
        // `group_ranges_by_primary_end` sketch after these tests).
        let expected_output = Some(
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![99, 98])]).unwrap(),
        );

        run_test(
            3006,
            input_ranged_data,
            schema.clone(),
            SortOptions {
                descending: true,
                ..Default::default()
            },
            Some(2),
            expected_output,
            Some(7), // Must read until the early-stop condition is detected
        )
        .await;
    }
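
    /// A minimal illustrative sketch (hypothetical test, based only on the
    /// documented contract of `group_ranges_by_primary_end`): for a descending
    /// sort the primary end is `end`, and consecutive ranges sharing it must be
    /// grouped together, which is the invariant the early-stop cases above
    /// rely on.
    #[test]
    fn group_ranges_by_primary_end_sketch() {
        let unit = TimeUnit::Millisecond;
        let ts = |v: i64| Timestamp::new(v, unit.into());
        let ranges = [
            // Two overlapping ranges with the same primary end (end=100)...
            PartitionRange {
                start: ts(70),
                end: ts(100),
                num_rows: 4,
                identifier: 0,
            },
            PartitionRange {
                start: ts(50),
                end: ts(100),
                num_rows: 2,
                identifier: 1,
            },
            // ...followed by a range with a smaller primary end (end=60).
            PartitionRange {
                start: ts(10),
                end: ts(60),
                num_rows: 3,
                identifier: 2,
            },
        ];
        let groups = group_ranges_by_primary_end(&ranges, true);
        // Each group is (primary_end, start_idx_inclusive, end_idx_exclusive).
        assert_eq!(groups, vec![(ts(100), 0, 2), (ts(60), 2, 3)]);
    }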

    /// First group: [0, 20), data: [0, 5, 15]
    /// Second group: [10, 30), data: [21, 25, 29]
    /// After the first group, early stop is triggered manually, and we check
    /// that the dynamic filter is updated.
    #[tokio::test]
    async fn test_early_stop_check_update_dyn_filter() {
        let unit = TimeUnit::Millisecond;
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            DataType::Timestamp(unit, None),
            false,
        )]));

        let mock_input = Arc::new(MockInputExec::new(vec![vec![]], schema.clone()));
        let exec = PartSortExec::try_new(
            PhysicalSortExpr {
                expr: Arc::new(Column::new("ts", 0)),
                options: SortOptions {
                    descending: false,
                    ..Default::default()
                },
            },
            Some(3),
            vec![vec![
                PartitionRange {
                    start: Timestamp::new(0, unit.into()),
                    end: Timestamp::new(20, unit.into()),
                    num_rows: 3,
                    identifier: 1,
                },
                PartitionRange {
                    start: Timestamp::new(10, unit.into()),
                    end: Timestamp::new(30, unit.into()),
                    num_rows: 3,
                    identifier: 1,
                },
            ]],
            mock_input.clone(),
        )
        .unwrap();

        let filter = exec.filter.clone().unwrap();
        let input_stream = mock_input
            .execute(0, Arc::new(TaskContext::default()))
            .unwrap();
        let mut stream = PartSortStream::new(
            Arc::new(TaskContext::default()),
            &exec,
            Some(3),
            input_stream,
            vec![],
            0,
            Some(filter.clone()),
        )
        .unwrap();

        // Initially, the snapshot generation is 1.
        assert_eq!(filter.read().expr().snapshot_generation(), 1);
        let batch =
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![0, 5, 15])])
                .unwrap();
        stream.push_buffer(batch).unwrap();

        // After pushing the first batch, the snapshot generation is bumped to 2.
        assert_eq!(filter.read().expr().snapshot_generation(), 2);
        assert!(!stream.can_stop_early(&schema).unwrap());
        // Checking for early stop does not touch the filter, so it is still 2.
        assert_eq!(filter.read().expr().snapshot_generation(), 2);

        let _ = stream.sort_top_buffer().unwrap();

        let batch =
            DfRecordBatch::try_new(schema.clone(), vec![new_ts_array(unit, vec![21, 25, 29])])
                .unwrap();
        stream.push_buffer(batch).unwrap();
        // The new rows do not improve the top-k, so the filter is not updated; still 2.
        assert_eq!(filter.read().expr().snapshot_generation(), 2);
        let new = stream.sort_top_buffer().unwrap();
        // Sorting the buffer does not update the filter either; still 2.
        assert_eq!(filter.read().expr().snapshot_generation(), 2);

        // The dynamic filter has kicked in and filtered out all rows >= 15
        // (the filter keeps rows < 15).
        assert_eq!(new.num_rows(), 0);
    }
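
    /// A minimal illustrative sketch (hypothetical test): `get_primary_end`
    /// resolves which bound of a range is reached first for a given sort
    /// direction, i.e. the value that early-stop thresholds are compared
    /// against.
    #[test]
    fn get_primary_end_sketch() {
        let unit = TimeUnit::Millisecond;
        let range = PartitionRange {
            start: Timestamp::new(0, unit.into()),
            end: Timestamp::new(20, unit.into()),
            num_rows: 3,
            identifier: 0,
        };
        // Ascending processes the lowest values first: `start` is primary.
        assert_eq!(get_primary_end(&range, false), Timestamp::new(0, unit.into()));
        // Descending processes the highest values first: `end` is primary.
        assert_eq!(get_primary_end(&range, true), Timestamp::new(20, unit.into()));
    }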
}