query/
window_sort.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! A physical plan for window sort (which sorts multiple sorted ranges according to the input `PartitionRange`s).
16//!
17
18use std::any::Any;
19use std::collections::{BTreeMap, BTreeSet, VecDeque};
20use std::pin::Pin;
21use std::slice::from_ref;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use arrow::array::{Array, ArrayRef};
26use arrow::compute::SortColumn;
27use arrow_schema::{DataType, SchemaRef, SortOptions};
28use common_error::ext::{BoxedError, PlainError};
29use common_error::status_code::StatusCode;
30use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
31use common_telemetry::error;
32use common_time::Timestamp;
33use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
34use datafusion::execution::{RecordBatchStream, TaskContext};
35use datafusion::physical_plan::memory::MemoryStream;
36use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
37use datafusion::physical_plan::sorts::streaming_merge::StreamingMergeBuilder;
38use datafusion::physical_plan::{
39    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
40};
41use datafusion_common::utils::bisect;
42use datafusion_common::{DataFusionError, internal_err};
43use datafusion_physical_expr::PhysicalSortExpr;
44use datatypes::value::Value;
45use futures::Stream;
46use itertools::Itertools;
47use snafu::ResultExt;
48use store_api::region_engine::PartitionRange;
49
50use crate::error::{QueryExecutionSnafu, Result};
51
52/// A complex stream sort execution plan which accepts a list of `PartitionRange` and
53/// merge sort them whenever possible, and emit the sorted result as soon as possible.
54/// This sorting plan only accept sort by ts and will not sort by other fields.
55///
56/// internally, it call [`StreamingMergeBuilder`] multiple times to merge multiple sorted "working ranges"
57///
58/// # Invariant Promise on Input Stream
59/// 1. The input stream must be sorted by timestamp and
60/// 2. in the order of `PartitionRange` in `ranges`
61/// 3. Each `PartitionRange` is sorted within itself(ascending or descending) but no need to be sorted across ranges
62/// 4. There can't be any RecordBatch that is cross multiple `PartitionRange` in the input stream
63///
64///  TODO(discord9): fix item 4, but since only use `PartSort` as input, this might not be a problem
65
66#[derive(Debug, Clone)]
67pub struct WindowedSortExec {
68    /// Physical sort expressions(that is, sort by timestamp)
69    expression: PhysicalSortExpr,
70    /// Optional number of rows to fetch. Stops producing rows after this fetch
71    fetch: Option<usize>,
72    /// The input ranges indicate input stream will be composed of those ranges in given order.
73    ///
74    /// Each partition has one vector of `PartitionRange`.
75    ranges: Vec<Vec<PartitionRange>>,
76    /// All available working ranges and their corresponding working set
77    ///
78    /// working ranges promise once input stream get a value out of current range, future values will never
79    /// be in this range. Each partition has one vector of ranges.
80    all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
81    input: Arc<dyn ExecutionPlan>,
82    /// Execution metrics
83    metrics: ExecutionPlanMetricsSet,
84    properties: PlanProperties,
85}
86
87/// Checks that partition ranges are sorted correctly for the given sort direction.
88/// - Descending: sorted by (end DESC, start DESC) - shorter ranges first when ends are equal
89/// - Ascending: sorted by (start ASC, end ASC) - shorter ranges first when starts are equal
90pub fn check_partition_range_monotonicity(
91    ranges: &[Vec<PartitionRange>],
92    descending: bool,
93) -> Result<()> {
94    let is_valid = ranges.iter().all(|r| {
95        if descending {
96            // Primary: end descending, Secondary: start descending (shorter range first)
97            r.windows(2)
98                .all(|w| w[0].end > w[1].end || (w[0].end == w[1].end && w[0].start >= w[1].start))
99        } else {
100            // Primary: start ascending, Secondary: end ascending (shorter range first)
101            r.windows(2).all(|w| {
102                w[0].start < w[1].start || (w[0].start == w[1].start && w[0].end <= w[1].end)
103            })
104        }
105    });
106
107    if !is_valid {
108        let msg = if descending {
109            "Input `PartitionRange`s are not sorted by (end DESC, start DESC)"
110        } else {
111            "Input `PartitionRange`s are not sorted by (start ASC, end ASC)"
112        };
113        let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
114        Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
115    } else {
116        Ok(())
117    }
118}
119
120impl WindowedSortExec {
121    pub fn try_new(
122        expression: PhysicalSortExpr,
123        fetch: Option<usize>,
124        ranges: Vec<Vec<PartitionRange>>,
125        input: Arc<dyn ExecutionPlan>,
126    ) -> Result<Self> {
127        check_partition_range_monotonicity(&ranges, expression.options.descending)?;
128
129        let mut eq_properties = input.equivalence_properties().clone();
130        eq_properties.reorder(vec![expression.clone()])?;
131
132        let properties = input.properties();
133        let properties = PlanProperties::new(
134            eq_properties,
135            input.output_partitioning().clone(),
136            properties.emission_type,
137            properties.boundedness,
138        );
139
140        let mut all_avail_working_range = Vec::with_capacity(ranges.len());
141        for r in &ranges {
142            let overlap_counts = split_overlapping_ranges(r);
143            let working_ranges =
144                compute_all_working_ranges(&overlap_counts, expression.options.descending);
145            all_avail_working_range.push(working_ranges);
146        }
147
148        Ok(Self {
149            expression,
150            fetch,
151            ranges,
152            all_avail_working_range,
153            input,
154            metrics: ExecutionPlanMetricsSet::new(),
155            properties,
156        })
157    }
158
159    /// During receiving partial-sorted RecordBatch, we need to update the working set which is the
160    /// `PartitionRange` we think those RecordBatch belongs to. And when we receive something outside
161    /// of working set, we can merge results before whenever possible.
162    pub fn to_stream(
163        &self,
164        context: Arc<TaskContext>,
165        partition: usize,
166    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
167        let input_stream: DfSendableRecordBatchStream =
168            self.input.execute(partition, context.clone())?;
169
170        let df_stream = Box::pin(WindowedSortStream::new(
171            context,
172            self,
173            input_stream,
174            partition,
175        )) as _;
176
177        Ok(df_stream)
178    }
179}
180
181impl DisplayAs for WindowedSortExec {
182    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183        write!(
184            f,
185            "WindowedSortExec: expr={} num_ranges={}",
186            self.expression,
187            self.ranges.len()
188        )?;
189        if let Some(fetch) = self.fetch {
190            write!(f, " fetch={}", fetch)?;
191        }
192        Ok(())
193    }
194}
195
196impl ExecutionPlan for WindowedSortExec {
197    fn as_any(&self) -> &dyn Any {
198        self
199    }
200
201    fn schema(&self) -> SchemaRef {
202        self.input.schema()
203    }
204
205    fn properties(&self) -> &PlanProperties {
206        &self.properties
207    }
208
209    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
210        vec![&self.input]
211    }
212
213    fn with_new_children(
214        self: Arc<Self>,
215        children: Vec<Arc<dyn ExecutionPlan>>,
216    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
217        let new_input = if let Some(first) = children.first() {
218            first
219        } else {
220            internal_err!("No children found")?
221        };
222        let new = Self::try_new(
223            self.expression.clone(),
224            self.fetch,
225            self.ranges.clone(),
226            new_input.clone(),
227        )?;
228        Ok(Arc::new(new))
229    }
230
231    fn execute(
232        &self,
233        partition: usize,
234        context: Arc<TaskContext>,
235    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
236        self.to_stream(context, partition)
237    }
238
239    fn metrics(&self) -> Option<MetricsSet> {
240        Some(self.metrics.clone_inner())
241    }
242
243    /// # Explain
244    ///
245    /// This plan needs to be executed on each partition independently,
246    /// and is expected to run directly on storage engine's output
247    /// distribution / partition.
248    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
249        vec![false]
250    }
251
252    fn name(&self) -> &str {
253        "WindowedSortExec"
254    }
255}
256
257/// The core logic of merging sort multiple sorted ranges
258///
259/// the flow of data is:
260/// ```md
261/// input --check if sorted--> in_progress --find sorted run--> sorted_input_runs --call merge sort--> merge_stream --> output
262/// ```
263pub struct WindowedSortStream {
264    /// Memory pool for this stream
265    memory_pool: Arc<dyn MemoryPool>,
266    /// currently assembling RecordBatches, will be put to `sort_partition_rbs` when it's done
267    in_progress: Vec<DfRecordBatch>,
268    /// last `Timestamp` of the last input RecordBatch in `in_progress`, use to found partial sorted run's boundary
269    last_value: Option<Timestamp>,
270    /// Current working set of `PartitionRange` sorted RecordBatches
271    sorted_input_runs: Vec<DfSendableRecordBatchStream>,
272    /// Merge-sorted result streams, should be polled to end before start a new merge sort again
273    merge_stream: VecDeque<DfSendableRecordBatchStream>,
274    /// The number of times merge sort has been called
275    merge_count: usize,
276    /// Index into current `working_range` in `all_avail_working_range`
277    working_idx: usize,
278    /// input stream assumed reading in order of `PartitionRange`
279    input: DfSendableRecordBatchStream,
280    /// Whether this stream is terminated. For reasons like limit reached or input stream is done.
281    is_terminated: bool,
282    /// Output Schema, which is the same as input schema, since this is a sort plan
283    schema: SchemaRef,
284    /// Physical sort expressions(that is, sort by timestamp)
285    expression: PhysicalSortExpr,
286    /// Optional number of rows to fetch. Stops producing rows after this fetch
287    fetch: Option<usize>,
288    /// number of rows produced
289    produced: usize,
290    /// Resulting Stream(`merge_stream`)'s batch size, merely a suggestion
291    batch_size: usize,
292    /// All available working ranges and their corresponding working set
293    ///
294    /// working ranges promise once input stream get a value out of current range, future values will never be in this range
295    all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
296    /// The input partition ranges
297    #[allow(dead_code)] // this is used under #[debug_assertions]
298    ranges: Vec<PartitionRange>,
299    /// Execution metrics
300    metrics: BaselineMetrics,
301}
302
303impl WindowedSortStream {
304    pub fn new(
305        context: Arc<TaskContext>,
306        exec: &WindowedSortExec,
307        input: DfSendableRecordBatchStream,
308        partition: usize,
309    ) -> Self {
310        Self {
311            memory_pool: context.runtime_env().memory_pool.clone(),
312            in_progress: Vec::new(),
313            last_value: None,
314            sorted_input_runs: Vec::new(),
315            merge_stream: VecDeque::new(),
316            merge_count: 0,
317            working_idx: 0,
318            schema: input.schema(),
319            input,
320            is_terminated: false,
321            expression: exec.expression.clone(),
322            fetch: exec.fetch,
323            produced: 0,
324            batch_size: context.session_config().batch_size(),
325            all_avail_working_range: exec.all_avail_working_range[partition].clone(),
326            ranges: exec.ranges[partition].clone(),
327            metrics: BaselineMetrics::new(&exec.metrics, partition),
328        }
329    }
330}
331
332impl WindowedSortStream {
333    #[cfg(debug_assertions)]
334    fn check_subset_ranges(&self, cur_range: &TimeRange) {
335        let cur_is_subset_to = self
336            .ranges
337            .iter()
338            .filter(|r| cur_range.is_subset(&TimeRange::from(*r)))
339            .collect_vec();
340        if cur_is_subset_to.is_empty() {
341            error!("Current range is not a subset of any PartitionRange");
342            // found in all ranges that are subset of current range
343            let subset_ranges = self
344                .ranges
345                .iter()
346                .filter(|r| TimeRange::from(*r).is_subset(cur_range))
347                .collect_vec();
348            let only_overlap = self
349                .ranges
350                .iter()
351                .filter(|r| {
352                    let r = TimeRange::from(*r);
353                    r.is_overlapping(cur_range) && !r.is_subset(cur_range)
354                })
355                .collect_vec();
356            error!(
357                "Bad input, found {} ranges that are subset of current range, also found {} ranges that only overlap, subset ranges are: {:?}; overlap ranges are: {:?}",
358                subset_ranges.len(),
359                only_overlap.len(),
360                subset_ranges,
361                only_overlap
362            );
363        } else {
364            let only_overlap = self
365                .ranges
366                .iter()
367                .filter(|r| {
368                    let r = TimeRange::from(*r);
369                    r.is_overlapping(cur_range) && !cur_range.is_subset(&r)
370                })
371                .collect_vec();
372            error!(
373                "Found current range to be subset of {} ranges, also found {} ranges that only overlap, of subset ranges are:{:?}; overlap ranges are: {:?}",
374                cur_is_subset_to.len(),
375                only_overlap.len(),
376                cur_is_subset_to,
377                only_overlap
378            );
379        }
380        let all_overlap_working_range = self
381            .all_avail_working_range
382            .iter()
383            .filter(|(range, _)| range.is_overlapping(cur_range))
384            .map(|(range, _)| range)
385            .collect_vec();
386        error!(
387            "Found {} working ranges that overlap with current range: {:?}",
388            all_overlap_working_range.len(),
389            all_overlap_working_range
390        );
391    }
392
393    /// Poll the next RecordBatch from the merge-sort's output stream
394    fn poll_result_stream(
395        &mut self,
396        cx: &mut Context<'_>,
397    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
398        while let Some(merge_stream) = &mut self.merge_stream.front_mut() {
399            match merge_stream.as_mut().poll_next(cx) {
400                Poll::Ready(Some(Ok(batch))) => {
401                    let ret = if let Some(remaining) = self.remaining_fetch() {
402                        if remaining == 0 {
403                            self.is_terminated = true;
404                            None
405                        } else if remaining < batch.num_rows() {
406                            self.produced += remaining;
407                            Some(Ok(batch.slice(0, remaining)))
408                        } else {
409                            self.produced += batch.num_rows();
410                            Some(Ok(batch))
411                        }
412                    } else {
413                        self.produced += batch.num_rows();
414                        Some(Ok(batch))
415                    };
416                    return Poll::Ready(ret);
417                }
418                Poll::Ready(Some(Err(e))) => {
419                    return Poll::Ready(Some(Err(e)));
420                }
421                Poll::Ready(None) => {
422                    // current merge stream is done, we can start polling the next one
423
424                    self.merge_stream.pop_front();
425                    continue;
426                }
427                Poll::Pending => {
428                    return Poll::Pending;
429                }
430            }
431        }
432        // if no output stream is available
433        Poll::Ready(None)
434    }
435
436    /// The core logic of merging sort multiple sorted ranges
437    ///
438    /// We try to maximize the number of sorted runs we can merge in one go, while emit the result as soon as possible.
439    pub fn poll_next_inner(
440        mut self: Pin<&mut Self>,
441        cx: &mut Context<'_>,
442    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
443        // first check and send out the merge result
444        match self.poll_result_stream(cx) {
445            Poll::Ready(None) => {
446                if self.is_terminated {
447                    return Poll::Ready(None);
448                }
449            }
450            x => return x,
451        };
452
453        // consume input stream
454        while !self.is_terminated {
455            // then we get a new RecordBatch from input stream
456            let SortedRunSet {
457                runs_with_batch,
458                sort_column,
459            } = match self.input.as_mut().poll_next(cx) {
460                Poll::Ready(Some(Ok(batch))) => split_batch_to_sorted_run(batch, &self.expression)?,
461                Poll::Ready(Some(Err(e))) => {
462                    return Poll::Ready(Some(Err(e)));
463                }
464                Poll::Ready(None) => {
465                    // input stream is done, we need to merge sort the remaining working set
466                    self.is_terminated = true;
467                    self.build_sorted_stream()?;
468                    self.start_new_merge_sort()?;
469                    break;
470                }
471                Poll::Pending => return Poll::Pending,
472            };
473
474            // The core logic to eargerly merge sort the working set
475
476            // compare with last_value to find boundary, then merge runs if needed
477
478            // iterate over runs_with_batch to merge sort, might create zero or more stream to put to `sort_partition_rbs`
479            let mut last_remaining = None;
480            let mut run_iter = runs_with_batch.into_iter();
481            loop {
482                let Some((sorted_rb, run_info)) = last_remaining.take().or(run_iter.next()) else {
483                    break;
484                };
485                if sorted_rb.num_rows() == 0 {
486                    continue;
487                }
488                // determine if this batch is in current working range
489                let Some(cur_range) = run_info.get_time_range() else {
490                    internal_err!("Found NULL in time index column")?
491                };
492                let Some(working_range) = self.get_working_range() else {
493                    internal_err!("No working range found")?
494                };
495
496                // ensure the current batch is in the working range
497                if sort_column.options.unwrap_or_default().descending {
498                    if cur_range.end > working_range.end {
499                        error!("Invalid range: {:?} > {:?}", cur_range, working_range);
500                        #[cfg(debug_assertions)]
501                        self.check_subset_ranges(&cur_range);
502                        internal_err!(
503                            "Current batch have data on the right side of working range, something is very wrong"
504                        )?;
505                    }
506                } else if cur_range.start < working_range.start {
507                    error!("Invalid range: {:?} < {:?}", cur_range, working_range);
508                    #[cfg(debug_assertions)]
509                    self.check_subset_ranges(&cur_range);
510                    internal_err!(
511                        "Current batch have data on the left side of working range, something is very wrong"
512                    )?;
513                }
514
515                if cur_range.is_subset(&working_range) {
516                    // data still in range, can't merge sort yet
517                    // see if can concat entire sorted rb, merge sort need to wait
518                    self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?;
519                } else if let Some(intersection) = cur_range.intersection(&working_range) {
520                    // slice rb by intersection and concat it then merge sort
521                    let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len);
522                    let (offset, len) = find_slice_from_range(
523                        &SortColumn {
524                            values: cur_sort_column.clone(),
525                            options: sort_column.options,
526                        },
527                        &intersection,
528                    )?;
529
530                    if offset != 0 {
531                        internal_err!(
532                            "Current batch have data on the left side of working range, something is very wrong"
533                        )?;
534                    }
535
536                    let sliced_rb = sorted_rb.slice(offset, len);
537
538                    // try to concat the sliced input batch to the current `in_progress` run
539                    self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?;
540                    // since no more sorted data in this range will come in now, build stream now
541                    self.build_sorted_stream()?;
542
543                    // since we are crossing working range, we need to merge sort the working set
544                    self.start_new_merge_sort()?;
545
546                    let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len);
547                    if r_len != 0 {
548                        // we have remaining data in this batch, put it back to input queue
549                        let remaining_rb = sorted_rb.slice(r_offset, r_len);
550                        let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?;
551                        let new_run_info = SucRun {
552                            offset: run_info.offset + r_offset,
553                            len: r_len,
554                            first_val: new_first_val,
555                            last_val: run_info.last_val,
556                        };
557                        last_remaining = Some((remaining_rb, new_run_info));
558                    }
559                    // deal with remaining batch cross working range problem
560                    // i.e: this example require more slice, and we are currently at point A
561                    // |---1---|       |---3---|
562                    // |-------A--2------------|
563                    //  put the remaining batch back to iter and deal it in next loop
564                } else {
565                    // no overlap, we can merge sort the working set
566
567                    self.build_sorted_stream()?;
568                    self.start_new_merge_sort()?;
569
570                    // always put it back to input queue until some batch is in working range
571                    last_remaining = Some((sorted_rb, run_info));
572                }
573            }
574
575            // poll result stream again to see if we can emit more results
576            match self.poll_result_stream(cx) {
577                Poll::Ready(None) => {
578                    if self.is_terminated {
579                        return Poll::Ready(None);
580                    }
581                }
582                x => return x,
583            };
584        }
585        // emit the merge result after terminated(all input stream is done)
586        self.poll_result_stream(cx)
587    }
588
589    fn push_batch(&mut self, batch: DfRecordBatch) {
590        self.in_progress.push(batch);
591    }
592
593    /// Try to concat the input batch to the current `in_progress` run
594    ///
595    /// if the input batch is not sorted, build old run to stream and start a new run with new batch
596    fn try_concat_batch(
597        &mut self,
598        batch: DfRecordBatch,
599        run_info: &SucRun<Timestamp>,
600        opt: Option<SortOptions>,
601    ) -> datafusion_common::Result<()> {
602        let is_ok_to_concat =
603            cmp_with_opts(&self.last_value, &run_info.first_val, &opt) <= std::cmp::Ordering::Equal;
604
605        if is_ok_to_concat {
606            self.push_batch(batch);
607            // next time we get input batch might still be ordered, so not build stream yet
608        } else {
609            // no more sorted data, build stream now
610            self.build_sorted_stream()?;
611            self.push_batch(batch);
612        }
613        self.last_value = run_info.last_val;
614        Ok(())
615    }
616
617    /// Get the current working range
618    fn get_working_range(&self) -> Option<TimeRange> {
619        self.all_avail_working_range
620            .get(self.working_idx)
621            .map(|(range, _)| *range)
622    }
623
624    /// Set current working range to the next working range
625    fn set_next_working_range(&mut self) {
626        self.working_idx += 1;
627    }
628
629    /// make `in_progress` as a new `DfSendableRecordBatchStream` and put into `sorted_input_runs`
630    fn build_sorted_stream(&mut self) -> datafusion_common::Result<()> {
631        if self.in_progress.is_empty() {
632            return Ok(());
633        }
634        let data = std::mem::take(&mut self.in_progress);
635
636        let new_stream = MemoryStream::try_new(data, self.schema(), None)?;
637        self.sorted_input_runs.push(Box::pin(new_stream));
638        Ok(())
639    }
640
641    /// Start merging sort the current working set
642    fn start_new_merge_sort(&mut self) -> datafusion_common::Result<()> {
643        if !self.in_progress.is_empty() {
644            return internal_err!("Starting a merge sort when in_progress is not empty")?;
645        }
646
647        self.set_next_working_range();
648
649        let streams = std::mem::take(&mut self.sorted_input_runs);
650        if streams.is_empty() {
651            return Ok(());
652        } else if streams.len() == 1 {
653            self.merge_stream
654                .push_back(streams.into_iter().next().unwrap());
655            return Ok(());
656        }
657
658        let fetch = self.remaining_fetch();
659        let reservation = MemoryConsumer::new(format!("WindowedSortStream[{}]", self.merge_count))
660            .register(&self.memory_pool);
661        self.merge_count += 1;
662
663        let resulting_stream = StreamingMergeBuilder::new()
664            .with_streams(streams)
665            .with_schema(self.schema())
666            .with_expressions(&[self.expression.clone()].into())
667            .with_metrics(self.metrics.clone())
668            .with_batch_size(self.batch_size)
669            .with_fetch(fetch)
670            .with_reservation(reservation)
671            .build()?;
672        self.merge_stream.push_back(resulting_stream);
673        // this working range is done, move to next working range
674        Ok(())
675    }
676
677    /// Remaining number of rows to fetch, if no fetch limit, return None
678    /// if fetch limit is reached, return Some(0)
679    fn remaining_fetch(&self) -> Option<usize> {
680        let total_now = self.produced;
681        self.fetch.map(|p| p.saturating_sub(total_now))
682    }
683}
684
685impl Stream for WindowedSortStream {
686    type Item = datafusion_common::Result<DfRecordBatch>;
687
688    fn poll_next(
689        mut self: Pin<&mut Self>,
690        cx: &mut Context<'_>,
691    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
692        let result = self.as_mut().poll_next_inner(cx);
693        self.metrics.record_poll(result)
694    }
695}
696
697impl RecordBatchStream for WindowedSortStream {
698    fn schema(&self) -> SchemaRef {
699        self.schema.clone()
700    }
701}
702
703/// split batch to sorted runs
704fn split_batch_to_sorted_run(
705    batch: DfRecordBatch,
706    expression: &PhysicalSortExpr,
707) -> datafusion_common::Result<SortedRunSet<Timestamp>> {
708    // split input rb to sorted runs
709    let sort_column = expression.evaluate_to_sort_column(&batch)?;
710    let sorted_runs_offset = get_sorted_runs(sort_column.clone())?;
711    if let Some(run) = sorted_runs_offset.first()
712        && sorted_runs_offset.len() == 1
713    {
714        if !(run.offset == 0 && run.len == batch.num_rows()) {
715            internal_err!(
716                "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
717                run.offset,
718                run.len,
719                batch.num_rows()
720            )?;
721        }
722        // input rb is already sorted, we can emit it directly
723        Ok(SortedRunSet {
724            runs_with_batch: vec![(batch, run.clone())],
725            sort_column,
726        })
727    } else {
728        // those slice should be zero copy, so supposedly no new reservation needed
729        let mut ret = Vec::with_capacity(sorted_runs_offset.len());
730        for run in sorted_runs_offset {
731            if run.offset + run.len > batch.num_rows() {
732                internal_err!(
733                    "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
734                    run.offset,
735                    run.len,
736                    batch.num_rows()
737                )?;
738            }
739            let new_rb = batch.slice(run.offset, run.len);
740            ret.push((new_rb, run));
741        }
742        Ok(SortedRunSet {
743            runs_with_batch: ret,
744            sort_column,
745        })
746    }
747}
748
749/// Downcast a temporal array to a specific type
750///
751/// usage similar to `downcast_primitive!` in `arrow-array` crate
752#[macro_export]
753macro_rules! downcast_ts_array {
754    ($data_type:expr => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) =>
755    {
756        match $data_type {
757            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
758                $m!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $args)*)
759            }
760            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
761                $m!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $args)*)
762            }
763            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
764                $m!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $args)*)
765            }
766            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
767                $m!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $args)*)
768            }
769            $($p => $fallback,)*
770        }
771    };
772}
773
774/// Find the slice(where start <= data < end and sort by `sort_column.options`) from the given range
775///
776/// Return the offset and length of the slice
777fn find_slice_from_range(
778    sort_column: &SortColumn,
779    range: &TimeRange,
780) -> datafusion_common::Result<(usize, usize)> {
781    let ty = sort_column.values.data_type();
782    let time_unit = if let DataType::Timestamp(unit, _) = ty {
783        unit
784    } else {
785        return Err(DataFusionError::Internal(format!(
786            "Unsupported sort column type: {}",
787            sort_column.values.data_type()
788        )));
789    };
790    let array = &sort_column.values;
791    let opt = &sort_column.options.unwrap_or_default();
792    let descending = opt.descending;
793
794    let typed_sorted_range = [range.start, range.end]
795        .iter()
796        .map(|t| {
797            t.convert_to(time_unit.into())
798                .ok_or_else(|| {
799                    DataFusionError::Internal(format!(
800                        "Failed to convert timestamp from {:?} to {:?}",
801                        t.unit(),
802                        time_unit
803                    ))
804                })
805                .and_then(|typed_ts| {
806                    let value = Value::Timestamp(typed_ts);
807                    value
808                        .try_to_scalar_value(&value.data_type())
809                        .map_err(|e| DataFusionError::External(Box::new(e) as _))
810                })
811        })
812        .try_collect::<_, Vec<_>, _>()?;
813
814    let (min_val, max_val) = (typed_sorted_range[0].clone(), typed_sorted_range[1].clone());
815
816    // get slice that in which all data that `min_val<=data<max_val`
817    let (start, end) = if descending {
818        // note that `data < max_val`
819        // i,e, for max_val = 4, array = [5,3,2] should be start=1
820        // max_val = 4, array = [5, 4, 3, 2] should be start= 2
821        let start = bisect::<false>(from_ref(array), from_ref(&max_val), &[*opt])?;
822        // min_val = 1, array = [3, 2, 1, 0], end = 3
823        // min_val = 1, array = [3, 2, 0], end = 2
824        let end = bisect::<false>(from_ref(array), from_ref(&min_val), &[*opt])?;
825        (start, end)
826    } else {
827        // min_val = 1, array = [1, 2, 3], start = 0
828        // min_val = 1, array = [0, 2, 3], start = 1
829        let start = bisect::<true>(from_ref(array), from_ref(&min_val), &[*opt])?;
830        // max_val = 3, array = [1, 3, 4], end = 1
831        // max_val = 3, array = [1, 2, 4], end = 2
832        let end = bisect::<true>(from_ref(array), from_ref(&max_val), &[*opt])?;
833        (start, end)
834    };
835
836    Ok((start, end - start))
837}
838
/// Get an iterator over the values of a timestamp primitive array.
///
/// Meant to be passed as the callback macro of `downcast_ts_array!`.
/// - `$t` is the arrow timestamp type selected by that macro.
/// - `$unit` is its time unit; this macro accepts it only to match the callback shape and does
///   not use it.
/// - `$arr` is the array to iterate.
///
/// The result is a boxed iterator of `(index, Option<i64>)` pairs: the values are wrapped with
/// `.enumerate()` and nulls appear as `None`.
#[macro_export]
macro_rules! array_iter_helper {
    ($t:ty, $unit:expr, $arr:expr) => {{
        // The `unwrap` only holds when `$t` was chosen by dispatching on `$arr`'s own data
        // type (as the call sites in this file do); otherwise the downcast panics.
        let typed = $arr
            .as_any()
            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
            .unwrap();
        let iter = typed.iter().enumerate();
        Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
    }};
}
853
854/// Compare with options, note None is considered as NULL here
855///
856/// default to null first
857fn cmp_with_opts<T: Ord>(
858    a: &Option<T>,
859    b: &Option<T>,
860    opt: &Option<SortOptions>,
861) -> std::cmp::Ordering {
862    let opt = opt.unwrap_or_default();
863
864    if let (Some(a), Some(b)) = (a, b) {
865        if opt.descending { b.cmp(a) } else { a.cmp(b) }
866    } else if opt.nulls_first {
867        // now we know at leatst one of them is None
868        // in rust None < Some(_)
869        a.cmp(b)
870    } else {
871        match (a, b) {
872            (Some(a), Some(b)) => a.cmp(b),
873            (Some(_), None) => std::cmp::Ordering::Less,
874            (None, Some(_)) => std::cmp::Ordering::Greater,
875            (None, None) => std::cmp::Ordering::Equal,
876        }
877    }
878}
879
/// The sorted runs found in one record batch.
///
/// Each run is paired with the rows of the batch that make it up: the whole batch when there
/// is a single run, otherwise a `slice` of the batch.
#[derive(Debug, Clone)]
struct SortedRunSet<N: Ord> {
    /// Sorted runs, each with the batch (or batch slice) holding its rows.
    runs_with_batch: Vec<(DfRecordBatch, SucRun<N>)>,
    /// The sort column obtained by evaluating the sort expression.
    sort_column: SortColumn,
}
887
/// A run of consecutive rows from the input iterator that are already in sort order
/// (as produced by `find_successive_runs`).
#[derive(Debug, Clone, PartialEq)]
struct SucRun<N: Ord> {
    /// Offset of the first element in the run.
    offset: usize,
    /// Number of elements (including nulls) in the run.
    len: usize,
    /// First non-null value in the run; `None` if every value in the run is null.
    first_val: Option<N>,
    /// Last non-null value in the run; `None` if every value in the run is null.
    last_val: Option<N>,
}
900
901impl SucRun<Timestamp> {
902    /// Get the time range of the run, which is [min_val, max_val + 1)
903    fn get_time_range(&self) -> Option<TimeRange> {
904        let start = self.first_val.min(self.last_val);
905        let end = self
906            .first_val
907            .max(self.last_val)
908            .map(|i| Timestamp::new(i.value() + 1, i.unit()));
909        start.zip(end).map(|(s, e)| TimeRange::new(s, e))
910    }
911}
912
913/// find all successive runs in the input iterator
914fn find_successive_runs<T: Iterator<Item = (usize, Option<N>)>, N: Ord + Copy>(
915    iter: T,
916    sort_opts: &Option<SortOptions>,
917) -> Vec<SucRun<N>> {
918    let mut runs = Vec::new();
919    let mut last_value = None;
920    let mut iter_len = None;
921
922    let mut last_offset = 0;
923    let mut first_val: Option<N> = None;
924    let mut last_val: Option<N> = None;
925
926    for (idx, t) in iter {
927        if let Some(last_value) = &last_value
928            && cmp_with_opts(last_value, &t, sort_opts) == std::cmp::Ordering::Greater
929        {
930            // we found a boundary
931            let len = idx - last_offset;
932            let run = SucRun {
933                offset: last_offset,
934                len,
935                first_val,
936                last_val,
937            };
938            runs.push(run);
939            first_val = None;
940            last_val = None;
941
942            last_offset = idx;
943        }
944        last_value = Some(t);
945        if let Some(t) = t {
946            first_val = first_val.or(Some(t));
947            last_val = Some(t).or(last_val);
948        }
949        iter_len = Some(idx);
950    }
951    let run = SucRun {
952        offset: last_offset,
953        len: iter_len.map(|l| l - last_offset + 1).unwrap_or(0),
954        first_val,
955        last_val,
956    };
957    runs.push(run);
958
959    runs
960}
961
962/// return a list of non-overlapping (offset, length) which represent sorted runs, and
963/// can be used to call [`DfRecordBatch::slice`] to get sorted runs
964/// Returned runs will be as long as possible, and will not overlap with each other
965fn get_sorted_runs(sort_column: SortColumn) -> datafusion_common::Result<Vec<SucRun<Timestamp>>> {
966    let ty = sort_column.values.data_type();
967    if let DataType::Timestamp(unit, _) = ty {
968        let array = &sort_column.values;
969        let iter = downcast_ts_array!(
970            array.data_type() => (array_iter_helper, array),
971            _ => internal_err!("Unsupported sort column type: {ty}")?
972        );
973
974        let raw = find_successive_runs(iter, &sort_column.options);
975        let ts_runs = raw
976            .into_iter()
977            .map(|run| SucRun {
978                offset: run.offset,
979                len: run.len,
980                first_val: run.first_val.map(|v| Timestamp::new(v, unit.into())),
981                last_val: run.last_val.map(|v| Timestamp::new(v, unit.into())),
982            })
983            .collect_vec();
984        Ok(ts_runs)
985    } else {
986        Err(DataFusionError::Internal(format!(
987            "Unsupported sort column type: {ty}"
988        )))
989    }
990}
991
/// A half-open time range `[start, end)`: `start` is inclusive and `end` is exclusive.
///
/// It is essentially a `(start, end)` pair with helper methods. The derived `Ord` compares
/// `start` first and then `end`; `BTreeMap<TimeRange, _>` keys in this file rely on that order.
#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TimeRange {
    start: Timestamp,
    end: Timestamp,
}
1000
1001impl From<&PartitionRange> for TimeRange {
1002    fn from(range: &PartitionRange) -> Self {
1003        Self::new(range.start, range.end)
1004    }
1005}
1006
1007impl From<(Timestamp, Timestamp)> for TimeRange {
1008    fn from(range: (Timestamp, Timestamp)) -> Self {
1009        Self::new(range.0, range.1)
1010    }
1011}
1012
1013impl From<&(Timestamp, Timestamp)> for TimeRange {
1014    fn from(range: &(Timestamp, Timestamp)) -> Self {
1015        Self::new(range.0, range.1)
1016    }
1017}
1018
1019impl TimeRange {
1020    /// Create a new TimeRange, if start is greater than end, swap them
1021    fn new(start: Timestamp, end: Timestamp) -> Self {
1022        if start > end {
1023            Self {
1024                start: end,
1025                end: start,
1026            }
1027        } else {
1028            Self { start, end }
1029        }
1030    }
1031
1032    fn is_subset(&self, other: &Self) -> bool {
1033        self.start >= other.start && self.end <= other.end
1034    }
1035
1036    /// Check if two ranges are overlapping, exclusive(meaning if only boundary is overlapped then range is not overlapping)
1037    fn is_overlapping(&self, other: &Self) -> bool {
1038        !(self.start >= other.end || self.end <= other.start)
1039    }
1040
1041    fn intersection(&self, other: &Self) -> Option<Self> {
1042        if self.is_overlapping(other) {
1043            Some(Self::new(
1044                self.start.max(other.start),
1045                self.end.min(other.end),
1046            ))
1047        } else {
1048            None
1049        }
1050    }
1051
1052    fn difference(&self, other: &Self) -> Vec<Self> {
1053        if !self.is_overlapping(other) {
1054            vec![*self]
1055        } else {
1056            let mut ret = Vec::new();
1057            if self.start < other.start && self.end > other.end {
1058                ret.push(Self::new(self.start, other.start));
1059                ret.push(Self::new(other.end, self.end));
1060            } else if self.start < other.start {
1061                ret.push(Self::new(self.start, other.start));
1062            } else if self.end > other.end {
1063                ret.push(Self::new(other.end, self.end));
1064            }
1065            ret
1066        }
1067    }
1068}
1069
1070/// split input range by `split_by` range to one, two or three parts.
1071fn split_range_by(
1072    input_range: &TimeRange,
1073    input_parts: &[usize],
1074    split_by: &TimeRange,
1075    split_idx: usize,
1076) -> Vec<Action> {
1077    let mut ret = Vec::new();
1078    if input_range.is_overlapping(split_by) {
1079        let input_parts = input_parts.to_vec();
1080        let new_parts = {
1081            let mut new_parts = input_parts.clone();
1082            new_parts.push(split_idx);
1083            new_parts
1084        };
1085
1086        ret.push(Action::Pop(*input_range));
1087        if let Some(intersection) = input_range.intersection(split_by) {
1088            ret.push(Action::Push(intersection, new_parts.clone()));
1089        }
1090        for diff in input_range.difference(split_by) {
1091            ret.push(Action::Push(diff, input_parts.clone()));
1092        }
1093    }
1094    ret
1095}
1096
/// An edit to the range map built by `split_overlapping_ranges`, as produced by
/// `split_range_by`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Action {
    /// Remove this range key from the map.
    Pop(TimeRange),
    /// Insert this range key with the given partition indices.
    Push(TimeRange, Vec<usize>),
}
1102
/// Compute all working ranges and their working sets from `overlap_counts`.
///
/// `overlap_counts` is the map of disjoint time ranges to the indices of the partitions
/// covering them, as computed by `split_overlapping_ranges`.
///
/// A working range promises that once the input stream yields a value outside of it, no later
/// value will fall inside it. At that point the working range can be merge-sorted. Its working
/// set is the union of the partition indices of every disjoint range merged into it.
///
/// Consecutive entries of `overlap_counts` are merged into one working range whenever they share
/// partitions that still need to be read before merge sorting (see `need_expand` below).
///
/// If `descending` is true, `overlap_counts` is walked from the largest key down, so the
/// working ranges are returned in descending order. Otherwise they are returned in ascending order.
fn compute_all_working_ranges(
    overlap_counts: &BTreeMap<TimeRange, Vec<usize>>,
    descending: bool,
) -> Vec<(TimeRange, BTreeSet<usize>)> {
    let mut ret = Vec::new();
    // The working range currently being built, with its working set.
    let mut cur_range_set: Option<(TimeRange, BTreeSet<usize>)> = None;
    // Walk the disjoint ranges in output order.
    let overlap_iter: Box<dyn Iterator<Item = (&TimeRange, &Vec<usize>)>> = if descending {
        Box::new(overlap_counts.iter().rev()) as _
    } else {
        Box::new(overlap_counts.iter()) as _
    };
    for (range, set) in overlap_iter {
        match &mut cur_range_set {
            // The first disjoint range starts the first working range.
            None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))),
            Some((working_range, working_set)) => {
                // Suppose the next disjoint range contains a partition of `working_set` that is
                // not its last one, so that partition must still be read before merge sorting.
                // Then the current working range is expanded to cover the next range, and that
                // range's `set` is added to `working_set`, so the merge sort stays possible.
                // Otherwise the current working range is complete and is emitted.
                // NOTE(review): this seems to assume partitions are consumed in ascending index
                // order — confirm against the stream that drives it.
                let need_expand = {
                    // The largest partition index in the current working set.
                    let last_part = working_set.last();
                    // Partitions present in both the current working set and the next range.
                    let inter: BTreeSet<usize> = working_set
                        .intersection(&BTreeSet::from_iter(set.iter().cloned()))
                        .cloned()
                        .collect();
                    if let Some(one) = inter.first()
                        && inter.len() == 1
                        && Some(one) == last_part
                    {
                        // The only shared partition is the last one of the working set.
                        if set.iter().all(|p| Some(p) >= last_part) {
                            // Every partition of the next range is at or after that last one, so
                            // the current working set can be emitted as is.
                            false
                        } else {
                            // The next range also has a partition with a smaller index than that
                            // last one, so the working range must be expanded to include it.
                            true
                        }
                    } else if inter.is_empty() {
                        // No shared partition: the current working set can be emitted as is.
                        false
                    } else {
                        // Several shared partitions, or the shared one is not the last: either
                        // way the working range must be expanded to include the next range.
                        true
                    }
                };

                if need_expand {
                    // Grow the working range towards `range` in the walking direction.
                    if descending {
                        working_range.start = range.start;
                    } else {
                        working_range.end = range.end;
                    }
                    working_set.extend(set.iter().cloned());
                } else {
                    // Close the current working range and start a new one at `range`.
                    ret.push((*working_range, std::mem::take(working_set)));
                    cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned())));
                }
            }
        }
    }

    // Flush the last working range, if any.
    if let Some(cur_range_set) = cur_range_set {
        ret.push(cur_range_set)
    }

    ret
}
1177
1178/// return a map of non-overlapping ranges and their corresponding index
1179/// (not `PartitionRange.identifier` but position in array) in the input `PartitionRange`s that is in those ranges
1180fn split_overlapping_ranges(ranges: &[PartitionRange]) -> BTreeMap<TimeRange, Vec<usize>> {
1181    // invariant: the key ranges should not overlapping with each other by definition from `is_overlapping`
1182    let mut ret: BTreeMap<TimeRange, Vec<usize>> = BTreeMap::new();
1183    for (idx, range) in ranges.iter().enumerate() {
1184        let key: TimeRange = (range.start, range.end).into();
1185        let mut actions = Vec::new();
1186        let mut untouched = vec![key];
1187        // create a forward and backward iterator to find all overlapping ranges
1188        // given that tuple is sorted in lexicographical order and promise to not overlap,
1189        // since range is sorted that way, we can stop when we find a non-overlapping range
1190        let forward_iter = ret
1191            .range(key..)
1192            .take_while(|(range, _)| range.is_overlapping(&key));
1193        let backward_iter = ret
1194            .range(..key)
1195            .rev()
1196            .take_while(|(range, _)| range.is_overlapping(&key));
1197
1198        for (range, parts) in forward_iter.chain(backward_iter) {
1199            untouched = untouched.iter().flat_map(|r| r.difference(range)).collect();
1200            let act = split_range_by(range, parts, &key, idx);
1201            actions.extend(act.into_iter());
1202        }
1203
1204        for action in actions {
1205            match action {
1206                Action::Pop(range) => {
1207                    ret.remove(&range);
1208                }
1209                Action::Push(range, parts) => {
1210                    ret.insert(range, parts);
1211                }
1212            }
1213        }
1214
1215        // insert untouched ranges
1216        for range in untouched {
1217            ret.insert(range, vec![idx]);
1218        }
1219    }
1220    ret
1221}
1222
1223/// Get timestamp from array at offset
1224fn get_timestamp_from_idx(
1225    array: &ArrayRef,
1226    offset: usize,
1227) -> datafusion_common::Result<Option<Timestamp>> {
1228    let time_unit = if let DataType::Timestamp(unit, _) = array.data_type() {
1229        unit
1230    } else {
1231        return Err(DataFusionError::Internal(format!(
1232            "Unsupported sort column type: {}",
1233            array.data_type()
1234        )));
1235    };
1236    let ty = array.data_type();
1237    let array = array.slice(offset, 1);
1238    let mut iter = downcast_ts_array!(
1239        array.data_type() => (array_iter_helper, array),
1240        _ => internal_err!("Unsupported sort column type: {ty}")?
1241    );
1242    let (_idx, val) = iter.next().ok_or_else(|| {
1243        DataFusionError::Internal("Empty array in get_timestamp_from".to_string())
1244    })?;
1245    let val = if let Some(val) = val {
1246        val
1247    } else {
1248        return Ok(None);
1249    };
1250    let gt_timestamp = Timestamp::new(val, time_unit.into());
1251    Ok(Some(gt_timestamp))
1252}
1253
1254#[cfg(test)]
1255mod test {
1256    use std::io::Write;
1257    use std::sync::Arc;
1258
1259    use arrow::array::{ArrayRef, TimestampMillisecondArray};
1260    use arrow::compute::concat_batches;
1261    use arrow::json::ArrayWriter;
1262    use arrow_schema::{Field, Schema, TimeUnit};
1263    use futures::StreamExt;
1264    use pretty_assertions::assert_eq;
1265    use serde_json::json;
1266
1267    use super::*;
1268    use crate::test_util::{MockInputExec, new_ts_array};
1269
1270    // Test helpers to reduce duplication
1271    mod helpers {
1272        use datafusion::physical_plan::expressions::Column;
1273
1274        use super::*;
1275
1276        pub fn default_sort_opts(descending: bool) -> SortOptions {
1277            SortOptions {
1278                descending,
1279                nulls_first: true,
1280            }
1281        }
1282
1283        pub fn ts_field(unit: TimeUnit) -> Field {
1284            Field::new("ts", DataType::Timestamp(unit, None), false)
1285        }
1286
1287        pub fn ts_column() -> Column {
1288            Column::new("ts", 0)
1289        }
1290
1291        pub fn partition_range(start: i64, end: i64, num_rows: usize, id: usize) -> PartitionRange {
1292            PartitionRange {
1293                start: Timestamp::new_millisecond(start),
1294                end: Timestamp::new_millisecond(end),
1295                num_rows,
1296                identifier: id,
1297            }
1298        }
1299
1300        pub fn ts_array(values: impl IntoIterator<Item = i64>) -> ArrayRef {
1301            Arc::new(TimestampMillisecondArray::from_iter_values(values))
1302        }
1303    }
1304
1305    #[test]
1306    fn test_overlapping() {
1307        let testcases = [
1308            (
1309                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1310                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1311                false,
1312            ),
1313            (
1314                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1315                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1316                true,
1317            ),
1318            (
1319                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1320                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1321                true,
1322            ),
1323            (
1324                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1325                (
1326                    Timestamp::new_millisecond(1000),
1327                    Timestamp::new_millisecond(1002),
1328                ),
1329                true,
1330            ),
1331            (
1332                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1333                (
1334                    Timestamp::new_millisecond(1001),
1335                    Timestamp::new_millisecond(1002),
1336                ),
1337                false,
1338            ),
1339            (
1340                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1341                (
1342                    Timestamp::new_millisecond(1002),
1343                    Timestamp::new_millisecond(1003),
1344                ),
1345                false,
1346            ),
1347        ];
1348
1349        for (range1, range2, expected) in testcases.iter() {
1350            assert_eq!(
1351                TimeRange::from(range1).is_overlapping(&range2.into()),
1352                *expected,
1353                "range1: {:?}, range2: {:?}",
1354                range1,
1355                range2
1356            );
1357        }
1358    }
1359
1360    #[test]
1361    fn test_split() {
1362        let testcases = [
1363            // no split
1364            (
1365                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1366                vec![0],
1367                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1368                1,
1369                vec![],
1370            ),
1371            // one part
1372            (
1373                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1374                vec![0],
1375                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1376                1,
1377                vec![
1378                    Action::Pop(
1379                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1380                    ),
1381                    Action::Push(
1382                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1383                        vec![0, 1],
1384                    ),
1385                ],
1386            ),
1387            (
1388                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1389                vec![0],
1390                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1391                1,
1392                vec![
1393                    Action::Pop(
1394                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1395                    ),
1396                    Action::Push(
1397                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1398                        vec![0, 1],
1399                    ),
1400                ],
1401            ),
1402            (
1403                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1404                vec![0],
1405                (
1406                    Timestamp::new_millisecond(1000),
1407                    Timestamp::new_millisecond(1002),
1408                ),
1409                1,
1410                vec![
1411                    Action::Pop(
1412                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1413                    ),
1414                    Action::Push(
1415                        (
1416                            Timestamp::new_millisecond(1000),
1417                            Timestamp::new_millisecond(1001),
1418                        )
1419                            .into(),
1420                        vec![0, 1],
1421                    ),
1422                ],
1423            ),
1424            // two part
1425            (
1426                (Timestamp::new_second(1), Timestamp::new_millisecond(1002)),
1427                vec![0],
1428                (
1429                    Timestamp::new_millisecond(1001),
1430                    Timestamp::new_millisecond(1002),
1431                ),
1432                1,
1433                vec![
1434                    Action::Pop(
1435                        (Timestamp::new_second(1), Timestamp::new_millisecond(1002)).into(),
1436                    ),
1437                    Action::Push(
1438                        (
1439                            Timestamp::new_millisecond(1001),
1440                            Timestamp::new_millisecond(1002),
1441                        )
1442                            .into(),
1443                        vec![0, 1],
1444                    ),
1445                    Action::Push(
1446                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1447                        vec![0],
1448                    ),
1449                ],
1450            ),
1451            // three part
1452            (
1453                (Timestamp::new_second(1), Timestamp::new_millisecond(1004)),
1454                vec![0],
1455                (
1456                    Timestamp::new_millisecond(1001),
1457                    Timestamp::new_millisecond(1002),
1458                ),
1459                1,
1460                vec![
1461                    Action::Pop(
1462                        (Timestamp::new_second(1), Timestamp::new_millisecond(1004)).into(),
1463                    ),
1464                    Action::Push(
1465                        (
1466                            Timestamp::new_millisecond(1001),
1467                            Timestamp::new_millisecond(1002),
1468                        )
1469                            .into(),
1470                        vec![0, 1],
1471                    ),
1472                    Action::Push(
1473                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1474                        vec![0],
1475                    ),
1476                    Action::Push(
1477                        (
1478                            Timestamp::new_millisecond(1002),
1479                            Timestamp::new_millisecond(1004),
1480                        )
1481                            .into(),
1482                        vec![0],
1483                    ),
1484                ],
1485            ),
1486        ];
1487
1488        for (range, parts, split_by, split_idx, expected) in testcases.iter() {
1489            assert_eq!(
1490                split_range_by(&(*range).into(), parts, &split_by.into(), *split_idx),
1491                *expected,
1492                "range: {:?}, parts: {:?}, split_by: {:?}, split_idx: {}",
1493                range,
1494                parts,
1495                split_by,
1496                split_idx
1497            );
1498        }
1499    }
1500
1501    #[allow(clippy::type_complexity)]
1502    fn run_compute_working_ranges_test(
1503        testcases: Vec<(
1504            BTreeMap<(Timestamp, Timestamp), Vec<usize>>,
1505            Vec<((Timestamp, Timestamp), BTreeSet<usize>)>,
1506        )>,
1507        descending: bool,
1508    ) {
1509        for (input, expected) in testcases {
1510            let expected = expected
1511                .into_iter()
1512                .map(|(r, s)| (r.into(), s))
1513                .collect_vec();
1514            let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1515            assert_eq!(
1516                compute_all_working_ranges(&input, descending),
1517                expected,
1518                "input: {:?}, descending: {}",
1519                input,
1520                descending
1521            );
1522        }
1523    }
1524
1525    #[test]
1526    fn test_compute_working_ranges_descending() {
1527        let testcases = vec![
1528            (
1529                BTreeMap::from([(
1530                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1531                    vec![0],
1532                )]),
1533                vec![(
1534                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1535                    BTreeSet::from([0]),
1536                )],
1537            ),
1538            (
1539                BTreeMap::from([(
1540                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1541                    vec![0, 1],
1542                )]),
1543                vec![(
1544                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1545                    BTreeSet::from([0, 1]),
1546                )],
1547            ),
1548            (
1549                BTreeMap::from([
1550                    (
1551                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1552                        vec![0],
1553                    ),
1554                    (
1555                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1556                        vec![0, 1],
1557                    ),
1558                ]),
1559                vec![
1560                    (
1561                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1562                        BTreeSet::from([0]),
1563                    ),
1564                    (
1565                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1566                        BTreeSet::from([0, 1]),
1567                    ),
1568                ],
1569            ),
1570            (
1571                BTreeMap::from([
1572                    (
1573                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1574                        vec![0, 1],
1575                    ),
1576                    (
1577                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1578                        vec![1],
1579                    ),
1580                ]),
1581                vec![
1582                    (
1583                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1584                        BTreeSet::from([0, 1]),
1585                    ),
1586                    (
1587                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1588                        BTreeSet::from([1]),
1589                    ),
1590                ],
1591            ),
1592            (
1593                BTreeMap::from([
1594                    (
1595                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1596                        vec![0],
1597                    ),
1598                    (
1599                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1600                        vec![0, 1],
1601                    ),
1602                    (
1603                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1604                        vec![1],
1605                    ),
1606                ]),
1607                vec![
1608                    (
1609                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1610                        BTreeSet::from([0]),
1611                    ),
1612                    (
1613                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1614                        BTreeSet::from([0, 1]),
1615                    ),
1616                    (
1617                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1618                        BTreeSet::from([1]),
1619                    ),
1620                ],
1621            ),
1622            (
1623                BTreeMap::from([
1624                    (
1625                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1626                        vec![0, 2],
1627                    ),
1628                    (
1629                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1630                        vec![0, 1, 2],
1631                    ),
1632                    (
1633                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1634                        vec![1, 2],
1635                    ),
1636                ]),
1637                vec![(
1638                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1639                    BTreeSet::from([0, 1, 2]),
1640                )],
1641            ),
1642            (
1643                BTreeMap::from([
1644                    (
1645                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1646                        vec![0, 2],
1647                    ),
1648                    (
1649                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1650                        vec![1, 2],
1651                    ),
1652                ]),
1653                vec![(
1654                    (Timestamp::new_second(1), Timestamp::new_second(3)),
1655                    BTreeSet::from([0, 1, 2]),
1656                )],
1657            ),
1658            (
1659                BTreeMap::from([
1660                    (
1661                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1662                        vec![0, 1],
1663                    ),
1664                    (
1665                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1666                        vec![0, 1, 2],
1667                    ),
1668                    (
1669                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1670                        vec![1, 2],
1671                    ),
1672                ]),
1673                vec![(
1674                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1675                    BTreeSet::from([0, 1, 2]),
1676                )],
1677            ),
1678            (
1679                BTreeMap::from([
1680                    (
1681                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1682                        vec![0, 1],
1683                    ),
1684                    (
1685                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1686                        vec![1, 2],
1687                    ),
1688                ]),
1689                vec![
1690                    (
1691                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1692                        BTreeSet::from([0, 1]),
1693                    ),
1694                    (
1695                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1696                        BTreeSet::from([1, 2]),
1697                    ),
1698                ],
1699            ),
1700            // non-overlapping
1701            (
1702                BTreeMap::from([
1703                    (
1704                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1705                        vec![0],
1706                    ),
1707                    (
1708                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1709                        vec![1, 2],
1710                    ),
1711                ]),
1712                vec![
1713                    (
1714                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1715                        BTreeSet::from([0]),
1716                    ),
1717                    (
1718                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1719                        BTreeSet::from([1, 2]),
1720                    ),
1721                ],
1722            ),
1723        ];
1724
1725        run_compute_working_ranges_test(testcases, true);
1726    }
1727
1728    #[test]
1729    fn test_compute_working_ranges_ascending() {
1730        let testcases = vec![
1731            (
1732                BTreeMap::from([(
1733                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1734                    vec![0],
1735                )]),
1736                vec![(
1737                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1738                    BTreeSet::from([0]),
1739                )],
1740            ),
1741            (
1742                BTreeMap::from([(
1743                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1744                    vec![0, 1],
1745                )]),
1746                vec![(
1747                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1748                    BTreeSet::from([0, 1]),
1749                )],
1750            ),
1751            (
1752                BTreeMap::from([
1753                    (
1754                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1755                        vec![0, 1],
1756                    ),
1757                    (
1758                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1759                        vec![1],
1760                    ),
1761                ]),
1762                vec![
1763                    (
1764                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1765                        BTreeSet::from([0, 1]),
1766                    ),
1767                    (
1768                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1769                        BTreeSet::from([1]),
1770                    ),
1771                ],
1772            ),
1773            (
1774                BTreeMap::from([
1775                    (
1776                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1777                        vec![0],
1778                    ),
1779                    (
1780                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1781                        vec![0, 1],
1782                    ),
1783                ]),
1784                vec![
1785                    (
1786                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1787                        BTreeSet::from([0]),
1788                    ),
1789                    (
1790                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1791                        BTreeSet::from([0, 1]),
1792                    ),
1793                ],
1794            ),
1795            // test if only one count in working set get it's own working range
1796            (
1797                BTreeMap::from([
1798                    (
1799                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1800                        vec![0],
1801                    ),
1802                    (
1803                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1804                        vec![0, 1],
1805                    ),
1806                    (
1807                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1808                        vec![1],
1809                    ),
1810                ]),
1811                vec![
1812                    (
1813                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1814                        BTreeSet::from([0]),
1815                    ),
1816                    (
1817                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1818                        BTreeSet::from([0, 1]),
1819                    ),
1820                    (
1821                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1822                        BTreeSet::from([1]),
1823                    ),
1824                ],
1825            ),
1826            (
1827                BTreeMap::from([
1828                    (
1829                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1830                        vec![0, 2],
1831                    ),
1832                    (
1833                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1834                        vec![0, 1, 2],
1835                    ),
1836                    (
1837                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1838                        vec![1, 2],
1839                    ),
1840                ]),
1841                vec![(
1842                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1843                    BTreeSet::from([0, 1, 2]),
1844                )],
1845            ),
1846            (
1847                BTreeMap::from([
1848                    (
1849                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1850                        vec![0, 2],
1851                    ),
1852                    (
1853                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1854                        vec![1, 2],
1855                    ),
1856                ]),
1857                vec![(
1858                    (Timestamp::new_second(1), Timestamp::new_second(3)),
1859                    BTreeSet::from([0, 1, 2]),
1860                )],
1861            ),
1862            (
1863                BTreeMap::from([
1864                    (
1865                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1866                        vec![0, 1],
1867                    ),
1868                    (
1869                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1870                        vec![0, 1, 2],
1871                    ),
1872                    (
1873                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1874                        vec![1, 2],
1875                    ),
1876                ]),
1877                vec![(
1878                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1879                    BTreeSet::from([0, 1, 2]),
1880                )],
1881            ),
1882            (
1883                BTreeMap::from([
1884                    (
1885                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1886                        vec![0, 1],
1887                    ),
1888                    (
1889                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1890                        vec![1, 2],
1891                    ),
1892                ]),
1893                vec![
1894                    (
1895                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1896                        BTreeSet::from([0, 1]),
1897                    ),
1898                    (
1899                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1900                        BTreeSet::from([1, 2]),
1901                    ),
1902                ],
1903            ),
1904            // non-overlapping
1905            (
1906                BTreeMap::from([
1907                    (
1908                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1909                        vec![0, 1],
1910                    ),
1911                    (
1912                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1913                        vec![2],
1914                    ),
1915                ]),
1916                vec![
1917                    (
1918                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1919                        BTreeSet::from([0, 1]),
1920                    ),
1921                    (
1922                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1923                        BTreeSet::from([2]),
1924                    ),
1925                ],
1926            ),
1927        ];
1928
1929        run_compute_working_ranges_test(testcases, false);
1930    }
1931
1932    #[test]
1933    fn test_split_overlap_range() {
1934        let testcases = vec![
1935            // simple one range
1936            (
1937                vec![PartitionRange {
1938                    start: Timestamp::new_second(1),
1939                    end: Timestamp::new_second(2),
1940                    num_rows: 2,
1941                    identifier: 0,
1942                }],
1943                BTreeMap::from_iter(
1944                    vec![(
1945                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1946                        vec![0],
1947                    )]
1948                    .into_iter(),
1949                ),
1950            ),
1951            // two overlapping range
1952            (
1953                vec![
1954                    PartitionRange {
1955                        start: Timestamp::new_second(1),
1956                        end: Timestamp::new_second(2),
1957                        num_rows: 2,
1958                        identifier: 0,
1959                    },
1960                    PartitionRange {
1961                        start: Timestamp::new_second(1),
1962                        end: Timestamp::new_second(2),
1963                        num_rows: 2,
1964                        identifier: 1,
1965                    },
1966                ],
1967                BTreeMap::from_iter(
1968                    vec![(
1969                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1970                        vec![0, 1],
1971                    )]
1972                    .into_iter(),
1973                ),
1974            ),
1975            (
1976                vec![
1977                    PartitionRange {
1978                        start: Timestamp::new_second(1),
1979                        end: Timestamp::new_second(3),
1980                        num_rows: 2,
1981                        identifier: 0,
1982                    },
1983                    PartitionRange {
1984                        start: Timestamp::new_second(2),
1985                        end: Timestamp::new_second(4),
1986                        num_rows: 2,
1987                        identifier: 1,
1988                    },
1989                ],
1990                BTreeMap::from_iter(
1991                    vec![
1992                        (
1993                            (Timestamp::new_second(1), Timestamp::new_second(2)),
1994                            vec![0],
1995                        ),
1996                        (
1997                            (Timestamp::new_second(2), Timestamp::new_second(3)),
1998                            vec![0, 1],
1999                        ),
2000                        (
2001                            (Timestamp::new_second(3), Timestamp::new_second(4)),
2002                            vec![1],
2003                        ),
2004                    ]
2005                    .into_iter(),
2006                ),
2007            ),
2008            // three or more overlapping range
2009            (
2010                vec![
2011                    PartitionRange {
2012                        start: Timestamp::new_second(1),
2013                        end: Timestamp::new_second(3),
2014                        num_rows: 2,
2015                        identifier: 0,
2016                    },
2017                    PartitionRange {
2018                        start: Timestamp::new_second(2),
2019                        end: Timestamp::new_second(4),
2020                        num_rows: 2,
2021                        identifier: 1,
2022                    },
2023                    PartitionRange {
2024                        start: Timestamp::new_second(1),
2025                        end: Timestamp::new_second(4),
2026                        num_rows: 2,
2027                        identifier: 2,
2028                    },
2029                ],
2030                BTreeMap::from_iter(
2031                    vec![
2032                        (
2033                            (Timestamp::new_second(1), Timestamp::new_second(2)),
2034                            vec![0, 2],
2035                        ),
2036                        (
2037                            (Timestamp::new_second(2), Timestamp::new_second(3)),
2038                            vec![0, 1, 2],
2039                        ),
2040                        (
2041                            (Timestamp::new_second(3), Timestamp::new_second(4)),
2042                            vec![1, 2],
2043                        ),
2044                    ]
2045                    .into_iter(),
2046                ),
2047            ),
2048            (
2049                vec![
2050                    PartitionRange {
2051                        start: Timestamp::new_second(1),
2052                        end: Timestamp::new_second(3),
2053                        num_rows: 2,
2054                        identifier: 0,
2055                    },
2056                    PartitionRange {
2057                        start: Timestamp::new_second(1),
2058                        end: Timestamp::new_second(4),
2059                        num_rows: 2,
2060                        identifier: 1,
2061                    },
2062                    PartitionRange {
2063                        start: Timestamp::new_second(2),
2064                        end: Timestamp::new_second(4),
2065                        num_rows: 2,
2066                        identifier: 2,
2067                    },
2068                ],
2069                BTreeMap::from_iter(
2070                    vec![
2071                        (
2072                            (Timestamp::new_second(1), Timestamp::new_second(2)),
2073                            vec![0, 1],
2074                        ),
2075                        (
2076                            (Timestamp::new_second(2), Timestamp::new_second(3)),
2077                            vec![0, 1, 2],
2078                        ),
2079                        (
2080                            (Timestamp::new_second(3), Timestamp::new_second(4)),
2081                            vec![1, 2],
2082                        ),
2083                    ]
2084                    .into_iter(),
2085                ),
2086            ),
2087        ];
2088
2089        for (input, expected) in testcases {
2090            let expected = expected.into_iter().map(|(r, s)| (r.into(), s)).collect();
2091            assert_eq!(split_overlapping_ranges(&input), expected);
2092        }
2093    }
2094
2095    impl From<(i32, i32, Option<i32>, Option<i32>)> for SucRun<i32> {
2096        fn from((offset, len, min_val, max_val): (i32, i32, Option<i32>, Option<i32>)) -> Self {
2097            Self {
2098                offset: offset as usize,
2099                len: len as usize,
2100                first_val: min_val,
2101                last_val: max_val,
2102            }
2103        }
2104    }
2105
2106    #[test]
2107    fn test_find_successive_runs() {
2108        let testcases = vec![
2109            (
2110                vec![Some(1), Some(1), Some(2), Some(1), Some(3)],
2111                Some(SortOptions {
2112                    descending: false,
2113                    nulls_first: false,
2114                }),
2115                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2116            ),
2117            (
2118                vec![Some(1), Some(2), Some(2), Some(1), Some(3)],
2119                Some(SortOptions {
2120                    descending: false,
2121                    nulls_first: false,
2122                }),
2123                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2124            ),
2125            (
2126                vec![Some(1), Some(2), None, None, Some(1), Some(3)],
2127                Some(SortOptions {
2128                    descending: false,
2129                    nulls_first: false,
2130                }),
2131                vec![(0, 4, Some(1), Some(2)), (4, 2, Some(1), Some(3))],
2132            ),
2133            (
2134                vec![Some(1), Some(2), Some(1), Some(3)],
2135                Some(SortOptions {
2136                    descending: false,
2137                    nulls_first: false,
2138                }),
2139                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(1), Some(3))],
2140            ),
2141            (
2142                vec![Some(1), Some(2), Some(1), Some(3)],
2143                Some(SortOptions {
2144                    descending: true,
2145                    nulls_first: false,
2146                }),
2147                vec![
2148                    (0, 1, Some(1), Some(1)),
2149                    (1, 2, Some(2), Some(1)),
2150                    (3, 1, Some(3), Some(3)),
2151                ],
2152            ),
2153            (
2154                vec![Some(1), Some(2), None, Some(3)],
2155                Some(SortOptions {
2156                    descending: false,
2157                    nulls_first: true,
2158                }),
2159                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(3), Some(3))],
2160            ),
2161            (
2162                vec![Some(1), Some(2), None, Some(3)],
2163                Some(SortOptions {
2164                    descending: false,
2165                    nulls_first: false,
2166                }),
2167                vec![(0, 3, Some(1), Some(2)), (3, 1, Some(3), Some(3))],
2168            ),
2169            (
2170                vec![Some(2), Some(1), None, Some(3)],
2171                Some(SortOptions {
2172                    descending: true,
2173                    nulls_first: true,
2174                }),
2175                vec![(0, 2, Some(2), Some(1)), (2, 2, Some(3), Some(3))],
2176            ),
2177            (
2178                vec![],
2179                Some(SortOptions {
2180                    descending: false,
2181                    nulls_first: true,
2182                }),
2183                vec![(0, 0, None, None)],
2184            ),
2185            (
2186                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2187                Some(SortOptions {
2188                    descending: true,
2189                    nulls_first: true,
2190                }),
2191                vec![(0, 5, Some(2), Some(1)), (5, 2, Some(5), Some(4))],
2192            ),
2193            (
2194                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2195                Some(SortOptions {
2196                    descending: true,
2197                    nulls_first: false,
2198                }),
2199                vec![
2200                    (0, 2, None, None),
2201                    (2, 3, Some(2), Some(1)),
2202                    (5, 2, Some(5), Some(4)),
2203                ],
2204            ),
2205        ];
2206        for (input, sort_opts, expected) in testcases {
2207            let ret = find_successive_runs(input.clone().into_iter().enumerate(), &sort_opts);
2208            let expected = expected.into_iter().map(SucRun::<i32>::from).collect_vec();
2209            assert_eq!(
2210                ret, expected,
2211                "input: {:?}, opt: {:?},expected: {:?}",
2212                input, sort_opts, expected
2213            );
2214        }
2215    }
2216
2217    #[test]
2218    fn test_cmp_with_opts() {
2219        let testcases = vec![
2220            // Test ascending vs descending for Some values
2221            (
2222                Some(1),
2223                Some(2),
2224                Some(SortOptions {
2225                    descending: false,
2226                    nulls_first: false,
2227                }),
2228                std::cmp::Ordering::Less,
2229            ),
2230            (
2231                Some(1),
2232                Some(2),
2233                Some(SortOptions {
2234                    descending: true,
2235                    nulls_first: false,
2236                }),
2237                std::cmp::Ordering::Greater,
2238            ),
2239            // Test Some vs None with nulls_first
2240            (
2241                Some(1),
2242                None,
2243                Some(SortOptions {
2244                    descending: false,
2245                    nulls_first: true,
2246                }),
2247                std::cmp::Ordering::Greater,
2248            ),
2249            (
2250                Some(1),
2251                None,
2252                Some(SortOptions {
2253                    descending: true,
2254                    nulls_first: true,
2255                }),
2256                std::cmp::Ordering::Greater,
2257            ),
2258            // Test Some vs None with nulls_last
2259            (
2260                Some(1),
2261                None,
2262                Some(SortOptions {
2263                    descending: true,
2264                    nulls_first: false,
2265                }),
2266                std::cmp::Ordering::Less,
2267            ),
2268            (
2269                Some(1),
2270                None,
2271                Some(SortOptions {
2272                    descending: false,
2273                    nulls_first: false,
2274                }),
2275                std::cmp::Ordering::Less,
2276            ),
2277            // Test None vs None - always Equal regardless of sort options
2278            (
2279                None,
2280                None,
2281                Some(SortOptions {
2282                    descending: false,
2283                    nulls_first: true,
2284                }),
2285                std::cmp::Ordering::Equal,
2286            ),
2287        ];
2288        for (a, b, opts, expected) in testcases {
2289            assert_eq!(
2290                cmp_with_opts(&a, &b, &opts),
2291                expected,
2292                "a: {:?}, b: {:?}, opts: {:?}",
2293                a,
2294                b,
2295                opts
2296            );
2297        }
2298    }
2299
2300    #[test]
2301    fn test_find_slice_from_range() {
2302        let test_cases = vec![
2303            // test for off by one case
2304            (
2305                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2306                false,
2307                TimeRange {
2308                    start: Timestamp::new_millisecond(2),
2309                    end: Timestamp::new_millisecond(4),
2310                },
2311                Ok((1, 2)),
2312            ),
2313            (
2314                Arc::new(TimestampMillisecondArray::from_iter_values([
2315                    -2, -1, 0, 1, 2, 3, 4, 5,
2316                ])) as ArrayRef,
2317                false,
2318                TimeRange {
2319                    start: Timestamp::new_millisecond(-1),
2320                    end: Timestamp::new_millisecond(4),
2321                },
2322                Ok((1, 5)),
2323            ),
2324            (
2325                Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 6])) as ArrayRef,
2326                false,
2327                TimeRange {
2328                    start: Timestamp::new_millisecond(2),
2329                    end: Timestamp::new_millisecond(5),
2330                },
2331                Ok((1, 2)),
2332            ),
2333            (
2334                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 6])) as ArrayRef,
2335                false,
2336                TimeRange {
2337                    start: Timestamp::new_millisecond(2),
2338                    end: Timestamp::new_millisecond(5),
2339                },
2340                Ok((1, 3)),
2341            ),
2342            (
2343                Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 5, 6])) as ArrayRef,
2344                false,
2345                TimeRange {
2346                    start: Timestamp::new_millisecond(2),
2347                    end: Timestamp::new_millisecond(5),
2348                },
2349                Ok((1, 2)),
2350            ),
2351            (
2352                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2353                false,
2354                TimeRange {
2355                    start: Timestamp::new_millisecond(6),
2356                    end: Timestamp::new_millisecond(7),
2357                },
2358                Ok((5, 0)),
2359            ),
2360            // descending off by one cases
2361            (
2362                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 1])) as ArrayRef,
2363                true,
2364                TimeRange {
2365                    end: Timestamp::new_millisecond(4),
2366                    start: Timestamp::new_millisecond(1),
2367                },
2368                Ok((1, 3)),
2369            ),
2370            (
2371                Arc::new(TimestampMillisecondArray::from_iter_values([
2372                    5, 4, 3, 2, 1, 0,
2373                ])) as ArrayRef,
2374                true,
2375                TimeRange {
2376                    end: Timestamp::new_millisecond(4),
2377                    start: Timestamp::new_millisecond(1),
2378                },
2379                Ok((2, 3)),
2380            ),
2381            (
2382                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 0])) as ArrayRef,
2383                true,
2384                TimeRange {
2385                    end: Timestamp::new_millisecond(4),
2386                    start: Timestamp::new_millisecond(1),
2387                },
2388                Ok((1, 2)),
2389            ),
2390            (
2391                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 0])) as ArrayRef,
2392                true,
2393                TimeRange {
2394                    end: Timestamp::new_millisecond(4),
2395                    start: Timestamp::new_millisecond(1),
2396                },
2397                Ok((2, 2)),
2398            ),
2399            (
2400                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 1])) as ArrayRef,
2401                true,
2402                TimeRange {
2403                    end: Timestamp::new_millisecond(5),
2404                    start: Timestamp::new_millisecond(2),
2405                },
2406                Ok((1, 3)),
2407            ),
2408            (
2409                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 1])) as ArrayRef,
2410                true,
2411                TimeRange {
2412                    end: Timestamp::new_millisecond(5),
2413                    start: Timestamp::new_millisecond(2),
2414                },
2415                Ok((1, 2)),
2416            ),
2417            (
2418                Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 2, 1])) as ArrayRef,
2419                true,
2420                TimeRange {
2421                    end: Timestamp::new_millisecond(5),
2422                    start: Timestamp::new_millisecond(2),
2423                },
2424                Ok((1, 3)),
2425            ),
2426            (
2427                Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 1])) as ArrayRef,
2428                true,
2429                TimeRange {
2430                    end: Timestamp::new_millisecond(5),
2431                    start: Timestamp::new_millisecond(2),
2432                },
2433                Ok((1, 2)),
2434            ),
2435            (
2436                Arc::new(TimestampMillisecondArray::from_iter_values([
2437                    10, 9, 8, 7, 6,
2438                ])) as ArrayRef,
2439                true,
2440                TimeRange {
2441                    end: Timestamp::new_millisecond(5),
2442                    start: Timestamp::new_millisecond(2),
2443                },
2444                Ok((5, 0)),
2445            ),
2446            // test off by one case
2447            (
2448                Arc::new(TimestampMillisecondArray::from_iter_values([3, 2, 1, 0])) as ArrayRef,
2449                true,
2450                TimeRange {
2451                    end: Timestamp::new_millisecond(4),
2452                    start: Timestamp::new_millisecond(3),
2453                },
2454                Ok((0, 1)),
2455            ),
2456            (
2457                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2])) as ArrayRef,
2458                true,
2459                TimeRange {
2460                    end: Timestamp::new_millisecond(4),
2461                    start: Timestamp::new_millisecond(3),
2462                },
2463                Ok((1, 1)),
2464            ),
2465            (
2466                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2])) as ArrayRef,
2467                true,
2468                TimeRange {
2469                    end: Timestamp::new_millisecond(4),
2470                    start: Timestamp::new_millisecond(3),
2471                },
2472                Ok((2, 1)),
2473            ),
2474        ];
2475
2476        for (sort_vals, descending, range, expected) in test_cases {
2477            let sort_column = SortColumn {
2478                values: sort_vals,
2479                options: Some(SortOptions {
2480                    descending,
2481                    ..Default::default()
2482                }),
2483            };
2484            let ret = find_slice_from_range(&sort_column, &range);
2485            match (ret, expected) {
2486                (Ok(ret), Ok(expected)) => {
2487                    assert_eq!(
2488                        ret, expected,
2489                        "sort_vals: {:?}, range: {:?}",
2490                        sort_column, range
2491                    )
2492                }
2493                (Err(err), Err(expected)) => {
2494                    let expected: &str = expected;
2495                    assert!(
2496                        err.to_string().contains(expected),
2497                        "err: {:?}, expected: {:?}",
2498                        err,
2499                        expected
2500                    );
2501                }
2502                (r, e) => panic!("unexpected result: {:?}, expected: {:?}", r, e),
2503            }
2504        }
2505    }
2506
    /// Fixture for one end-to-end `WindowedSortExec` run: the sorted input
    /// ranges, the operator configuration and the batches it is expected to emit.
    #[derive(Debug)]
    struct TestStream {
        /// Sort expression handed to `WindowedSortExec::try_new`.
        expression: PhysicalSortExpr,
        /// Optional row limit handed to `WindowedSortExec::try_new`.
        fetch: Option<usize>,
        /// Input batches paired with the `PartitionRange` each one belongs to;
        /// `run_test` feeds them to the operator as a single partition.
        input: Vec<(PartitionRange, DfRecordBatch)>,
        /// Batches the tests compare against the output of `run_test`.
        output: Vec<DfRecordBatch>,
        /// Schema shared by every input and expected batch.
        schema: SchemaRef,
    }
2515
2516    impl TestStream {
2517        fn new(
2518            opt: SortOptions,
2519            fetch: Option<usize>,
2520            unit: TimeUnit,
2521            input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2522            expected: Vec<Vec<ArrayRef>>,
2523        ) -> Self {
2524            let expression = PhysicalSortExpr {
2525                expr: Arc::new(helpers::ts_column()),
2526                options: opt,
2527            };
2528            let schema = Schema::new(vec![helpers::ts_field(unit)]);
2529            let schema = Arc::new(schema);
2530            let input = input
2531                .into_iter()
2532                .map(|(k, v)| (k, DfRecordBatch::try_new(schema.clone(), v).unwrap()))
2533                .collect_vec();
2534            let output_batchs = expected
2535                .into_iter()
2536                .map(|v| DfRecordBatch::try_new(schema.clone(), v).unwrap())
2537                .collect_vec();
2538            Self {
2539                expression,
2540                fetch,
2541                input,
2542                output: output_batchs,
2543                schema,
2544            }
2545        }
2546
2547        fn new_simple(
2548            descending: bool,
2549            fetch: Option<usize>,
2550            input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2551            expected: Vec<Vec<ArrayRef>>,
2552        ) -> Self {
2553            Self::new(
2554                helpers::default_sort_opts(descending),
2555                fetch,
2556                TimeUnit::Millisecond,
2557                input,
2558                expected,
2559            )
2560        }
2561
2562        async fn run_test(&self) -> Vec<DfRecordBatch> {
2563            let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip();
2564
2565            let mock_input = MockInputExec::new(vec![batches], self.schema.clone());
2566
2567            let exec = WindowedSortExec::try_new(
2568                self.expression.clone(),
2569                self.fetch,
2570                vec![ranges],
2571                Arc::new(mock_input),
2572            )
2573            .unwrap();
2574
2575            let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
2576
2577            let real_output = exec_stream.collect::<Vec<_>>().await;
2578            let real_output: Vec<_> = real_output.into_iter().try_collect().unwrap();
2579            real_output
2580        }
2581    }
2582
2583    #[tokio::test]
2584    async fn test_window_sort_empty_and_minimal() {
2585        use helpers::*;
2586        let test_cases = [
2587            // Empty input
2588            TestStream::new_simple(false, None, vec![], vec![]),
2589            // One empty batch, one with data
2590            TestStream::new_simple(
2591                false,
2592                None,
2593                vec![
2594                    (partition_range(1, 2, 1, 0), vec![ts_array([])]),
2595                    (partition_range(1, 3, 1, 0), vec![ts_array([2])]),
2596                ],
2597                vec![vec![ts_array([2])]],
2598            ),
2599            // Both batches empty
2600            TestStream::new_simple(
2601                false,
2602                None,
2603                vec![
2604                    (partition_range(1, 2, 1, 0), vec![ts_array([])]),
2605                    (partition_range(1, 3, 1, 0), vec![ts_array([])]),
2606                ],
2607                vec![],
2608            ),
2609            // Indistinguishable boundary case - value at exact boundary
2610            TestStream::new_simple(
2611                false,
2612                None,
2613                vec![
2614                    (partition_range(1, 2, 1, 0), vec![ts_array([1])]),
2615                    (partition_range(1, 3, 1, 0), vec![ts_array([2])]),
2616                ],
2617                vec![vec![ts_array([1])], vec![ts_array([2])]],
2618            ),
2619        ];
2620
2621        for (idx, testcase) in test_cases.iter().enumerate() {
2622            let output = testcase.run_test().await;
2623            assert_eq!(output, testcase.output, "empty/minimal case {idx} failed");
2624        }
2625    }
2626
2627    #[tokio::test]
2628    async fn test_window_sort_overlapping() {
2629        use helpers::*;
2630        let test_cases = [
2631            // Direct emit - overlapping ranges without merge
2632            TestStream::new_simple(
2633                false,
2634                None,
2635                vec![
2636                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2637                    (partition_range(1, 4, 1, 0), vec![ts_array([2, 3])]),
2638                ],
2639                vec![
2640                    vec![ts_array([1, 2])],
2641                    vec![ts_array([2])],
2642                    vec![ts_array([3])],
2643                ],
2644            ),
2645            // Cross working range batch intersection - triggers merge
2646            TestStream::new_simple(
2647                false,
2648                None,
2649                vec![
2650                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2651                    (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
2652                ],
2653                vec![vec![ts_array([1, 1, 2, 2])], vec![ts_array([3])]],
2654            ),
2655            // No overlap case - separate ranges
2656            TestStream::new_simple(
2657                false,
2658                None,
2659                vec![
2660                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2661                    (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
2662                    (partition_range(4, 6, 1, 1), vec![ts_array([4, 5])]),
2663                ],
2664                vec![
2665                    vec![ts_array([1, 1, 2, 2])],
2666                    vec![ts_array([3])],
2667                    vec![ts_array([4, 5])],
2668                ],
2669            ),
2670        ];
2671
2672        for (idx, testcase) in test_cases.iter().enumerate() {
2673            let output = testcase.run_test().await;
2674            assert_eq!(output, testcase.output, "overlapping case {idx} failed");
2675        }
2676    }
2677
2678    #[tokio::test]
2679    async fn test_window_sort_with_fetch() {
2680        use helpers::*;
2681        let test_cases = [
2682            // Fetch limit stops at 6 rows
2683            TestStream::new_simple(
2684                false,
2685                Some(6),
2686                vec![
2687                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2688                    (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
2689                    (partition_range(3, 6, 1, 1), vec![ts_array([4, 5])]),
2690                ],
2691                vec![
2692                    vec![ts_array([1, 1, 2, 2])],
2693                    vec![ts_array([3])],
2694                    vec![ts_array([4])],
2695                ],
2696            ),
2697            // Fetch limit stops at 3 rows
2698            TestStream::new_simple(
2699                false,
2700                Some(3),
2701                vec![
2702                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2703                    (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
2704                    (partition_range(3, 6, 1, 1), vec![ts_array([4, 5])]),
2705                ],
2706                vec![vec![ts_array([1, 1, 2])]],
2707            ),
2708        ];
2709
2710        for (idx, testcase) in test_cases.iter().enumerate() {
2711            let output = testcase.run_test().await;
2712            assert_eq!(output, testcase.output, "fetch case {idx} failed");
2713        }
2714    }
2715
2716    #[tokio::test]
2717    async fn test_window_sort_descending() {
2718        use helpers::*;
2719        let test_cases = [
2720            // Descending order sort
2721            TestStream::new_simple(
2722                true,
2723                None,
2724                vec![
2725                    (partition_range(3, 6, 1, 1), vec![ts_array([5, 4])]),
2726                    (partition_range(1, 4, 1, 1), vec![ts_array([3, 2, 1])]),
2727                    (partition_range(1, 3, 1, 0), vec![ts_array([2, 1])]),
2728                ],
2729                vec![
2730                    vec![ts_array([5, 4])],
2731                    vec![ts_array([3])],
2732                    vec![ts_array([2, 2, 1, 1])],
2733                ],
2734            ),
2735        ];
2736
2737        for (idx, testcase) in test_cases.iter().enumerate() {
2738            let output = testcase.run_test().await;
2739            assert_eq!(output, testcase.output, "descending case {idx} failed");
2740        }
2741    }
2742
2743    #[tokio::test]
2744    async fn test_window_sort_complex() {
2745        use helpers::*;
2746        let test_cases = [
2747            // Long range with subset short run
2748            TestStream::new_simple(
2749                false,
2750                None,
2751                vec![
2752                    (partition_range(1, 10, 1, 0), vec![ts_array([1, 5, 9])]),
2753                    (partition_range(3, 7, 1, 1), vec![ts_array([3, 4, 5, 6])]),
2754                ],
2755                vec![vec![ts_array([1])], vec![ts_array([3, 4, 5, 5, 6, 9])]],
2756            ),
2757            // Complex multi-range overlap
2758            TestStream::new_simple(
2759                false,
2760                None,
2761                vec![
2762                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2763                    (
2764                        partition_range(1, 10, 1, 1),
2765                        vec![ts_array([1, 3, 4, 5, 6, 8])],
2766                    ),
2767                    (partition_range(7, 10, 1, 1), vec![ts_array([7, 8, 9])]),
2768                ],
2769                vec![
2770                    vec![ts_array([1, 1, 2])],
2771                    vec![ts_array([3, 4, 5, 6])],
2772                    vec![ts_array([7, 8, 8, 9])],
2773                ],
2774            ),
2775            // Subset with duplicate datapoints
2776            TestStream::new_simple(
2777                false,
2778                None,
2779                vec![
2780                    (
2781                        partition_range(1, 11, 1, 0),
2782                        vec![ts_array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])],
2783                    ),
2784                    (partition_range(5, 7, 1, 1), vec![ts_array([5, 6])]),
2785                ],
2786                vec![
2787                    vec![ts_array([1, 2, 3, 4])],
2788                    vec![ts_array([5, 5, 6, 6, 7, 8, 9, 10])],
2789                ],
2790            ),
2791        ];
2792
2793        for (idx, testcase) in test_cases.iter().enumerate() {
2794            let output = testcase.run_test().await;
2795            assert_eq!(output, testcase.output, "complex case {idx} failed");
2796        }
2797    }
2798
2799    #[tokio::test]
2800    async fn fuzzy_ish_test_window_sort_stream() {
2801        let test_cnt = 100;
2802        let part_cnt_bound = 100;
2803        let range_size_bound = 100;
2804        let range_offset_bound = 100;
2805        let in_range_datapoint_cnt_bound = 100;
2806        let fetch_bound = 100;
2807
2808        let mut rng = fastrand::Rng::new();
2809        let rng_seed = rng.u64(..);
2810        rng.seed(rng_seed);
2811        let mut bound_val = None;
2812        // construct testcases
2813        type CmpFn<T> = Box<dyn FnMut(&T, &T) -> std::cmp::Ordering>;
2814        let mut full_testcase_list = Vec::new();
2815        for _case_id in 0..test_cnt {
2816            let descending = rng.bool();
2817            fn ret_cmp_fn<T: Ord>(descending: bool) -> CmpFn<T> {
2818                if descending {
2819                    return Box::new(|a: &T, b: &T| b.cmp(a));
2820                }
2821                Box::new(|a: &T, b: &T| a.cmp(b))
2822            }
2823            let unit = match rng.u8(0..3) {
2824                0 => TimeUnit::Second,
2825                1 => TimeUnit::Millisecond,
2826                2 => TimeUnit::Microsecond,
2827                _ => TimeUnit::Nanosecond,
2828            };
2829            let fetch = if rng.bool() {
2830                Some(rng.usize(0..fetch_bound))
2831            } else {
2832                None
2833            };
2834
2835            let mut input_ranged_data = vec![];
2836            let mut output_data: Vec<i64> = vec![];
2837            // generate input data
2838            for part_id in 0..rng.usize(0..part_cnt_bound) {
2839                let (start, end) = if descending {
2840                    // Use 1..=range_offset_bound to ensure strictly decreasing end values
2841                    let end = bound_val
2842                        .map(|i| i - rng.i64(1..=range_offset_bound))
2843                        .unwrap_or_else(|| rng.i64(..));
2844                    bound_val = Some(end);
2845                    let start = end - rng.i64(1..range_size_bound);
2846                    let start = Timestamp::new(start, unit.into());
2847                    let end = Timestamp::new(end, unit.into());
2848                    (start, end)
2849                } else {
2850                    // Use 1..=range_offset_bound to ensure strictly increasing start values
2851                    let start = bound_val
2852                        .map(|i| i + rng.i64(1..=range_offset_bound))
2853                        .unwrap_or_else(|| rng.i64(..));
2854                    bound_val = Some(start);
2855                    let end = start + rng.i64(1..range_size_bound);
2856                    let start = Timestamp::new(start, unit.into());
2857                    let end = Timestamp::new(end, unit.into());
2858                    (start, end)
2859                };
2860
2861                let iter = 0..rng.usize(0..in_range_datapoint_cnt_bound);
2862                let data_gen = iter
2863                    .map(|_| rng.i64(start.value()..end.value()))
2864                    .sorted_by(ret_cmp_fn(descending))
2865                    .collect_vec();
2866                output_data.extend(data_gen.clone());
2867                let arr = new_ts_array(unit, data_gen);
2868                let range = PartitionRange {
2869                    start,
2870                    end,
2871                    num_rows: arr.len(),
2872                    identifier: part_id,
2873                };
2874                input_ranged_data.push((range, vec![arr]));
2875            }
2876
2877            output_data.sort_by(ret_cmp_fn(descending));
2878            if let Some(fetch) = fetch {
2879                output_data.truncate(fetch);
2880            }
2881            let output_arr = new_ts_array(unit, output_data);
2882
2883            let test_stream = TestStream::new(
2884                helpers::default_sort_opts(descending),
2885                fetch,
2886                unit,
2887                input_ranged_data.clone(),
2888                vec![vec![output_arr]],
2889            );
2890            full_testcase_list.push(test_stream);
2891        }
2892
2893        for (case_id, test_stream) in full_testcase_list.into_iter().enumerate() {
2894            let res = test_stream.run_test().await;
2895            let res_concat = concat_batches(&test_stream.schema, &res).unwrap();
2896            let expected = test_stream.output;
2897            let expected_concat = concat_batches(&test_stream.schema, &expected).unwrap();
2898
2899            if res_concat != expected_concat {
2900                {
2901                    let mut f_input = std::io::stderr();
2902                    f_input.write_all(b"[").unwrap();
2903                    for (input_range, input_arr) in test_stream.input {
2904                        let range_json = json!({
2905                            "start": input_range.start.to_chrono_datetime().unwrap().to_string(),
2906                            "end": input_range.end.to_chrono_datetime().unwrap().to_string(),
2907                            "num_rows": input_range.num_rows,
2908                            "identifier": input_range.identifier,
2909                        });
2910                        let buf = Vec::new();
2911                        let mut input_writer = ArrayWriter::new(buf);
2912                        input_writer.write(&input_arr).unwrap();
2913                        input_writer.finish().unwrap();
2914                        let res_str =
2915                            String::from_utf8_lossy(&input_writer.into_inner()).to_string();
2916                        let whole_json =
2917                            format!(r#"{{"range": {}, "data": {}}},"#, range_json, res_str);
2918                        f_input.write_all(whole_json.as_bytes()).unwrap();
2919                    }
2920                    f_input.write_all(b"]").unwrap();
2921                }
2922                {
2923                    let mut f_res = std::io::stderr();
2924                    f_res.write_all(b"[").unwrap();
2925                    for batch in &res {
2926                        let mut res_writer = ArrayWriter::new(f_res);
2927                        res_writer.write(batch).unwrap();
2928                        res_writer.finish().unwrap();
2929                        f_res = res_writer.into_inner();
2930                        f_res.write_all(b",").unwrap();
2931                    }
2932                    f_res.write_all(b"]").unwrap();
2933
2934                    let f_res_concat = std::io::stderr();
2935                    let mut res_writer = ArrayWriter::new(f_res_concat);
2936                    res_writer.write(&res_concat).unwrap();
2937                    res_writer.finish().unwrap();
2938
2939                    let f_expected = std::io::stderr();
2940                    let mut expected_writer = ArrayWriter::new(f_expected);
2941                    expected_writer.write(&expected_concat).unwrap();
2942                    expected_writer.finish().unwrap();
2943                }
2944                panic!(
2945                    "case failed, case id: {0}, output and expected output to stderr",
2946                    case_id
2947                );
2948            }
2949            assert_eq!(
2950                res_concat, expected_concat,
2951                "case failed, case id: {}, rng seed: {}",
2952                case_id, rng_seed
2953            );
2954        }
2955    }
2956}