Skip to main content

query/
window_sort.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A physical plan for window sort(Which is sorting multiple sorted ranges according to input `PartitionRange`).
16//!
17
18use std::any::Any;
19use std::collections::{BTreeMap, BTreeSet, VecDeque};
20use std::pin::Pin;
21use std::slice::from_ref;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use arrow::array::{Array, ArrayRef};
26use arrow::compute::SortColumn;
27use arrow_schema::{DataType, SchemaRef, SortOptions};
28use common_error::ext::{BoxedError, PlainError};
29use common_error::status_code::StatusCode;
30use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
31use common_telemetry::error;
32use common_time::Timestamp;
33use common_time::timestamp::TimeUnit as TimestampUnit;
34use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
35use datafusion::execution::{RecordBatchStream, TaskContext};
36use datafusion::physical_plan::memory::MemoryStream;
37use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
38use datafusion::physical_plan::sorts::streaming_merge::StreamingMergeBuilder;
39use datafusion::physical_plan::{
40    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
41};
42use datafusion_common::utils::bisect;
43use datafusion_common::{DataFusionError, internal_err};
44use datafusion_physical_expr::PhysicalSortExpr;
45use datatypes::value::Value;
46use futures::Stream;
47use itertools::Itertools;
48use snafu::ResultExt;
49use store_api::region_engine::PartitionRange;
50
51use crate::error::{QueryExecutionSnafu, Result};
52
/// A complex stream sort execution plan which accepts a list of `PartitionRange` and
/// merge sort them whenever possible, and emit the sorted result as soon as possible.
/// This sorting plan only accept sort by ts and will not sort by other fields.
///
/// internally, it call [`StreamingMergeBuilder`] multiple times to merge multiple sorted "working ranges"
///
/// # Invariant Promise on Input Stream
/// 1. The input stream must be sorted by timestamp and
/// 2. in the order of `PartitionRange` in `ranges`
/// 3. Each `PartitionRange` is sorted within itself(ascending or descending) but no need to be sorted across ranges
/// 4. There can't be any RecordBatch that is cross multiple `PartitionRange` in the input stream
///
///  TODO(discord9): fix item 4, but since only use `PartSort` as input, this might not be a problem

#[derive(Debug, Clone)]
pub struct WindowedSortExec {
    /// Physical sort expressions(that is, sort by timestamp).
    /// Only a single expression is supported; its `options.descending` decides
    /// which direction the input ranges must be ordered in.
    expression: PhysicalSortExpr,
    /// Optional number of rows to fetch. Stops producing rows after this fetch
    fetch: Option<usize>,
    /// The input ranges indicate input stream will be composed of those ranges in given order.
    ///
    /// Each partition has one vector of `PartitionRange`.
    /// Validated by `check_partition_range_monotonicity` in `try_new`.
    ranges: Vec<Vec<PartitionRange>>,
    /// All available working ranges and their corresponding working set
    /// (indices into `ranges`), precomputed in `try_new` from `ranges`.
    ///
    /// working ranges promise once input stream get a value out of current range, future values will never
    /// be in this range. Each partition has one vector of ranges.
    all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
    /// The child plan producing the partially sorted input stream.
    input: Arc<dyn ExecutionPlan>,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
    /// Plan properties derived from the input, with the output ordering set to
    /// `expression`.
    properties: Arc<PlanProperties>,
}
87
88/// Checks that partition ranges are sorted correctly for the given sort direction.
89/// - Descending: sorted by (end DESC, start DESC) - shorter ranges first when ends are equal
90/// - Ascending: sorted by (start ASC, end ASC) - shorter ranges first when starts are equal
91pub fn check_partition_range_monotonicity(
92    ranges: &[Vec<PartitionRange>],
93    descending: bool,
94) -> Result<()> {
95    let is_valid = ranges.iter().all(|r| {
96        if descending {
97            // Primary: end descending, Secondary: start descending (shorter range first)
98            r.windows(2)
99                .all(|w| w[0].end > w[1].end || (w[0].end == w[1].end && w[0].start >= w[1].start))
100        } else {
101            // Primary: start ascending, Secondary: end ascending (shorter range first)
102            r.windows(2).all(|w| {
103                w[0].start < w[1].start || (w[0].start == w[1].start && w[0].end <= w[1].end)
104            })
105        }
106    });
107
108    if !is_valid {
109        let msg = if descending {
110            "Input `PartitionRange`s are not sorted by (end DESC, start DESC)"
111        } else {
112            "Input `PartitionRange`s are not sorted by (start ASC, end ASC)"
113        };
114        let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
115        Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
116    } else {
117        Ok(())
118    }
119}
120
121impl WindowedSortExec {
122    pub fn try_new(
123        expression: PhysicalSortExpr,
124        fetch: Option<usize>,
125        ranges: Vec<Vec<PartitionRange>>,
126        input: Arc<dyn ExecutionPlan>,
127    ) -> Result<Self> {
128        check_partition_range_monotonicity(&ranges, expression.options.descending)?;
129
130        let mut eq_properties = input.equivalence_properties().clone();
131        eq_properties.reorder(vec![expression.clone()])?;
132
133        let properties = input.properties();
134        let properties = Arc::new(PlanProperties::new(
135            eq_properties,
136            input.output_partitioning().clone(),
137            properties.emission_type,
138            properties.boundedness,
139        ));
140
141        let mut all_avail_working_range = Vec::with_capacity(ranges.len());
142        for r in &ranges {
143            let overlap_counts = split_overlapping_ranges(r);
144            let working_ranges =
145                compute_all_working_ranges(&overlap_counts, expression.options.descending);
146            all_avail_working_range.push(working_ranges);
147        }
148
149        Ok(Self {
150            expression,
151            fetch,
152            ranges,
153            all_avail_working_range,
154            input,
155            metrics: ExecutionPlanMetricsSet::new(),
156            properties,
157        })
158    }
159
160    /// During receiving partial-sorted RecordBatch, we need to update the working set which is the
161    /// `PartitionRange` we think those RecordBatch belongs to. And when we receive something outside
162    /// of working set, we can merge results before whenever possible.
163    pub fn to_stream(
164        &self,
165        context: Arc<TaskContext>,
166        partition: usize,
167    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
168        let input_stream: DfSendableRecordBatchStream =
169            self.input.execute(partition, context.clone())?;
170
171        let df_stream = Box::pin(WindowedSortStream::new(
172            context,
173            self,
174            input_stream,
175            partition,
176        )) as _;
177
178        Ok(df_stream)
179    }
180}
181
182impl DisplayAs for WindowedSortExec {
183    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
184        write!(
185            f,
186            "WindowedSortExec: expr={} num_ranges={}",
187            self.expression,
188            self.ranges.len()
189        )?;
190        if let Some(fetch) = self.fetch {
191            write!(f, " fetch={}", fetch)?;
192        }
193        Ok(())
194    }
195}
196
197impl ExecutionPlan for WindowedSortExec {
198    fn as_any(&self) -> &dyn Any {
199        self
200    }
201
202    fn schema(&self) -> SchemaRef {
203        self.input.schema()
204    }
205
206    fn properties(&self) -> &Arc<PlanProperties> {
207        &self.properties
208    }
209
210    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
211        vec![&self.input]
212    }
213
214    fn with_new_children(
215        self: Arc<Self>,
216        children: Vec<Arc<dyn ExecutionPlan>>,
217    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
218        let new_input = if let Some(first) = children.first() {
219            first
220        } else {
221            internal_err!("No children found")?
222        };
223        let new = Self::try_new(
224            self.expression.clone(),
225            self.fetch,
226            self.ranges.clone(),
227            new_input.clone(),
228        )?;
229        Ok(Arc::new(new))
230    }
231
232    fn execute(
233        &self,
234        partition: usize,
235        context: Arc<TaskContext>,
236    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
237        self.to_stream(context, partition)
238    }
239
240    fn metrics(&self) -> Option<MetricsSet> {
241        Some(self.metrics.clone_inner())
242    }
243
244    /// # Explain
245    ///
246    /// This plan needs to be executed on each partition independently,
247    /// and is expected to run directly on storage engine's output
248    /// distribution / partition.
249    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
250        vec![false]
251    }
252
253    fn name(&self) -> &str {
254        "WindowedSortExec"
255    }
256}
257
/// The core logic of merging sort multiple sorted ranges
///
/// the flow of data is:
/// ```md
/// input --check if sorted--> in_progress --find sorted run--> sorted_input_runs --call merge sort--> merge_stream --> output
/// ```
pub struct WindowedSortStream {
    /// Memory pool for this stream; merge-sort reservations are registered here.
    memory_pool: Arc<dyn MemoryPool>,
    /// currently assembling RecordBatches, will be put to `sort_partition_rbs` when it's done
    in_progress: Vec<DfRecordBatch>,
    /// last `Timestamp` of the last input RecordBatch in `in_progress`, use to found partial sorted run's boundary
    last_value: Option<Timestamp>,
    /// Current working set of `PartitionRange` sorted RecordBatches
    sorted_input_runs: Vec<DfSendableRecordBatchStream>,
    /// Merge-sorted result streams, should be polled to end before start a new merge sort again.
    /// Drained front-to-back by `poll_result_stream`.
    merge_stream: VecDeque<DfSendableRecordBatchStream>,
    /// The number of times merge sort has been called; used to label memory consumers.
    merge_count: usize,
    /// Index into current `working_range` in `all_avail_working_range`
    working_idx: usize,
    /// input stream assumed reading in order of `PartitionRange`
    input: DfSendableRecordBatchStream,
    /// Whether this stream is terminated. For reasons like limit reached or input stream is done.
    is_terminated: bool,
    /// Output Schema, which is the same as input schema, since this is a sort plan
    schema: SchemaRef,
    /// Physical sort expressions(that is, sort by timestamp)
    expression: PhysicalSortExpr,
    /// Optional number of rows to fetch. Stops producing rows after this fetch
    fetch: Option<usize>,
    /// number of rows produced so far, compared against `fetch` to enforce the limit
    produced: usize,
    /// Resulting Stream(`merge_stream`)'s batch size, merely a suggestion
    batch_size: usize,
    /// All available working ranges and their corresponding working set
    /// (for this partition only)
    ///
    /// working ranges promise once input stream get a value out of current range, future values will never be in this range
    all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
    /// The input partition ranges
    #[allow(dead_code)] // this is used under #[debug_assertions]
    ranges: Vec<PartitionRange>,
    /// Execution metrics
    metrics: BaselineMetrics,
}
303
304impl WindowedSortStream {
305    pub fn new(
306        context: Arc<TaskContext>,
307        exec: &WindowedSortExec,
308        input: DfSendableRecordBatchStream,
309        partition: usize,
310    ) -> Self {
311        Self {
312            memory_pool: context.runtime_env().memory_pool.clone(),
313            in_progress: Vec::new(),
314            last_value: None,
315            sorted_input_runs: Vec::new(),
316            merge_stream: VecDeque::new(),
317            merge_count: 0,
318            working_idx: 0,
319            schema: input.schema(),
320            input,
321            is_terminated: false,
322            expression: exec.expression.clone(),
323            fetch: exec.fetch,
324            produced: 0,
325            batch_size: context.session_config().batch_size(),
326            all_avail_working_range: exec.all_avail_working_range[partition].clone(),
327            ranges: exec.ranges[partition].clone(),
328            metrics: BaselineMetrics::new(&exec.metrics, partition),
329        }
330    }
331}
332
impl WindowedSortStream {
    /// Debug-only diagnostics: when a batch's time range falls outside the
    /// expected working range, log how `cur_range` relates to the input
    /// `PartitionRange`s (contained-in / containing / overlap-only) and which
    /// working ranges it overlaps, to help pinpoint which input invariant broke.
    #[cfg(debug_assertions)]
    fn check_subset_ranges(&self, cur_range: &TimeRange) {
        // all input ranges that fully contain `cur_range`
        let cur_is_subset_to = self
            .ranges
            .iter()
            .filter(|r| cur_range.is_subset(&TimeRange::from(*r)))
            .collect_vec();
        if cur_is_subset_to.is_empty() {
            error!("Current range is not a subset of any PartitionRange");
            // found in all ranges that are subset of current range
            let subset_ranges = self
                .ranges
                .iter()
                .filter(|r| TimeRange::from(*r).is_subset(cur_range))
                .collect_vec();
            // ranges that intersect `cur_range` without being contained in it
            let only_overlap = self
                .ranges
                .iter()
                .filter(|r| {
                    let r = TimeRange::from(*r);
                    r.is_overlapping(cur_range) && !r.is_subset(cur_range)
                })
                .collect_vec();
            error!(
                "Bad input, found {} ranges that are subset of current range, also found {} ranges that only overlap, subset ranges are: {:?}; overlap ranges are: {:?}",
                subset_ranges.len(),
                only_overlap.len(),
                subset_ranges,
                only_overlap
            );
        } else {
            // `cur_range` is contained by at least one input range; also report
            // the ranges that merely overlap it for context.
            let only_overlap = self
                .ranges
                .iter()
                .filter(|r| {
                    let r = TimeRange::from(*r);
                    r.is_overlapping(cur_range) && !cur_range.is_subset(&r)
                })
                .collect_vec();
            error!(
                "Found current range to be subset of {} ranges, also found {} ranges that only overlap, of subset ranges are:{:?}; overlap ranges are: {:?}",
                cur_is_subset_to.len(),
                only_overlap.len(),
                cur_is_subset_to,
                only_overlap
            );
        }
        // finally, which precomputed working ranges this batch touches
        let all_overlap_working_range = self
            .all_avail_working_range
            .iter()
            .filter(|(range, _)| range.is_overlapping(cur_range))
            .map(|(range, _)| range)
            .collect_vec();
        error!(
            "Found {} working ranges that overlap with current range: {:?}",
            all_overlap_working_range.len(),
            all_overlap_working_range
        );
    }

    /// Poll the next RecordBatch from the merge-sort's output stream
    ///
    /// Drains `merge_stream` front-to-back; enforces the `fetch` limit by
    /// slicing the batch that crosses it and marking the stream terminated
    /// once the limit is reached.
    fn poll_result_stream(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
        while let Some(merge_stream) = &mut self.merge_stream.front_mut() {
            match merge_stream.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(batch))) => {
                    let ret = if let Some(remaining) = self.remaining_fetch() {
                        if remaining == 0 {
                            // fetch limit already met: stop producing entirely
                            self.is_terminated = true;
                            None
                        } else if remaining < batch.num_rows() {
                            // this batch crosses the limit: emit only the prefix
                            self.produced += remaining;
                            Some(Ok(batch.slice(0, remaining)))
                        } else {
                            self.produced += batch.num_rows();
                            Some(Ok(batch))
                        }
                    } else {
                        // no fetch limit: pass the batch through, counting rows
                        self.produced += batch.num_rows();
                        Some(Ok(batch))
                    };
                    return Poll::Ready(ret);
                }
                Poll::Ready(Some(Err(e))) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    // current merge stream is done, we can start polling the next one

                    self.merge_stream.pop_front();
                    continue;
                }
                Poll::Pending => {
                    return Poll::Pending;
                }
            }
        }
        // if no output stream is available
        Poll::Ready(None)
    }

    /// The core logic of merging sort multiple sorted ranges
    ///
    /// We try to maximize the number of sorted runs we can merge in one go, while emit the result as soon as possible.
    pub fn poll_next_inner(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
        // first check and send out the merge result
        match self.poll_result_stream(cx) {
            Poll::Ready(None) => {
                // no pending merged output; fall through to consume more input
                // unless we are already terminated
                if self.is_terminated {
                    return Poll::Ready(None);
                }
            }
            x => return x,
        };

        // consume input stream
        while !self.is_terminated {
            // then we get a new RecordBatch from input stream
            let SortedRunSet {
                runs_with_batch,
                sort_column,
            } = match self.input.as_mut().poll_next(cx) {
                Poll::Ready(Some(Ok(batch))) => split_batch_to_sorted_run(batch, &self.expression)?,
                Poll::Ready(Some(Err(e))) => {
                    return Poll::Ready(Some(Err(e)));
                }
                Poll::Ready(None) => {
                    // input stream is done, we need to merge sort the remaining working set
                    self.is_terminated = true;
                    self.build_sorted_stream()?;
                    self.start_new_merge_sort()?;
                    break;
                }
                Poll::Pending => return Poll::Pending,
            };

            // The core logic to eagerly merge sort the working set

            // compare with last_value to find boundary, then merge runs if needed

            // iterate over runs_with_batch to merge sort, might create zero or more stream to put to `sort_partition_rbs`
            //
            // `last_remaining` re-injects the tail of a run that crossed the
            // working-range boundary so it is re-examined against the next
            // working range before pulling the next run from the iterator.
            let mut last_remaining = None;
            let mut run_iter = runs_with_batch.into_iter();
            loop {
                let Some((sorted_rb, run_info)) = last_remaining.take().or(run_iter.next()) else {
                    break;
                };
                if sorted_rb.num_rows() == 0 {
                    continue;
                }
                // determine if this batch is in current working range
                let Some(cur_range) = run_info.get_time_range() else {
                    internal_err!("Found NULL in time index column")?
                };
                let Some(working_range) = self.get_working_range() else {
                    internal_err!("No working range found")?
                };

                // ensure the current batch is in the working range
                // (data "behind" the working range would violate the input
                // ordering invariant and make merged output unsorted)
                if sort_column.options.unwrap_or_default().descending {
                    if cur_range.end > working_range.end {
                        error!("Invalid range: {:?} > {:?}", cur_range, working_range);
                        #[cfg(debug_assertions)]
                        self.check_subset_ranges(&cur_range);
                        internal_err!(
                            "Current batch have data on the right side of working range, something is very wrong"
                        )?;
                    }
                } else if cur_range.start < working_range.start {
                    error!("Invalid range: {:?} < {:?}", cur_range, working_range);
                    #[cfg(debug_assertions)]
                    self.check_subset_ranges(&cur_range);
                    internal_err!(
                        "Current batch have data on the left side of working range, something is very wrong"
                    )?;
                }

                if cur_range.is_subset(&working_range) {
                    // data still in range, can't merge sort yet
                    // see if can concat entire sorted rb, merge sort need to wait
                    self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?;
                } else if let Some(intersection) = cur_range.intersection(&working_range) {
                    // slice rb by intersection and concat it then merge sort
                    let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len);
                    let (offset, len) = find_slice_from_range(
                        &SortColumn {
                            values: cur_sort_column.clone(),
                            options: sort_column.options,
                        },
                        &intersection,
                    )?;

                    // the in-range part must start at the run's beginning,
                    // otherwise some data precedes the working range
                    if offset != 0 {
                        internal_err!(
                            "Current batch have data on the left side of working range, something is very wrong"
                        )?;
                    }

                    let sliced_rb = sorted_rb.slice(offset, len);

                    // try to concat the sliced input batch to the current `in_progress` run
                    self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?;
                    // since no more sorted data in this range will come in now, build stream now
                    self.build_sorted_stream()?;

                    // since we are crossing working range, we need to merge sort the working set
                    self.start_new_merge_sort()?;

                    let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len);
                    if r_len != 0 {
                        // we have remaining data in this batch, put it back to input queue
                        let remaining_rb = sorted_rb.slice(r_offset, r_len);
                        let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?;
                        let new_run_info = SucRun {
                            offset: run_info.offset + r_offset,
                            len: r_len,
                            first_val: new_first_val,
                            last_val: run_info.last_val,
                        };
                        last_remaining = Some((remaining_rb, new_run_info));
                    }
                    // deal with remaining batch cross working range problem
                    // i.e: this example require more slice, and we are currently at point A
                    // |---1---|       |---3---|
                    // |-------A--2------------|
                    //  put the remaining batch back to iter and deal it in next loop
                } else {
                    // no overlap, we can merge sort the working set

                    self.build_sorted_stream()?;
                    self.start_new_merge_sort()?;

                    // always put it back to input queue until some batch is in working range
                    last_remaining = Some((sorted_rb, run_info));
                }
            }

            // poll result stream again to see if we can emit more results
            match self.poll_result_stream(cx) {
                Poll::Ready(None) => {
                    if self.is_terminated {
                        return Poll::Ready(None);
                    }
                }
                x => return x,
            };
        }
        // emit the merge result after terminated(all input stream is done)
        self.poll_result_stream(cx)
    }

    /// Append a batch to the `in_progress` run currently being assembled.
    fn push_batch(&mut self, batch: DfRecordBatch) {
        self.in_progress.push(batch);
    }

    /// Try to concat the input batch to the current `in_progress` run
    ///
    /// if the input batch is not sorted, build old run to stream and start a new run with new batch
    fn try_concat_batch(
        &mut self,
        batch: DfRecordBatch,
        run_info: &SucRun<Timestamp>,
        opt: Option<SortOptions>,
    ) -> datafusion_common::Result<()> {
        // concat is allowed when the new run's first value does not precede
        // (under `opt`'s ordering) the last value already in `in_progress`
        let is_ok_to_concat =
            cmp_with_opts(&self.last_value, &run_info.first_val, &opt) <= std::cmp::Ordering::Equal;

        if is_ok_to_concat {
            self.push_batch(batch);
            // next time we get input batch might still be ordered, so not build stream yet
        } else {
            // no more sorted data, build stream now
            self.build_sorted_stream()?;
            self.push_batch(batch);
        }
        // track the boundary for the next concat decision
        self.last_value = run_info.last_val;
        Ok(())
    }

    /// Get the current working range, or `None` when all working ranges are consumed
    fn get_working_range(&self) -> Option<TimeRange> {
        self.all_avail_working_range
            .get(self.working_idx)
            .map(|(range, _)| *range)
    }

    /// Set current working range to the next working range
    fn set_next_working_range(&mut self) {
        self.working_idx += 1;
    }

    /// make `in_progress` as a new `DfSendableRecordBatchStream` and put into `sorted_input_runs`
    ///
    /// No-op when `in_progress` is empty.
    fn build_sorted_stream(&mut self) -> datafusion_common::Result<()> {
        if self.in_progress.is_empty() {
            return Ok(());
        }
        let data = std::mem::take(&mut self.in_progress);

        let new_stream = MemoryStream::try_new(data, self.schema(), None)?;
        self.sorted_input_runs.push(Box::pin(new_stream));
        Ok(())
    }

    /// Start merging sort the current working set
    ///
    /// Advances to the next working range. With a single run there is nothing
    /// to merge, so it is pushed to `merge_stream` directly; otherwise a
    /// `StreamingMergeBuilder` merge (with memory reservation and fetch limit)
    /// is queued.
    fn start_new_merge_sort(&mut self) -> datafusion_common::Result<()> {
        // caller must have flushed `in_progress` via `build_sorted_stream` first
        if !self.in_progress.is_empty() {
            return internal_err!("Starting a merge sort when in_progress is not empty")?;
        }

        self.set_next_working_range();

        let streams = std::mem::take(&mut self.sorted_input_runs);
        if streams.is_empty() {
            return Ok(());
        } else if streams.len() == 1 {
            // single run is already sorted: skip the merge machinery
            self.merge_stream
                .push_back(streams.into_iter().next().unwrap());
            return Ok(());
        }

        let fetch = self.remaining_fetch();
        let reservation = MemoryConsumer::new(format!("WindowedSortStream[{}]", self.merge_count))
            .register(&self.memory_pool);
        self.merge_count += 1;

        let resulting_stream = StreamingMergeBuilder::new()
            .with_streams(streams)
            .with_schema(self.schema())
            .with_expressions(&[self.expression.clone()].into())
            .with_metrics(self.metrics.clone())
            .with_batch_size(self.batch_size)
            .with_fetch(fetch)
            .with_reservation(reservation)
            .build()?;
        self.merge_stream.push_back(resulting_stream);
        // this working range is done, move to next working range
        Ok(())
    }

    /// Remaining number of rows to fetch, if no fetch limit, return None
    /// if fetch limit is reached, return Some(0)
    fn remaining_fetch(&self) -> Option<usize> {
        let total_now = self.produced;
        self.fetch.map(|p| p.saturating_sub(total_now))
    }
}
685
686impl Stream for WindowedSortStream {
687    type Item = datafusion_common::Result<DfRecordBatch>;
688
689    fn poll_next(
690        mut self: Pin<&mut Self>,
691        cx: &mut Context<'_>,
692    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
693        let result = self.as_mut().poll_next_inner(cx);
694        self.metrics.record_poll(result)
695    }
696}
697
698impl RecordBatchStream for WindowedSortStream {
699    fn schema(&self) -> SchemaRef {
700        self.schema.clone()
701    }
702}
703
704/// split batch to sorted runs
705fn split_batch_to_sorted_run(
706    batch: DfRecordBatch,
707    expression: &PhysicalSortExpr,
708) -> datafusion_common::Result<SortedRunSet<Timestamp>> {
709    // split input rb to sorted runs
710    let sort_column = expression.evaluate_to_sort_column(&batch)?;
711    let sorted_runs_offset = get_sorted_runs(sort_column.clone())?;
712    if let Some(run) = sorted_runs_offset.first()
713        && sorted_runs_offset.len() == 1
714    {
715        if !(run.offset == 0 && run.len == batch.num_rows()) {
716            internal_err!(
717                "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
718                run.offset,
719                run.len,
720                batch.num_rows()
721            )?;
722        }
723        // input rb is already sorted, we can emit it directly
724        Ok(SortedRunSet {
725            runs_with_batch: vec![(batch, run.clone())],
726            sort_column,
727        })
728    } else {
729        // those slice should be zero copy, so supposedly no new reservation needed
730        let mut ret = Vec::with_capacity(sorted_runs_offset.len());
731        for run in sorted_runs_offset {
732            if run.offset + run.len > batch.num_rows() {
733                internal_err!(
734                    "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
735                    run.offset,
736                    run.len,
737                    batch.num_rows()
738                )?;
739            }
740            let new_rb = batch.slice(run.offset, run.len);
741            ret.push((new_rb, run));
742        }
743        Ok(SortedRunSet {
744            runs_with_batch: ret,
745            sort_column,
746        })
747    }
748}
749
/// Downcast a temporal array to a specific type
///
/// usage similar to `downcast_primitive!` in `arrow-array` crate
///
/// Matches `$data_type` against the four `Timestamp` time units and invokes
/// `$m!(<TimestampXxxType>, <TimeUnit> $(, $args)*)` for the matching unit;
/// non-timestamp types fall through to the caller-provided `$p => $fallback`
/// arms.
#[macro_export]
macro_rules! downcast_ts_array {
    ($data_type:expr => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) =>
    {
        match $data_type {
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
                $m!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
                $m!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
                $m!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $args)*)
            }
            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
                $m!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $args)*)
            }
            $($p => $fallback,)*
        }
    };
}
774
/// Find the slice(where start <= data < end and sort by `sort_column.options`) from the given range
///
/// Return the offset and length of the slice
///
/// The column is assumed to be already sorted according to `sort_column.options`
/// (binary search via `bisect` is only valid on sorted data — TODO confirm callers
/// uphold this); both bounds are first converted to the column's own time unit.
fn find_slice_from_range(
    sort_column: &SortColumn,
    range: &TimeRange,
) -> datafusion_common::Result<(usize, usize)> {
    let time_unit = sort_timestamp_unit(sort_column.values.data_type())?;
    let array = &sort_column.values;
    let opt = &sort_column.options.unwrap_or_default();
    let descending = opt.descending;
    // convert bounds to the column's unit: start via `convert_to`, exclusive
    // end via `convert_to_ceil` so precision loss never shrinks the range
    let range = convert_time_range_for_sort(range, time_unit)?;

    // turn both bounds into `ScalarValue`s of the column's type, as required by `bisect`
    let typed_sorted_range = [range.start, range.end]
        .iter()
        .map(|t| {
            t.convert_to(time_unit.into())
                .ok_or_else(|| {
                    DataFusionError::Internal(format!(
                        "Failed to convert timestamp from {:?} to {:?}",
                        t.unit(),
                        time_unit
                    ))
                })
                .and_then(|typed_ts| {
                    let value = Value::Timestamp(typed_ts);
                    value
                        .try_to_scalar_value(&value.data_type())
                        .map_err(|e| DataFusionError::External(Box::new(e) as _))
                })
        })
        .try_collect::<_, Vec<_>, _>()?;

    let (min_val, max_val) = (typed_sorted_range[0].clone(), typed_sorted_range[1].clone());

    // get slice that in which all data that `min_val<=data<max_val`
    // `bisect::<false>` finds the leftmost insertion point, `bisect::<true>`
    // the rightmost; which bound uses which depends on the sort direction
    let (start, end) = if descending {
        // note that `data < max_val`
        // i,e, for max_val = 4, array = [5,3,2] should be start=1
        // max_val = 4, array = [5, 4, 3, 2] should be start= 2
        let start = bisect::<false>(from_ref(array), from_ref(&max_val), &[*opt])?;
        // min_val = 1, array = [3, 2, 1, 0], end = 3
        // min_val = 1, array = [3, 2, 0], end = 2
        let end = bisect::<false>(from_ref(array), from_ref(&min_val), &[*opt])?;
        (start, end)
    } else {
        // min_val = 1, array = [1, 2, 3], start = 0
        // min_val = 1, array = [0, 2, 3], start = 1
        let start = bisect::<true>(from_ref(array), from_ref(&min_val), &[*opt])?;
        // max_val = 3, array = [1, 3, 4], end = 1
        // max_val = 3, array = [1, 2, 4], end = 2
        let end = bisect::<true>(from_ref(array), from_ref(&max_val), &[*opt])?;
        (start, end)
    };

    Ok((start, end - start))
}
832
833fn sort_timestamp_unit(data_type: &DataType) -> datafusion_common::Result<arrow_schema::TimeUnit> {
834    if let DataType::Timestamp(unit, _) = data_type {
835        Ok(*unit)
836    } else {
837        Err(DataFusionError::Internal(format!(
838            "Unsupported sort column type: {data_type}"
839        )))
840    }
841}
842
/// Which bound of a half-open (`[start, end)`) time range a timestamp represents.
///
/// Used by [`convert_timestamp_range_bound`] to pick the rounding direction
/// when converting a bound to a coarser timestamp unit.
#[derive(Debug, Clone, Copy)]
enum RangeBoundKind {
    /// The inclusive lower bound of a range.
    InclusiveStart,
    /// The exclusive upper bound of a range.
    ExclusiveEnd,
}
848
849fn convert_time_range_for_sort(
850    range: &TimeRange,
851    time_unit: arrow_schema::TimeUnit,
852) -> datafusion_common::Result<TimeRange> {
853    let target_unit = time_unit.into();
854    Ok(TimeRange::new(
855        convert_timestamp_range_bound(range.start, target_unit, RangeBoundKind::InclusiveStart)?,
856        convert_timestamp_range_bound(range.end, target_unit, RangeBoundKind::ExclusiveEnd)?,
857    ))
858}
859
860fn convert_timestamp_range_bound(
861    timestamp: Timestamp,
862    target_unit: TimestampUnit,
863    bound_kind: RangeBoundKind,
864) -> datafusion_common::Result<Timestamp> {
865    let converted = match bound_kind {
866        RangeBoundKind::InclusiveStart => timestamp.convert_to(target_unit),
867        RangeBoundKind::ExclusiveEnd => timestamp.convert_to_ceil(target_unit),
868    };
869
870    converted.ok_or_else(|| {
871        DataFusionError::Internal(format!(
872            "Failed to convert timestamp from {:?} to {:?}",
873            timestamp.unit(),
874            target_unit
875        ))
876    })
877}
878
879pub(crate) fn project_partition_range_for_sort(
880    range: PartitionRange,
881    sort_data_type: &DataType,
882) -> datafusion_common::Result<PartitionRange> {
883    let target_unit = sort_timestamp_unit(sort_data_type)?.into();
884    Ok(PartitionRange {
885        start: convert_timestamp_range_bound(
886            range.start,
887            target_unit,
888            RangeBoundKind::InclusiveStart,
889        )?,
890        end: convert_timestamp_range_bound(range.end, target_unit, RangeBoundKind::ExclusiveEnd)?,
891        ..range
892    })
893}
894
895fn discrete_exclusive_end(timestamp: Timestamp) -> Timestamp {
896    Timestamp::new(timestamp.value() + 1, timestamp.unit())
897}
898
/// Get an iterator from a primitive array.
///
/// Used with `downcast_ts_array`. The returned iter is wrapped with `.enumerate()`,
/// yielding `(row_index, Option<i64>)` pairs (`None` for null slots), boxed so
/// every timestamp unit shares one iterator type.
#[macro_export]
macro_rules! array_iter_helper {
    ($t:ty, $unit:expr, $arr:expr) => {{
        // unwrap is safe when `$t` matches `$arr`'s concrete type, which
        // `downcast_ts_array` guarantees by construction
        let typed = $arr
            .as_any()
            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
            .unwrap();
        let iter = typed.iter().enumerate();
        Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
    }};
}
913
914/// Compare with options, note None is considered as NULL here
915///
916/// default to null first
917fn cmp_with_opts<T: Ord>(
918    a: &Option<T>,
919    b: &Option<T>,
920    opt: &Option<SortOptions>,
921) -> std::cmp::Ordering {
922    let opt = opt.unwrap_or_default();
923
924    if let (Some(a), Some(b)) = (a, b) {
925        if opt.descending { b.cmp(a) } else { a.cmp(b) }
926    } else if opt.nulls_first {
927        // now we know at leatst one of them is None
928        // in rust None < Some(_)
929        a.cmp(b)
930    } else {
931        match (a, b) {
932            (Some(a), Some(b)) => a.cmp(b),
933            (Some(_), None) => std::cmp::Ordering::Less,
934            (None, Some(_)) => std::cmp::Ordering::Greater,
935            (None, None) => std::cmp::Ordering::Equal,
936        }
937    }
938}
939
/// A record batch split into sorted runs, paired with the evaluated sort column.
#[derive(Debug, Clone)]
struct SortedRunSet<N: Ord> {
    /// sorted runs with batch corresponding to them
    /// (each batch is expected to be a slice covering exactly its run)
    runs_with_batch: Vec<(DfRecordBatch, SucRun<N>)>,
    /// sorted column from eval sorting expr
    sort_column: SortColumn,
}
947
/// A struct to represent a successive run in the input iterator
///
/// A "run" is a maximal stretch of rows that is already sorted according to
/// the sort options (see `find_successive_runs`).
#[derive(Debug, Clone, PartialEq)]
struct SucRun<N: Ord> {
    /// offset of the first element in the run
    offset: usize,
    /// length of the run
    len: usize,
    /// first non-null value in the run (`None` if the run is all nulls)
    first_val: Option<N>,
    /// last non-null value in the run (`None` if the run is all nulls)
    last_val: Option<N>,
}
960
961impl SucRun<Timestamp> {
962    /// Get the time range of the run, which is [min_val, max_val + 1)
963    fn get_time_range(&self) -> Option<TimeRange> {
964        let start = self.first_val.min(self.last_val);
965        let end = self
966            .first_val
967            .max(self.last_val)
968            .map(discrete_exclusive_end);
969        start.zip(end).map(|(s, e)| TimeRange::new(s, e))
970    }
971}
972
/// find all successive runs in the input iterator
///
/// A run ends at index `idx` when the previous element compares strictly
/// greater than the current one under `sort_opts`. The trailing (possibly
/// empty) run is always emitted, so an empty iterator yields one run of
/// `len == 0` and a fully sorted input yields exactly one run covering
/// everything.
fn find_successive_runs<T: Iterator<Item = (usize, Option<N>)>, N: Ord + Copy>(
    iter: T,
    sort_opts: &Option<SortOptions>,
) -> Vec<SucRun<N>> {
    let mut runs = Vec::new();
    // value of the previous element (None before the first element)
    let mut last_value = None;
    // index of the last element seen; used to size the final run
    let mut iter_len = None;

    // start offset of the run currently being built
    let mut last_offset = 0;
    // first/last non-null values inside the current run
    let mut first_val: Option<N> = None;
    let mut last_val: Option<N> = None;

    for (idx, t) in iter {
        if let Some(last_value) = &last_value
            && cmp_with_opts(last_value, &t, sort_opts) == std::cmp::Ordering::Greater
        {
            // we found a boundary
            let len = idx - last_offset;
            let run = SucRun {
                offset: last_offset,
                len,
                first_val,
                last_val,
            };
            runs.push(run);
            first_val = None;
            last_val = None;

            last_offset = idx;
        }
        last_value = Some(t);
        if let Some(t) = t {
            first_val = first_val.or(Some(t));
            // NOTE: `Some(t).or(last_val)` is always `Some(t)` — it
            // unconditionally tracks the latest non-null value
            last_val = Some(t).or(last_val);
        }
        iter_len = Some(idx);
    }
    // emit the trailing run; `iter_len` holds the last index, so the length is
    // `last_idx - last_offset + 1` (or 0 for an empty input)
    let run = SucRun {
        offset: last_offset,
        len: iter_len.map(|l| l - last_offset + 1).unwrap_or(0),
        first_val,
        last_val,
    };
    runs.push(run);

    runs
}
1021
1022/// return a list of non-overlapping (offset, length) which represent sorted runs, and
1023/// can be used to call [`DfRecordBatch::slice`] to get sorted runs
1024/// Returned runs will be as long as possible, and will not overlap with each other
1025fn get_sorted_runs(sort_column: SortColumn) -> datafusion_common::Result<Vec<SucRun<Timestamp>>> {
1026    let ty = sort_column.values.data_type();
1027    if let DataType::Timestamp(unit, _) = ty {
1028        let array = &sort_column.values;
1029        let iter = downcast_ts_array!(
1030            array.data_type() => (array_iter_helper, array),
1031            _ => internal_err!("Unsupported sort column type: {ty}")?
1032        );
1033
1034        let raw = find_successive_runs(iter, &sort_column.options);
1035        let ts_runs = raw
1036            .into_iter()
1037            .map(|run| SucRun {
1038                offset: run.offset,
1039                len: run.len,
1040                first_val: run.first_val.map(|v| Timestamp::new(v, unit.into())),
1041                last_val: run.last_val.map(|v| Timestamp::new(v, unit.into())),
1042            })
1043            .collect_vec();
1044        Ok(ts_runs)
1045    } else {
1046        Err(DataFusionError::Internal(format!(
1047            "Unsupported sort column type: {ty}"
1048        )))
1049    }
1050}
1051
/// Left(`start`) inclusive right(`end`) exclusive,
///
/// This is just tuple with extra methods
///
/// NOTE: the derived `Ord` is lexicographic on `(start, end)`;
/// `split_overlapping_ranges` relies on that ordering when using
/// `TimeRange` as a `BTreeMap` key for its range scans.
#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
struct TimeRange {
    start: Timestamp,
    end: Timestamp,
}
1060
1061impl From<&PartitionRange> for TimeRange {
1062    fn from(range: &PartitionRange) -> Self {
1063        Self::new(range.start, range.end)
1064    }
1065}
1066
1067impl From<(Timestamp, Timestamp)> for TimeRange {
1068    fn from(range: (Timestamp, Timestamp)) -> Self {
1069        Self::new(range.0, range.1)
1070    }
1071}
1072
1073impl From<&(Timestamp, Timestamp)> for TimeRange {
1074    fn from(range: &(Timestamp, Timestamp)) -> Self {
1075        Self::new(range.0, range.1)
1076    }
1077}
1078
1079impl TimeRange {
1080    /// Create a new TimeRange, if start is greater than end, swap them
1081    fn new(start: Timestamp, end: Timestamp) -> Self {
1082        if start > end {
1083            Self {
1084                start: end,
1085                end: start,
1086            }
1087        } else {
1088            Self { start, end }
1089        }
1090    }
1091
1092    fn is_subset(&self, other: &Self) -> bool {
1093        self.start >= other.start && self.end <= other.end
1094    }
1095
1096    /// Check if two ranges are overlapping, exclusive(meaning if only boundary is overlapped then range is not overlapping)
1097    fn is_overlapping(&self, other: &Self) -> bool {
1098        !(self.start >= other.end || self.end <= other.start)
1099    }
1100
1101    fn intersection(&self, other: &Self) -> Option<Self> {
1102        if self.is_overlapping(other) {
1103            Some(Self::new(
1104                self.start.max(other.start),
1105                self.end.min(other.end),
1106            ))
1107        } else {
1108            None
1109        }
1110    }
1111
1112    fn difference(&self, other: &Self) -> Vec<Self> {
1113        if !self.is_overlapping(other) {
1114            vec![*self]
1115        } else {
1116            let mut ret = Vec::new();
1117            if self.start < other.start && self.end > other.end {
1118                ret.push(Self::new(self.start, other.start));
1119                ret.push(Self::new(other.end, self.end));
1120            } else if self.start < other.start {
1121                ret.push(Self::new(self.start, other.start));
1122            } else if self.end > other.end {
1123                ret.push(Self::new(other.end, self.end));
1124            }
1125            ret
1126        }
1127    }
1128}
1129
1130/// split input range by `split_by` range to one, two or three parts.
1131fn split_range_by(
1132    input_range: &TimeRange,
1133    input_parts: &[usize],
1134    split_by: &TimeRange,
1135    split_idx: usize,
1136) -> Vec<Action> {
1137    let mut ret = Vec::new();
1138    if input_range.is_overlapping(split_by) {
1139        let input_parts = input_parts.to_vec();
1140        let new_parts = {
1141            let mut new_parts = input_parts.clone();
1142            new_parts.push(split_idx);
1143            new_parts
1144        };
1145
1146        ret.push(Action::Pop(*input_range));
1147        if let Some(intersection) = input_range.intersection(split_by) {
1148            ret.push(Action::Push(intersection, new_parts.clone()));
1149        }
1150        for diff in input_range.difference(split_by) {
1151            ret.push(Action::Push(diff, input_parts.clone()));
1152        }
1153    }
1154    ret
1155}
1156
/// An edit to the map of non-overlapping ranges, produced by [`split_range_by`]
/// and applied by [`split_overlapping_ranges`].
#[derive(Debug, Clone, PartialEq, Eq)]
enum Action {
    /// Remove the entry keyed by this range.
    Pop(TimeRange),
    /// Insert (or overwrite) this range with the given partition indices.
    Push(TimeRange, Vec<usize>),
}
1162
/// Compute all working ranges and corresponding working sets from given `overlap_counts` computed from `split_overlapping_ranges`
///
/// working ranges promise once input stream get a value out of current range, future values will never be in this range
///
/// hence we can merge sort current working range once that happens
///
/// if `descending` is true, the working ranges will be in descending order
///
/// Returns pairs of (working range, indices of the `PartitionRange`s that may
/// contribute rows to it), in stream order.
fn compute_all_working_ranges(
    overlap_counts: &BTreeMap<TimeRange, Vec<usize>>,
    descending: bool,
) -> Vec<(TimeRange, BTreeSet<usize>)> {
    let mut ret = Vec::new();
    // the working range currently being accumulated, if any
    let mut cur_range_set: Option<(TimeRange, BTreeSet<usize>)> = None;
    // walk the non-overlapping ranges in stream order (reversed when descending)
    let overlap_iter: Box<dyn Iterator<Item = (&TimeRange, &Vec<usize>)>> = if descending {
        Box::new(overlap_counts.iter().rev()) as _
    } else {
        Box::new(overlap_counts.iter()) as _
    };
    for (range, set) in overlap_iter {
        match &mut cur_range_set {
            None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))),
            Some((working_range, working_set)) => {
                // if next overlap range have Partition that's is not last one in `working_set`(hence need
                // to be read before merge sorting), and `working_set` have >1 count
                // we have to expand current working range to cover it(and add it's `set` to `working_set`)
                // so that merge sort is possible
                let need_expand = {
                    let last_part = working_set.last();
                    let inter: BTreeSet<usize> = working_set
                        .intersection(&BTreeSet::from_iter(set.iter().cloned()))
                        .cloned()
                        .collect();
                    if let Some(one) = inter.first()
                        && inter.len() == 1
                        && Some(one) == last_part
                    {
                        // if only the last PartitionRange in current working set, we can just emit it so no need to expand working range
                        if set.iter().all(|p| Some(p) >= last_part) {
                            // if all PartitionRange in next overlap range is after the last one in current working set, we can just emit current working set
                            false
                        } else {
                            // elsewise, we need to expand working range to include next overlap range
                            true
                        }
                    } else if inter.is_empty() {
                        // if no common PartitionRange in current working set and next overlap range, we can just emit current working set
                        false
                    } else {
                        // have multiple intersection or intersection is not the last part, either way we need to expand working range to include next overlap range
                        true
                    }
                };

                if need_expand {
                    // grow toward the direction we are iterating in
                    if descending {
                        working_range.start = range.start;
                    } else {
                        working_range.end = range.end;
                    }
                    working_set.extend(set.iter().cloned());
                } else {
                    // emit the finished working range and start a new one
                    ret.push((*working_range, std::mem::take(working_set)));
                    cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned())));
                }
            }
        }
    }

    // flush the last accumulated working range, if any
    if let Some(cur_range_set) = cur_range_set {
        ret.push(cur_range_set)
    }

    ret
}
1237
/// return a map of non-overlapping ranges and their corresponding index
/// (not `PartitionRange.identifier` but position in array) in the input `PartitionRange`s that is in those ranges
///
/// Each input range is diffed against the existing (pairwise disjoint) keys:
/// overlapped keys are split via `split_range_by`, and the still-uncovered
/// remainder of the new range is inserted on its own.
fn split_overlapping_ranges(ranges: &[PartitionRange]) -> BTreeMap<TimeRange, Vec<usize>> {
    // invariant: the key ranges should not overlapping with each other by definition from `is_overlapping`
    let mut ret: BTreeMap<TimeRange, Vec<usize>> = BTreeMap::new();
    for (idx, range) in ranges.iter().enumerate() {
        let key: TimeRange = (range.start, range.end).into();
        // map edits to apply after scanning (can't mutate `ret` while borrowed)
        let mut actions = Vec::new();
        // the pieces of `key` not covered by any existing key range
        let mut untouched = vec![key];
        // create a forward and backward iterator to find all overlapping ranges
        // given that tuple is sorted in lexicographical order and promise to not overlap,
        // since range is sorted that way, we can stop when we find a non-overlapping range
        let forward_iter = ret
            .range(key..)
            .take_while(|(range, _)| range.is_overlapping(&key));
        let backward_iter = ret
            .range(..key)
            .rev()
            .take_while(|(range, _)| range.is_overlapping(&key));

        for (range, parts) in forward_iter.chain(backward_iter) {
            untouched = untouched.iter().flat_map(|r| r.difference(range)).collect();
            let act = split_range_by(range, parts, &key, idx);
            actions.extend(act);
        }

        // apply the recorded splits now that the scan borrows are released
        for action in actions {
            match action {
                Action::Pop(range) => {
                    ret.remove(&range);
                }
                Action::Push(range, parts) => {
                    ret.insert(range, parts);
                }
            }
        }

        // insert untouched ranges
        for range in untouched {
            ret.insert(range, vec![idx]);
        }
    }
    ret
}
1282
1283/// Get timestamp from array at offset
1284fn get_timestamp_from_idx(
1285    array: &ArrayRef,
1286    offset: usize,
1287) -> datafusion_common::Result<Option<Timestamp>> {
1288    let time_unit = if let DataType::Timestamp(unit, _) = array.data_type() {
1289        unit
1290    } else {
1291        return Err(DataFusionError::Internal(format!(
1292            "Unsupported sort column type: {}",
1293            array.data_type()
1294        )));
1295    };
1296    let ty = array.data_type();
1297    let array = array.slice(offset, 1);
1298    let mut iter = downcast_ts_array!(
1299        array.data_type() => (array_iter_helper, array),
1300        _ => internal_err!("Unsupported sort column type: {ty}")?
1301    );
1302    let (_idx, val) = iter.next().ok_or_else(|| {
1303        DataFusionError::Internal("Empty array in get_timestamp_from".to_string())
1304    })?;
1305    let val = if let Some(val) = val {
1306        val
1307    } else {
1308        return Ok(None);
1309    };
1310    let gt_timestamp = Timestamp::new(val, time_unit.into());
1311    Ok(Some(gt_timestamp))
1312}
1313
1314#[cfg(test)]
1315mod test {
1316    use std::io::Write;
1317    use std::sync::Arc;
1318
1319    use arrow::array::{ArrayRef, TimestampMillisecondArray};
1320    use arrow::compute::concat_batches;
1321    use arrow::json::ArrayWriter;
1322    use arrow_schema::{Field, Schema, TimeUnit};
1323    use futures::StreamExt;
1324    use pretty_assertions::assert_eq;
1325    use serde_json::json;
1326
1327    use super::*;
1328    use crate::test_util::{MockInputExec, new_ts_array};
1329
    // Test helpers to reduce duplication
    mod helpers {
        use datafusion::physical_plan::expressions::Column;

        use super::*;

        /// Sort options with the given direction and nulls sorted first.
        pub fn default_sort_opts(descending: bool) -> SortOptions {
            SortOptions {
                descending,
                nulls_first: true,
            }
        }

        /// Non-nullable `ts` timestamp field with the given unit.
        pub fn ts_field(unit: TimeUnit) -> Field {
            Field::new("ts", DataType::Timestamp(unit, None), false)
        }

        /// Physical column expression for `ts` at index 0.
        pub fn ts_column() -> Column {
            Column::new("ts", 0)
        }

        /// Millisecond-bounded `PartitionRange` with the given row count and id.
        pub fn partition_range(start: i64, end: i64, num_rows: usize, id: usize) -> PartitionRange {
            PartitionRange {
                start: Timestamp::new_millisecond(start),
                end: Timestamp::new_millisecond(end),
                num_rows,
                identifier: id,
            }
        }

        /// Millisecond timestamp array from the given values.
        pub fn ts_array(values: impl IntoIterator<Item = i64>) -> ArrayRef {
            Arc::new(TimestampMillisecondArray::from_iter_values(values))
        }
    }
1364
    /// `TimeRange::is_overlapping` is exclusive at the boundary and must handle
    /// mixed time units (seconds vs milliseconds) correctly.
    #[test]
    fn test_overlapping() {
        // (range1, range2, expected overlap)
        let testcases = [
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
                false,
            ),
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
                true,
            ),
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
                true,
            ),
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (
                    Timestamp::new_millisecond(1000),
                    Timestamp::new_millisecond(1002),
                ),
                true,
            ),
            // touching boundaries only -> not overlapping
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (
                    Timestamp::new_millisecond(1001),
                    Timestamp::new_millisecond(1002),
                ),
                false,
            ),
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                (
                    Timestamp::new_millisecond(1002),
                    Timestamp::new_millisecond(1003),
                ),
                false,
            ),
        ];

        for (range1, range2, expected) in testcases.iter() {
            assert_eq!(
                TimeRange::from(range1).is_overlapping(&range2.into()),
                *expected,
                "range1: {:?}, range2: {:?}",
                range1,
                range2
            );
        }
    }
1419
    /// `split_range_by` emits a `Pop` + `Push` action list; cases cover no
    /// overlap (no actions) and splits into one, two and three pieces.
    #[test]
    fn test_split() {
        // (input range, input parts, split_by range, split idx, expected actions)
        let testcases = [
            // no split
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                vec![0],
                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
                1,
                vec![],
            ),
            // one part
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                vec![0],
                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
                1,
                vec![
                    Action::Pop(
                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
                    ),
                    Action::Push(
                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
                        vec![0, 1],
                    ),
                ],
            ),
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                vec![0],
                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
                1,
                vec![
                    Action::Pop(
                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
                    ),
                    Action::Push(
                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
                        vec![0, 1],
                    ),
                ],
            ),
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
                vec![0],
                (
                    Timestamp::new_millisecond(1000),
                    Timestamp::new_millisecond(1002),
                ),
                1,
                vec![
                    Action::Pop(
                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
                    ),
                    Action::Push(
                        (
                            Timestamp::new_millisecond(1000),
                            Timestamp::new_millisecond(1001),
                        )
                            .into(),
                        vec![0, 1],
                    ),
                ],
            ),
            // two part
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1002)),
                vec![0],
                (
                    Timestamp::new_millisecond(1001),
                    Timestamp::new_millisecond(1002),
                ),
                1,
                vec![
                    Action::Pop(
                        (Timestamp::new_second(1), Timestamp::new_millisecond(1002)).into(),
                    ),
                    Action::Push(
                        (
                            Timestamp::new_millisecond(1001),
                            Timestamp::new_millisecond(1002),
                        )
                            .into(),
                        vec![0, 1],
                    ),
                    Action::Push(
                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
                        vec![0],
                    ),
                ],
            ),
            // three part
            (
                (Timestamp::new_second(1), Timestamp::new_millisecond(1004)),
                vec![0],
                (
                    Timestamp::new_millisecond(1001),
                    Timestamp::new_millisecond(1002),
                ),
                1,
                vec![
                    Action::Pop(
                        (Timestamp::new_second(1), Timestamp::new_millisecond(1004)).into(),
                    ),
                    Action::Push(
                        (
                            Timestamp::new_millisecond(1001),
                            Timestamp::new_millisecond(1002),
                        )
                            .into(),
                        vec![0, 1],
                    ),
                    Action::Push(
                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
                        vec![0],
                    ),
                    Action::Push(
                        (
                            Timestamp::new_millisecond(1002),
                            Timestamp::new_millisecond(1004),
                        )
                            .into(),
                        vec![0],
                    ),
                ],
            ),
        ];

        for (range, parts, split_by, split_idx, expected) in testcases.iter() {
            assert_eq!(
                split_range_by(&(*range).into(), parts, &split_by.into(), *split_idx),
                *expected,
                "range: {:?}, parts: {:?}, split_by: {:?}, split_idx: {}",
                range,
                parts,
                split_by,
                split_idx
            );
        }
    }
1560
    /// Projecting a nanosecond range to milliseconds must round the exclusive
    /// end up (ceil), never truncate it, so no row can fall out of the range.
    #[test]
    fn test_project_partition_range_for_sort_uses_ceil_on_exclusive_end() {
        let range = PartitionRange {
            start: Timestamp::new_nanosecond(1_000_000),
            end: Timestamp::new_nanosecond(1_000_001),
            num_rows: 1,
            identifier: 0,
        };

        let projected = project_partition_range_for_sort(
            range,
            &DataType::Timestamp(TimeUnit::Millisecond, None),
        )
        .unwrap();

        // 1_000_000 ns = 1 ms exactly; 1_000_001 ns ceils to 2 ms
        assert_eq!(Timestamp::new_millisecond(1), projected.start);
        assert_eq!(Timestamp::new_millisecond(2), projected.end);
    }
1579
    /// A sub-millisecond nanosecond range must still select the millisecond
    /// row it covers after unit conversion (exclusive end is ceiled).
    #[test]
    fn test_find_slice_from_range_preserves_last_row_after_precision_drop() {
        let sort_column = SortColumn {
            values: Arc::new(TimestampMillisecondArray::from_iter_values([1])) as ArrayRef,
            options: Some(SortOptions::default()),
        };
        let range = TimeRange::new(
            Timestamp::new_nanosecond(1_000_000),
            Timestamp::new_nanosecond(1_000_001),
        );

        // expects (offset, len) covering the single row at value 1 ms
        assert_eq!((0, 1), find_slice_from_range(&sort_column, &range).unwrap());
    }
1593
1594    #[test]
1595    fn test_discrete_exclusive_end_creates_half_open_upper_bound() {
1596        let timestamp = Timestamp::new_millisecond(42);
1597
1598        assert_eq!(
1599            Timestamp::new_millisecond(43),
1600            discrete_exclusive_end(timestamp)
1601        );
1602    }
1603
1604    #[allow(clippy::type_complexity)]
1605    fn run_compute_working_ranges_test(
1606        testcases: Vec<(
1607            BTreeMap<(Timestamp, Timestamp), Vec<usize>>,
1608            Vec<((Timestamp, Timestamp), BTreeSet<usize>)>,
1609        )>,
1610        descending: bool,
1611    ) {
1612        for (input, expected) in testcases {
1613            let expected = expected
1614                .into_iter()
1615                .map(|(r, s)| (r.into(), s))
1616                .collect_vec();
1617            let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1618            assert_eq!(
1619                compute_all_working_ranges(&input, descending),
1620                expected,
1621                "input: {:?}, descending: {}",
1622                input,
1623                descending
1624            );
1625        }
1626    }
1627
    #[test]
    fn test_compute_working_ranges_descending() {
        // Each case is `(input, expected)`:
        // - input: (start, end) tuple -> ids of the `PartitionRange`s
        //   covering that sub-range,
        // - expected: the working ranges in output order with the id set
        //   each working range must read from.
        let testcases = vec![
            // single range held by one part
            (
                BTreeMap::from([(
                    (Timestamp::new_second(1), Timestamp::new_second(2)),
                    vec![0],
                )]),
                vec![(
                    (Timestamp::new_second(1), Timestamp::new_second(2)),
                    BTreeSet::from([0]),
                )],
            ),
            // single range shared by two parts
            (
                BTreeMap::from([(
                    (Timestamp::new_second(1), Timestamp::new_second(2)),
                    vec![0, 1],
                )]),
                vec![(
                    (Timestamp::new_second(1), Timestamp::new_second(2)),
                    BTreeSet::from([0, 1]),
                )],
            ),
            // two adjacent ranges: part 0 spans both, part 1 only the lower
            // one — kept as two separate working ranges
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![0],
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![0, 1],
                    ),
                ]),
                vec![
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        BTreeSet::from([0]),
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        BTreeSet::from([0, 1]),
                    ),
                ],
            ),
            // mirror of the case above: part 1 spans both ranges
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![0, 1],
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![1],
                    ),
                ]),
                vec![
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        BTreeSet::from([0, 1]),
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        BTreeSet::from([1]),
                    ),
                ],
            ),
            // three adjacent ranges, parts 0 and 1 overlapping only in the
            // middle one — stays three separate working ranges
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(3), Timestamp::new_second(4)),
                        vec![0],
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![0, 1],
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![1],
                    ),
                ]),
                vec![
                    (
                        (Timestamp::new_second(3), Timestamp::new_second(4)),
                        BTreeSet::from([0]),
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        BTreeSet::from([0, 1]),
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        BTreeSet::from([1]),
                    ),
                ],
            ),
            // part 2 appears in every sub-range, chaining all three into one
            // merged working range
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(3), Timestamp::new_second(4)),
                        vec![0, 2],
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![0, 1, 2],
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![1, 2],
                    ),
                ]),
                vec![(
                    (Timestamp::new_second(1), Timestamp::new_second(4)),
                    BTreeSet::from([0, 1, 2]),
                )],
            ),
            // part 2 chains the two sub-ranges into one merged working range
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![0, 2],
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![1, 2],
                    ),
                ]),
                vec![(
                    (Timestamp::new_second(1), Timestamp::new_second(3)),
                    BTreeSet::from([0, 1, 2]),
                )],
            ),
            // parts 1 and 2 chain the three sub-ranges into one merged range
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(3), Timestamp::new_second(4)),
                        vec![0, 1],
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![0, 1, 2],
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![1, 2],
                    ),
                ]),
                vec![(
                    (Timestamp::new_second(1), Timestamp::new_second(4)),
                    BTreeSet::from([0, 1, 2]),
                )],
            ),
            // part 1 shared across both sub-ranges, yet the two working
            // ranges are kept separate in descending order
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![0, 1],
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![1, 2],
                    ),
                ]),
                vec![
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        BTreeSet::from([0, 1]),
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        BTreeSet::from([1, 2]),
                    ),
                ],
            ),
            // non-overlapping
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![0],
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![1, 2],
                    ),
                ]),
                vec![
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        BTreeSet::from([0]),
                    ),
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        BTreeSet::from([1, 2]),
                    ),
                ],
            ),
        ];

        run_compute_working_ranges_test(testcases, true);
    }
1830
    #[test]
    fn test_compute_working_ranges_ascending() {
        // Ascending counterpart of `test_compute_working_ranges_descending`:
        // same `(input, expected)` shape, with working ranges expected in
        // ascending time order.
        let testcases = vec![
            // single range held by one part
            (
                BTreeMap::from([(
                    (Timestamp::new_second(1), Timestamp::new_second(2)),
                    vec![0],
                )]),
                vec![(
                    (Timestamp::new_second(1), Timestamp::new_second(2)),
                    BTreeSet::from([0]),
                )],
            ),
            // single range shared by two parts
            (
                BTreeMap::from([(
                    (Timestamp::new_second(1), Timestamp::new_second(2)),
                    vec![0, 1],
                )]),
                vec![(
                    (Timestamp::new_second(1), Timestamp::new_second(2)),
                    BTreeSet::from([0, 1]),
                )],
            ),
            // part 1 spans both sub-ranges — kept as two working ranges
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![0, 1],
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![1],
                    ),
                ]),
                vec![
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        BTreeSet::from([0, 1]),
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        BTreeSet::from([1]),
                    ),
                ],
            ),
            // mirror: part 0 spans both sub-ranges
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![0],
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![0, 1],
                    ),
                ]),
                vec![
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        BTreeSet::from([0]),
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        BTreeSet::from([0, 1]),
                    ),
                ],
            ),
            // test if only one count in working set gets its own working range
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![0],
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![0, 1],
                    ),
                    (
                        (Timestamp::new_second(3), Timestamp::new_second(4)),
                        vec![1],
                    ),
                ]),
                vec![
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        BTreeSet::from([0]),
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        BTreeSet::from([0, 1]),
                    ),
                    (
                        (Timestamp::new_second(3), Timestamp::new_second(4)),
                        BTreeSet::from([1]),
                    ),
                ],
            ),
            // part 2 appears in every sub-range, merging all three into one
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![0, 2],
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![0, 1, 2],
                    ),
                    (
                        (Timestamp::new_second(3), Timestamp::new_second(4)),
                        vec![1, 2],
                    ),
                ]),
                vec![(
                    (Timestamp::new_second(1), Timestamp::new_second(4)),
                    BTreeSet::from([0, 1, 2]),
                )],
            ),
            // part 2 chains the two sub-ranges into one merged working range
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![0, 2],
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![1, 2],
                    ),
                ]),
                vec![(
                    (Timestamp::new_second(1), Timestamp::new_second(3)),
                    BTreeSet::from([0, 1, 2]),
                )],
            ),
            // parts 1 and 2 chain all three sub-ranges into one merged range
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![0, 1],
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![0, 1, 2],
                    ),
                    (
                        (Timestamp::new_second(3), Timestamp::new_second(4)),
                        vec![1, 2],
                    ),
                ]),
                vec![(
                    (Timestamp::new_second(1), Timestamp::new_second(4)),
                    BTreeSet::from([0, 1, 2]),
                )],
            ),
            // part 1 shared across both sub-ranges, yet the two working
            // ranges are kept separate in ascending order
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![0, 1],
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![1, 2],
                    ),
                ]),
                vec![
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        BTreeSet::from([0, 1]),
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        BTreeSet::from([1, 2]),
                    ),
                ],
            ),
            // non-overlapping
            (
                BTreeMap::from([
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![0, 1],
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        vec![2],
                    ),
                ]),
                vec![
                    (
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        BTreeSet::from([0, 1]),
                    ),
                    (
                        (Timestamp::new_second(2), Timestamp::new_second(3)),
                        BTreeSet::from([2]),
                    ),
                ],
            ),
        ];

        run_compute_working_ranges_test(testcases, false);
    }
2034
    #[test]
    fn test_split_overlap_range() {
        // Each case is `(input partitions, expected map)` where the expected
        // map goes from a non-overlapping (start, end) sub-range to the
        // identifiers of the input `PartitionRange`s that cover it.
        let testcases = vec![
            // simple one range
            (
                vec![PartitionRange {
                    start: Timestamp::new_second(1),
                    end: Timestamp::new_second(2),
                    num_rows: 2,
                    identifier: 0,
                }],
                BTreeMap::from_iter(
                    vec![(
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![0],
                    )]
                    .into_iter(),
                ),
            ),
            // two overlapping range
            (
                vec![
                    PartitionRange {
                        start: Timestamp::new_second(1),
                        end: Timestamp::new_second(2),
                        num_rows: 2,
                        identifier: 0,
                    },
                    PartitionRange {
                        start: Timestamp::new_second(1),
                        end: Timestamp::new_second(2),
                        num_rows: 2,
                        identifier: 1,
                    },
                ],
                BTreeMap::from_iter(
                    vec![(
                        (Timestamp::new_second(1), Timestamp::new_second(2)),
                        vec![0, 1],
                    )]
                    .into_iter(),
                ),
            ),
            // partially overlapping: [1, 3) and [2, 4) split into three
            // sub-ranges at the interior boundaries 2 and 3
            (
                vec![
                    PartitionRange {
                        start: Timestamp::new_second(1),
                        end: Timestamp::new_second(3),
                        num_rows: 2,
                        identifier: 0,
                    },
                    PartitionRange {
                        start: Timestamp::new_second(2),
                        end: Timestamp::new_second(4),
                        num_rows: 2,
                        identifier: 1,
                    },
                ],
                BTreeMap::from_iter(
                    vec![
                        (
                            (Timestamp::new_second(1), Timestamp::new_second(2)),
                            vec![0],
                        ),
                        (
                            (Timestamp::new_second(2), Timestamp::new_second(3)),
                            vec![0, 1],
                        ),
                        (
                            (Timestamp::new_second(3), Timestamp::new_second(4)),
                            vec![1],
                        ),
                    ]
                    .into_iter(),
                ),
            ),
            // three or more overlapping range
            (
                vec![
                    PartitionRange {
                        start: Timestamp::new_second(1),
                        end: Timestamp::new_second(3),
                        num_rows: 2,
                        identifier: 0,
                    },
                    PartitionRange {
                        start: Timestamp::new_second(2),
                        end: Timestamp::new_second(4),
                        num_rows: 2,
                        identifier: 1,
                    },
                    PartitionRange {
                        start: Timestamp::new_second(1),
                        end: Timestamp::new_second(4),
                        num_rows: 2,
                        identifier: 2,
                    },
                ],
                BTreeMap::from_iter(
                    vec![
                        (
                            (Timestamp::new_second(1), Timestamp::new_second(2)),
                            vec![0, 2],
                        ),
                        (
                            (Timestamp::new_second(2), Timestamp::new_second(3)),
                            vec![0, 1, 2],
                        ),
                        (
                            (Timestamp::new_second(3), Timestamp::new_second(4)),
                            vec![1, 2],
                        ),
                    ]
                    .into_iter(),
                ),
            ),
            // same three-way overlap with a different identifier assignment
            (
                vec![
                    PartitionRange {
                        start: Timestamp::new_second(1),
                        end: Timestamp::new_second(3),
                        num_rows: 2,
                        identifier: 0,
                    },
                    PartitionRange {
                        start: Timestamp::new_second(1),
                        end: Timestamp::new_second(4),
                        num_rows: 2,
                        identifier: 1,
                    },
                    PartitionRange {
                        start: Timestamp::new_second(2),
                        end: Timestamp::new_second(4),
                        num_rows: 2,
                        identifier: 2,
                    },
                ],
                BTreeMap::from_iter(
                    vec![
                        (
                            (Timestamp::new_second(1), Timestamp::new_second(2)),
                            vec![0, 1],
                        ),
                        (
                            (Timestamp::new_second(2), Timestamp::new_second(3)),
                            vec![0, 1, 2],
                        ),
                        (
                            (Timestamp::new_second(3), Timestamp::new_second(4)),
                            vec![1, 2],
                        ),
                    ]
                    .into_iter(),
                ),
            ),
        ];

        for (input, expected) in testcases {
            // Convert the tuple keys into the crate's range type before comparing.
            let expected = expected.into_iter().map(|(r, s)| (r.into(), s)).collect();
            assert_eq!(split_overlapping_ranges(&input), expected);
        }
    }
2197
2198    impl From<(i32, i32, Option<i32>, Option<i32>)> for SucRun<i32> {
2199        fn from((offset, len, min_val, max_val): (i32, i32, Option<i32>, Option<i32>)) -> Self {
2200            Self {
2201                offset: offset as usize,
2202                len: len as usize,
2203                first_val: min_val,
2204                last_val: max_val,
2205            }
2206        }
2207    }
2208
    #[test]
    fn test_find_successive_runs() {
        // Each case is `(values, sort options, expected runs)` where an
        // expected run is `(offset, len, first_val, last_val)` — see the
        // `From` impl for `SucRun<i32>` above.
        let testcases = vec![
            // ascending, nulls last: breaks at the 2 -> 1 step down
            (
                vec![Some(1), Some(1), Some(2), Some(1), Some(3)],
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
            ),
            // equal neighbors stay in the same run
            (
                vec![Some(1), Some(2), Some(2), Some(1), Some(3)],
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
            ),
            // trailing nulls (nulls last) are absorbed by the current run
            (
                vec![Some(1), Some(2), None, None, Some(1), Some(3)],
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                vec![(0, 4, Some(1), Some(2)), (4, 2, Some(1), Some(3))],
            ),
            (
                vec![Some(1), Some(2), Some(1), Some(3)],
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(1), Some(3))],
            ),
            // same values, descending order: every step up breaks the run
            (
                vec![Some(1), Some(2), Some(1), Some(3)],
                Some(SortOptions {
                    descending: true,
                    nulls_first: false,
                }),
                vec![
                    (0, 1, Some(1), Some(1)),
                    (1, 2, Some(2), Some(1)),
                    (3, 1, Some(3), Some(3)),
                ],
            ),
            // a null with nulls_first breaks the ascending run
            (
                vec![Some(1), Some(2), None, Some(3)],
                Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(3), Some(3))],
            ),
            // the same null with nulls_last extends the first run instead
            (
                vec![Some(1), Some(2), None, Some(3)],
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                vec![(0, 3, Some(1), Some(2)), (3, 1, Some(3), Some(3))],
            ),
            (
                vec![Some(2), Some(1), None, Some(3)],
                Some(SortOptions {
                    descending: true,
                    nulls_first: true,
                }),
                vec![(0, 2, Some(2), Some(1)), (2, 2, Some(3), Some(3))],
            ),
            // empty input: one empty run with no first/last value
            (
                vec![],
                Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
                vec![(0, 0, None, None)],
            ),
            // leading nulls with nulls_first merge into the first value run
            (
                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
                Some(SortOptions {
                    descending: true,
                    nulls_first: true,
                }),
                vec![(0, 5, Some(2), Some(1)), (5, 2, Some(5), Some(4))],
            ),
            // leading nulls with nulls_last form their own run
            (
                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
                Some(SortOptions {
                    descending: true,
                    nulls_first: false,
                }),
                vec![
                    (0, 2, None, None),
                    (2, 3, Some(2), Some(1)),
                    (5, 2, Some(5), Some(4)),
                ],
            ),
        ];
        for (input, sort_opts, expected) in testcases {
            let ret = find_successive_runs(input.clone().into_iter().enumerate(), &sort_opts);
            let expected = expected.into_iter().map(SucRun::<i32>::from).collect_vec();
            assert_eq!(
                ret, expected,
                "input: {:?}, opt: {:?},expected: {:?}",
                input, sort_opts, expected
            );
        }
    }
2319
    #[test]
    fn test_cmp_with_opts() {
        // Each case is `(a, b, sort options, expected ordering)` for
        // `cmp_with_opts`, which compares two optional values under the
        // given `SortOptions`.
        let testcases = vec![
            // Test ascending vs descending for Some values
            (
                Some(1),
                Some(2),
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                std::cmp::Ordering::Less,
            ),
            (
                Some(1),
                Some(2),
                Some(SortOptions {
                    descending: true,
                    nulls_first: false,
                }),
                std::cmp::Ordering::Greater,
            ),
            // Test Some vs None with nulls_first
            (
                Some(1),
                None,
                Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
                std::cmp::Ordering::Greater,
            ),
            (
                Some(1),
                None,
                Some(SortOptions {
                    descending: true,
                    nulls_first: true,
                }),
                std::cmp::Ordering::Greater,
            ),
            // Test Some vs None with nulls_last
            (
                Some(1),
                None,
                Some(SortOptions {
                    descending: true,
                    nulls_first: false,
                }),
                std::cmp::Ordering::Less,
            ),
            (
                Some(1),
                None,
                Some(SortOptions {
                    descending: false,
                    nulls_first: false,
                }),
                std::cmp::Ordering::Less,
            ),
            // Test None vs None - always Equal regardless of sort options
            (
                None,
                None,
                Some(SortOptions {
                    descending: false,
                    nulls_first: true,
                }),
                std::cmp::Ordering::Equal,
            ),
        ];
        for (a, b, opts, expected) in testcases {
            assert_eq!(
                cmp_with_opts(&a, &b, &opts),
                expected,
                "a: {:?}, b: {:?}, opts: {:?}",
                a,
                b,
                opts
            );
        }
    }
2402
    /// Checks `find_slice_from_range` against hand-computed slices.
    ///
    /// Each case is `(sorted timestamp array, descending flag, query range,
    /// expected result)`, where the expected `Ok((offset, len))` is the slice of
    /// the array whose values fall inside the (start-inclusive, end-exclusive)
    /// `TimeRange`, and an `Err(substring)` would match against the error text.
    /// Cases cover ascending and descending inputs, boundaries that hit/miss
    /// array elements exactly ("off by one"), and fully out-of-range queries
    /// (which yield an empty slice positioned at the end, e.g. `(5, 0)`).
    #[test]
    fn test_find_slice_from_range() {
        let test_cases = vec![
            // test for off by one case
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
                false,
                TimeRange {
                    start: Timestamp::new_millisecond(2),
                    end: Timestamp::new_millisecond(4),
                },
                Ok((1, 2)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([
                    -2, -1, 0, 1, 2, 3, 4, 5,
                ])) as ArrayRef,
                false,
                TimeRange {
                    start: Timestamp::new_millisecond(-1),
                    end: Timestamp::new_millisecond(4),
                },
                Ok((1, 5)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 6])) as ArrayRef,
                false,
                TimeRange {
                    start: Timestamp::new_millisecond(2),
                    end: Timestamp::new_millisecond(5),
                },
                Ok((1, 2)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 6])) as ArrayRef,
                false,
                TimeRange {
                    start: Timestamp::new_millisecond(2),
                    end: Timestamp::new_millisecond(5),
                },
                Ok((1, 3)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 5, 6])) as ArrayRef,
                false,
                TimeRange {
                    start: Timestamp::new_millisecond(2),
                    end: Timestamp::new_millisecond(5),
                },
                Ok((1, 2)),
            ),
            // query range entirely past the data: empty slice at array end
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
                false,
                TimeRange {
                    start: Timestamp::new_millisecond(6),
                    end: Timestamp::new_millisecond(7),
                },
                Ok((5, 0)),
            ),
            // descending off by one cases
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 1])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(4),
                    start: Timestamp::new_millisecond(1),
                },
                Ok((1, 3)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([
                    5, 4, 3, 2, 1, 0,
                ])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(4),
                    start: Timestamp::new_millisecond(1),
                },
                Ok((2, 3)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 0])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(4),
                    start: Timestamp::new_millisecond(1),
                },
                Ok((1, 2)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 0])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(4),
                    start: Timestamp::new_millisecond(1),
                },
                Ok((2, 2)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 1])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(5),
                    start: Timestamp::new_millisecond(2),
                },
                Ok((1, 3)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 1])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(5),
                    start: Timestamp::new_millisecond(2),
                },
                Ok((1, 2)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 2, 1])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(5),
                    start: Timestamp::new_millisecond(2),
                },
                Ok((1, 3)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 1])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(5),
                    start: Timestamp::new_millisecond(2),
                },
                Ok((1, 2)),
            ),
            // descending, query range entirely below the data
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([
                    10, 9, 8, 7, 6,
                ])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(5),
                    start: Timestamp::new_millisecond(2),
                },
                Ok((5, 0)),
            ),
            // test off by one case
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([3, 2, 1, 0])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(4),
                    start: Timestamp::new_millisecond(3),
                },
                Ok((0, 1)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(4),
                    start: Timestamp::new_millisecond(3),
                },
                Ok((1, 1)),
            ),
            (
                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2])) as ArrayRef,
                true,
                TimeRange {
                    end: Timestamp::new_millisecond(4),
                    start: Timestamp::new_millisecond(3),
                },
                Ok((2, 1)),
            ),
        ];

        for (sort_vals, descending, range, expected) in test_cases {
            // Only `descending` varies; nulls ordering stays at the default.
            let sort_column = SortColumn {
                values: sort_vals,
                options: Some(SortOptions {
                    descending,
                    ..Default::default()
                }),
            };
            let ret = find_slice_from_range(&sort_column, &range);
            // Compare success with success and error with error; any mix is a failure.
            // (All current cases are `Ok`; the `Err` arm is kept for future cases.)
            match (ret, expected) {
                (Ok(ret), Ok(expected)) => {
                    assert_eq!(
                        ret, expected,
                        "sort_vals: {:?}, range: {:?}",
                        sort_column, range
                    )
                }
                (Err(err), Err(expected)) => {
                    let expected: &str = expected;
                    assert!(
                        err.to_string().contains(expected),
                        "err: {:?}, expected: {:?}",
                        err,
                        expected
                    );
                }
                (r, e) => panic!("unexpected result: {:?}, expected: {:?}", r, e),
            }
        }
    }
2609
    /// A declarative test case for `WindowedSortExec`: run `input` through the
    /// operator and expect exactly the batches in `output`.
    #[derive(Debug)]
    struct TestStream {
        // Sort expression (timestamp column + options) the exec sorts by.
        expression: PhysicalSortExpr,
        // Optional row limit, forwarded to `WindowedSortExec`.
        fetch: Option<usize>,
        // Input batches, each tagged with the `PartitionRange` it came from.
        input: Vec<(PartitionRange, DfRecordBatch)>,
        // Expected output batches, in emission order.
        output: Vec<DfRecordBatch>,
        // Single timestamp-column schema shared by all batches.
        schema: SchemaRef,
    }
2618
2619    impl TestStream {
2620        fn new(
2621            opt: SortOptions,
2622            fetch: Option<usize>,
2623            unit: TimeUnit,
2624            input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2625            expected: Vec<Vec<ArrayRef>>,
2626        ) -> Self {
2627            let expression = PhysicalSortExpr {
2628                expr: Arc::new(helpers::ts_column()),
2629                options: opt,
2630            };
2631            let schema = Schema::new(vec![helpers::ts_field(unit)]);
2632            let schema = Arc::new(schema);
2633            let input = input
2634                .into_iter()
2635                .map(|(k, v)| (k, DfRecordBatch::try_new(schema.clone(), v).unwrap()))
2636                .collect_vec();
2637            let output_batchs = expected
2638                .into_iter()
2639                .map(|v| DfRecordBatch::try_new(schema.clone(), v).unwrap())
2640                .collect_vec();
2641            Self {
2642                expression,
2643                fetch,
2644                input,
2645                output: output_batchs,
2646                schema,
2647            }
2648        }
2649
2650        fn new_simple(
2651            descending: bool,
2652            fetch: Option<usize>,
2653            input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2654            expected: Vec<Vec<ArrayRef>>,
2655        ) -> Self {
2656            Self::new(
2657                helpers::default_sort_opts(descending),
2658                fetch,
2659                TimeUnit::Millisecond,
2660                input,
2661                expected,
2662            )
2663        }
2664
2665        async fn run_test(&self) -> Vec<DfRecordBatch> {
2666            let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip();
2667
2668            let mock_input = MockInputExec::new(vec![batches], self.schema.clone());
2669
2670            let exec = WindowedSortExec::try_new(
2671                self.expression.clone(),
2672                self.fetch,
2673                vec![ranges],
2674                Arc::new(mock_input),
2675            )
2676            .unwrap();
2677
2678            let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
2679
2680            let real_output = exec_stream.collect::<Vec<_>>().await;
2681            let real_output: Vec<_> = real_output.into_iter().try_collect().unwrap();
2682            real_output
2683        }
2684    }
2685
2686    #[tokio::test]
2687    async fn test_window_sort_empty_and_minimal() {
2688        use helpers::*;
2689        let test_cases = [
2690            // Empty input
2691            TestStream::new_simple(false, None, vec![], vec![]),
2692            // One empty batch, one with data
2693            TestStream::new_simple(
2694                false,
2695                None,
2696                vec![
2697                    (partition_range(1, 2, 1, 0), vec![ts_array([])]),
2698                    (partition_range(1, 3, 1, 0), vec![ts_array([2])]),
2699                ],
2700                vec![vec![ts_array([2])]],
2701            ),
2702            // Both batches empty
2703            TestStream::new_simple(
2704                false,
2705                None,
2706                vec![
2707                    (partition_range(1, 2, 1, 0), vec![ts_array([])]),
2708                    (partition_range(1, 3, 1, 0), vec![ts_array([])]),
2709                ],
2710                vec![],
2711            ),
2712            // Indistinguishable boundary case - value at exact boundary
2713            TestStream::new_simple(
2714                false,
2715                None,
2716                vec![
2717                    (partition_range(1, 2, 1, 0), vec![ts_array([1])]),
2718                    (partition_range(1, 3, 1, 0), vec![ts_array([2])]),
2719                ],
2720                vec![vec![ts_array([1])], vec![ts_array([2])]],
2721            ),
2722        ];
2723
2724        for (idx, testcase) in test_cases.iter().enumerate() {
2725            let output = testcase.run_test().await;
2726            assert_eq!(output, testcase.output, "empty/minimal case {idx} failed");
2727        }
2728    }
2729
2730    #[tokio::test]
2731    async fn test_window_sort_overlapping() {
2732        use helpers::*;
2733        let test_cases = [
2734            // Direct emit - overlapping ranges without merge
2735            TestStream::new_simple(
2736                false,
2737                None,
2738                vec![
2739                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2740                    (partition_range(1, 4, 1, 0), vec![ts_array([2, 3])]),
2741                ],
2742                vec![
2743                    vec![ts_array([1, 2])],
2744                    vec![ts_array([2])],
2745                    vec![ts_array([3])],
2746                ],
2747            ),
2748            // Cross working range batch intersection - triggers merge
2749            TestStream::new_simple(
2750                false,
2751                None,
2752                vec![
2753                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2754                    (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
2755                ],
2756                vec![vec![ts_array([1, 1, 2, 2])], vec![ts_array([3])]],
2757            ),
2758            // No overlap case - separate ranges
2759            TestStream::new_simple(
2760                false,
2761                None,
2762                vec![
2763                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2764                    (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
2765                    (partition_range(4, 6, 1, 1), vec![ts_array([4, 5])]),
2766                ],
2767                vec![
2768                    vec![ts_array([1, 1, 2, 2])],
2769                    vec![ts_array([3])],
2770                    vec![ts_array([4, 5])],
2771                ],
2772            ),
2773        ];
2774
2775        for (idx, testcase) in test_cases.iter().enumerate() {
2776            let output = testcase.run_test().await;
2777            assert_eq!(output, testcase.output, "overlapping case {idx} failed");
2778        }
2779    }
2780
2781    #[tokio::test]
2782    async fn test_window_sort_with_fetch() {
2783        use helpers::*;
2784        let test_cases = [
2785            // Fetch limit stops at 6 rows
2786            TestStream::new_simple(
2787                false,
2788                Some(6),
2789                vec![
2790                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2791                    (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
2792                    (partition_range(3, 6, 1, 1), vec![ts_array([4, 5])]),
2793                ],
2794                vec![
2795                    vec![ts_array([1, 1, 2, 2])],
2796                    vec![ts_array([3])],
2797                    vec![ts_array([4])],
2798                ],
2799            ),
2800            // Fetch limit stops at 3 rows
2801            TestStream::new_simple(
2802                false,
2803                Some(3),
2804                vec![
2805                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2806                    (partition_range(1, 4, 1, 1), vec![ts_array([1, 2, 3])]),
2807                    (partition_range(3, 6, 1, 1), vec![ts_array([4, 5])]),
2808                ],
2809                vec![vec![ts_array([1, 1, 2])]],
2810            ),
2811        ];
2812
2813        for (idx, testcase) in test_cases.iter().enumerate() {
2814            let output = testcase.run_test().await;
2815            assert_eq!(output, testcase.output, "fetch case {idx} failed");
2816        }
2817    }
2818
2819    #[tokio::test]
2820    async fn test_window_sort_descending() {
2821        use helpers::*;
2822        let test_cases = [
2823            // Descending order sort
2824            TestStream::new_simple(
2825                true,
2826                None,
2827                vec![
2828                    (partition_range(3, 6, 1, 1), vec![ts_array([5, 4])]),
2829                    (partition_range(1, 4, 1, 1), vec![ts_array([3, 2, 1])]),
2830                    (partition_range(1, 3, 1, 0), vec![ts_array([2, 1])]),
2831                ],
2832                vec![
2833                    vec![ts_array([5, 4])],
2834                    vec![ts_array([3])],
2835                    vec![ts_array([2, 2, 1, 1])],
2836                ],
2837            ),
2838        ];
2839
2840        for (idx, testcase) in test_cases.iter().enumerate() {
2841            let output = testcase.run_test().await;
2842            assert_eq!(output, testcase.output, "descending case {idx} failed");
2843        }
2844    }
2845
2846    #[tokio::test]
2847    async fn test_window_sort_complex() {
2848        use helpers::*;
2849        let test_cases = [
2850            // Long range with subset short run
2851            TestStream::new_simple(
2852                false,
2853                None,
2854                vec![
2855                    (partition_range(1, 10, 1, 0), vec![ts_array([1, 5, 9])]),
2856                    (partition_range(3, 7, 1, 1), vec![ts_array([3, 4, 5, 6])]),
2857                ],
2858                vec![vec![ts_array([1])], vec![ts_array([3, 4, 5, 5, 6, 9])]],
2859            ),
2860            // Complex multi-range overlap
2861            TestStream::new_simple(
2862                false,
2863                None,
2864                vec![
2865                    (partition_range(1, 3, 1, 0), vec![ts_array([1, 2])]),
2866                    (
2867                        partition_range(1, 10, 1, 1),
2868                        vec![ts_array([1, 3, 4, 5, 6, 8])],
2869                    ),
2870                    (partition_range(7, 10, 1, 1), vec![ts_array([7, 8, 9])]),
2871                ],
2872                vec![
2873                    vec![ts_array([1, 1, 2])],
2874                    vec![ts_array([3, 4, 5, 6])],
2875                    vec![ts_array([7, 8, 8, 9])],
2876                ],
2877            ),
2878            // Subset with duplicate datapoints
2879            TestStream::new_simple(
2880                false,
2881                None,
2882                vec![
2883                    (
2884                        partition_range(1, 11, 1, 0),
2885                        vec![ts_array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])],
2886                    ),
2887                    (partition_range(5, 7, 1, 1), vec![ts_array([5, 6])]),
2888                ],
2889                vec![
2890                    vec![ts_array([1, 2, 3, 4])],
2891                    vec![ts_array([5, 5, 6, 6, 7, 8, 9, 10])],
2892                ],
2893            ),
2894        ];
2895
2896        for (idx, testcase) in test_cases.iter().enumerate() {
2897            let output = testcase.run_test().await;
2898            assert_eq!(output, testcase.output, "complex case {idx} failed");
2899        }
2900    }
2901
2902    #[tokio::test]
2903    async fn fuzzy_ish_test_window_sort_stream() {
2904        let test_cnt = 100;
2905        let part_cnt_bound = 100;
2906        let range_size_bound = 100;
2907        let range_offset_bound = 100;
2908        let in_range_datapoint_cnt_bound = 100;
2909        let fetch_bound = 100;
2910
2911        let mut rng = fastrand::Rng::new();
2912        let rng_seed = rng.u64(..);
2913        rng.seed(rng_seed);
2914        let mut bound_val = None;
2915        // construct testcases
2916        type CmpFn<T> = Box<dyn FnMut(&T, &T) -> std::cmp::Ordering>;
2917        let mut full_testcase_list = Vec::new();
2918        for _case_id in 0..test_cnt {
2919            let descending = rng.bool();
2920            fn ret_cmp_fn<T: Ord>(descending: bool) -> CmpFn<T> {
2921                if descending {
2922                    return Box::new(|a: &T, b: &T| b.cmp(a));
2923                }
2924                Box::new(|a: &T, b: &T| a.cmp(b))
2925            }
2926            let unit = match rng.u8(0..3) {
2927                0 => TimeUnit::Second,
2928                1 => TimeUnit::Millisecond,
2929                2 => TimeUnit::Microsecond,
2930                _ => TimeUnit::Nanosecond,
2931            };
2932            let fetch = if rng.bool() {
2933                Some(rng.usize(0..fetch_bound))
2934            } else {
2935                None
2936            };
2937
2938            let mut input_ranged_data = vec![];
2939            let mut output_data: Vec<i64> = vec![];
2940            // generate input data
2941            for part_id in 0..rng.usize(0..part_cnt_bound) {
2942                let (start, end) = if descending {
2943                    // Use 1..=range_offset_bound to ensure strictly decreasing end values
2944                    let end = bound_val
2945                        .map(|i| i - rng.i64(1..=range_offset_bound))
2946                        .unwrap_or_else(|| rng.i64(..));
2947                    bound_val = Some(end);
2948                    let start = end - rng.i64(1..range_size_bound);
2949                    let start = Timestamp::new(start, unit.into());
2950                    let end = Timestamp::new(end, unit.into());
2951                    (start, end)
2952                } else {
2953                    // Use 1..=range_offset_bound to ensure strictly increasing start values
2954                    let start = bound_val
2955                        .map(|i| i + rng.i64(1..=range_offset_bound))
2956                        .unwrap_or_else(|| rng.i64(..));
2957                    bound_val = Some(start);
2958                    let end = start + rng.i64(1..range_size_bound);
2959                    let start = Timestamp::new(start, unit.into());
2960                    let end = Timestamp::new(end, unit.into());
2961                    (start, end)
2962                };
2963
2964                let iter = 0..rng.usize(0..in_range_datapoint_cnt_bound);
2965                let data_gen = iter
2966                    .map(|_| rng.i64(start.value()..end.value()))
2967                    .sorted_by(ret_cmp_fn(descending))
2968                    .collect_vec();
2969                output_data.extend(data_gen.clone());
2970                let arr = new_ts_array(unit, data_gen);
2971                let range = PartitionRange {
2972                    start,
2973                    end,
2974                    num_rows: arr.len(),
2975                    identifier: part_id,
2976                };
2977                input_ranged_data.push((range, vec![arr]));
2978            }
2979
2980            output_data.sort_by(ret_cmp_fn(descending));
2981            if let Some(fetch) = fetch {
2982                output_data.truncate(fetch);
2983            }
2984            let output_arr = new_ts_array(unit, output_data);
2985
2986            let test_stream = TestStream::new(
2987                helpers::default_sort_opts(descending),
2988                fetch,
2989                unit,
2990                input_ranged_data.clone(),
2991                vec![vec![output_arr]],
2992            );
2993            full_testcase_list.push(test_stream);
2994        }
2995
2996        for (case_id, test_stream) in full_testcase_list.into_iter().enumerate() {
2997            let res = test_stream.run_test().await;
2998            let res_concat = concat_batches(&test_stream.schema, &res).unwrap();
2999            let expected = test_stream.output;
3000            let expected_concat = concat_batches(&test_stream.schema, &expected).unwrap();
3001
3002            if res_concat != expected_concat {
3003                {
3004                    let mut f_input = std::io::stderr();
3005                    f_input.write_all(b"[").unwrap();
3006                    for (input_range, input_arr) in test_stream.input {
3007                        let range_json = json!({
3008                            "start": input_range.start.to_chrono_datetime().unwrap().to_string(),
3009                            "end": input_range.end.to_chrono_datetime().unwrap().to_string(),
3010                            "num_rows": input_range.num_rows,
3011                            "identifier": input_range.identifier,
3012                        });
3013                        let buf = Vec::new();
3014                        let mut input_writer = ArrayWriter::new(buf);
3015                        input_writer.write(&input_arr).unwrap();
3016                        input_writer.finish().unwrap();
3017                        let res_str =
3018                            String::from_utf8_lossy(&input_writer.into_inner()).to_string();
3019                        let whole_json =
3020                            format!(r#"{{"range": {}, "data": {}}},"#, range_json, res_str);
3021                        f_input.write_all(whole_json.as_bytes()).unwrap();
3022                    }
3023                    f_input.write_all(b"]").unwrap();
3024                }
3025                {
3026                    let mut f_res = std::io::stderr();
3027                    f_res.write_all(b"[").unwrap();
3028                    for batch in &res {
3029                        let mut res_writer = ArrayWriter::new(f_res);
3030                        res_writer.write(batch).unwrap();
3031                        res_writer.finish().unwrap();
3032                        f_res = res_writer.into_inner();
3033                        f_res.write_all(b",").unwrap();
3034                    }
3035                    f_res.write_all(b"]").unwrap();
3036
3037                    let f_res_concat = std::io::stderr();
3038                    let mut res_writer = ArrayWriter::new(f_res_concat);
3039                    res_writer.write(&res_concat).unwrap();
3040                    res_writer.finish().unwrap();
3041
3042                    let f_expected = std::io::stderr();
3043                    let mut expected_writer = ArrayWriter::new(f_expected);
3044                    expected_writer.write(&expected_concat).unwrap();
3045                    expected_writer.finish().unwrap();
3046                }
3047                panic!(
3048                    "case failed, case id: {0}, output and expected output to stderr",
3049                    case_id
3050                );
3051            }
3052            assert_eq!(
3053                res_concat, expected_concat,
3054                "case failed, case id: {}, rng seed: {}",
3055                case_id, rng_seed
3056            );
3057        }
3058    }
3059}