query/window_sort.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A physical plan for windowed sort, i.e. merge-sorting multiple sorted ranges according to the input `PartitionRange`s.
16//!
17
18use std::any::Any;
19use std::collections::{BTreeMap, BTreeSet, VecDeque};
20use std::pin::Pin;
21use std::slice::from_ref;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use arrow::array::{Array, ArrayRef};
26use arrow::compute::SortColumn;
27use arrow_schema::{DataType, SchemaRef, SortOptions};
28use common_error::ext::{BoxedError, PlainError};
29use common_error::status_code::StatusCode;
30use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
31use common_telemetry::error;
32use common_time::Timestamp;
33use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
34use datafusion::execution::{RecordBatchStream, TaskContext};
35use datafusion::physical_plan::memory::MemoryStream;
36use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
37use datafusion::physical_plan::sorts::streaming_merge::StreamingMergeBuilder;
38use datafusion::physical_plan::{
39    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
40};
41use datafusion_common::utils::bisect;
42use datafusion_common::{internal_err, DataFusionError};
43use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
44use datatypes::value::Value;
45use futures::Stream;
46use itertools::Itertools;
47use snafu::ResultExt;
48use store_api::region_engine::PartitionRange;
49
50use crate::error::{QueryExecutionSnafu, Result};
51
52/// A streaming sort execution plan which accepts a list of `PartitionRange`s,
53/// merge sorts them whenever possible, and emits the sorted result as soon as possible.
54/// This sorting plan only accepts sorting by the timestamp column and will not sort by other fields.
55///
56/// Internally, it calls [`StreamingMergeBuilder`] multiple times to merge multiple sorted "working ranges".
57///
58/// # Invariant Promise on Input Stream
59/// 1. The input stream must be sorted by timestamp, and
60/// 2. arrive in the order of the `PartitionRange`s in `ranges`.
61/// 3. Each `PartitionRange` is sorted within itself (ascending or descending), but ranges need not be sorted across each other.
62/// 4. No RecordBatch in the input stream may span multiple `PartitionRange`s.
63///
64/// TODO(discord9): fix item 4; since only `PartSort` is used as input, this might not be a problem
65
66#[derive(Debug, Clone)]
67pub struct WindowedSortExec {
68    /// Physical sort expression (that is, sort by timestamp)
69    expression: PhysicalSortExpr,
70    /// Optional number of rows to fetch. Stops producing rows after this fetch
71    fetch: Option<usize>,
72    /// The input ranges indicate that the input stream is composed of these ranges in the given order.
73    ///
74    /// Each partition has one vector of `PartitionRange`.
75    ranges: Vec<Vec<PartitionRange>>,
76    /// All available working ranges and their corresponding working set
77    ///
78    /// A working range promises that once the input stream yields a value outside the current range,
79    /// future values will never fall back into this range. Each partition has one vector of ranges.
80    all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
81    input: Arc<dyn ExecutionPlan>,
82    /// Execution metrics
83    metrics: ExecutionPlanMetricsSet,
84    properties: PlanProperties,
85}
86
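/// Check that, for every partition, the `PartitionRange`s are ordered by their lower bound for
/// ascending sorts, or by their upper bound for descending sorts.
///
/// # Example
///
/// An illustrative sketch added for clarity (not part of the original code); it assumes
/// `PartitionRange`'s `start`/`end`/`num_rows`/`identifier` fields:
///
/// ```ignore
/// // ascending: lower bounds 0s <= 5s, so this input passes the check
/// let ranges = vec![vec![
///     PartitionRange { start: Timestamp::new_second(0), end: Timestamp::new_second(10), num_rows: 4, identifier: 0 },
///     PartitionRange { start: Timestamp::new_second(5), end: Timestamp::new_second(15), num_rows: 4, identifier: 1 },
/// ]];
/// assert!(check_partition_range_monotonicity(&ranges, false).is_ok());
/// // descending requires upper bounds to be non-increasing, which 10s followed by 15s violates
/// assert!(check_partition_range_monotonicity(&ranges, true).is_err());
/// ```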
87fn check_partition_range_monotonicity(
88    ranges: &[Vec<PartitionRange>],
89    descending: bool,
90) -> Result<()> {
91    let is_valid = ranges.iter().all(|r| {
92        if descending {
93            r.windows(2).all(|w| w[0].end >= w[1].end)
94        } else {
95            r.windows(2).all(|w| w[0].start <= w[1].start)
96        }
97    });
98
99    if !is_valid {
100        let msg = if descending {
101            "Input `PartitionRange`s's upper bound is not monotonic non-increase"
102        } else {
103            "Input `PartitionRange`s's lower bound is not monotonic non-decrease"
104        };
105        let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
106        Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
107    } else {
108        Ok(())
109    }
110}
111
112impl WindowedSortExec {
113    pub fn try_new(
114        expression: PhysicalSortExpr,
115        fetch: Option<usize>,
116        ranges: Vec<Vec<PartitionRange>>,
117        input: Arc<dyn ExecutionPlan>,
118    ) -> Result<Self> {
119        check_partition_range_monotonicity(&ranges, expression.options.descending)?;
120
121        let properties = input.properties();
122        let properties = PlanProperties::new(
123            input
124                .equivalence_properties()
125                .clone()
126                .with_reorder(LexOrdering::new(vec![expression.clone()])),
127            input.output_partitioning().clone(),
128            properties.emission_type,
129            properties.boundedness,
130        );
131
132        let mut all_avail_working_range = Vec::with_capacity(ranges.len());
133        for r in &ranges {
134            let overlap_counts = split_overlapping_ranges(r);
135            let working_ranges =
136                compute_all_working_ranges(&overlap_counts, expression.options.descending);
137            all_avail_working_range.push(working_ranges);
138        }
139
140        Ok(Self {
141            expression,
142            fetch,
143            ranges,
144            all_avail_working_range,
145            input,
146            metrics: ExecutionPlanMetricsSet::new(),
147            properties,
148        })
149    }
150
151    /// While receiving partially sorted RecordBatches, we need to update the working set, i.e. the
152    /// `PartitionRange`s we think those RecordBatches belong to. When we receive something outside
153    /// of the working set, we can merge the results gathered so far.
154    pub fn to_stream(
155        &self,
156        context: Arc<TaskContext>,
157        partition: usize,
158    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
159        let input_stream: DfSendableRecordBatchStream =
160            self.input.execute(partition, context.clone())?;
161
162        let df_stream = Box::pin(WindowedSortStream::new(
163            context,
164            self,
165            input_stream,
166            partition,
167        )) as _;
168
169        Ok(df_stream)
170    }
171}
172
173impl DisplayAs for WindowedSortExec {
174    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        write!(
176            f,
177            "WindowedSortExec: expr={} num_ranges={}",
178            self.expression,
179            self.ranges.len()
180        )?;
181        if let Some(fetch) = self.fetch {
182            write!(f, " fetch={}", fetch)?;
183        }
184        Ok(())
185    }
186}
187
188impl ExecutionPlan for WindowedSortExec {
189    fn as_any(&self) -> &dyn Any {
190        self
191    }
192
193    fn schema(&self) -> SchemaRef {
194        self.input.schema()
195    }
196
197    fn properties(&self) -> &PlanProperties {
198        &self.properties
199    }
200
201    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
202        vec![&self.input]
203    }
204
205    fn with_new_children(
206        self: Arc<Self>,
207        children: Vec<Arc<dyn ExecutionPlan>>,
208    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
209        let new_input = if let Some(first) = children.first() {
210            first
211        } else {
212            internal_err!("No children found")?
213        };
214        let new = Self::try_new(
215            self.expression.clone(),
216            self.fetch,
217            self.ranges.clone(),
218            new_input.clone(),
219        )?;
220        Ok(Arc::new(new))
221    }
222
223    fn execute(
224        &self,
225        partition: usize,
226        context: Arc<TaskContext>,
227    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
228        self.to_stream(context, partition)
229    }
230
231    fn metrics(&self) -> Option<MetricsSet> {
232        Some(self.metrics.clone_inner())
233    }
234
235    /// # Explain
236    ///
237    /// This plan needs to be executed on each partition independently,
238    /// and is expected to run directly on storage engine's output
239    /// distribution / partition.
240    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
241        vec![false; self.ranges.len()]
242    }
243
244    fn name(&self) -> &str {
245        "WindowedSortExec"
246    }
247}
248
249/// The core logic of merge-sorting multiple sorted ranges
250///
251/// The flow of data is:
252/// ```md
253/// input --check if sorted--> in_progress --find sorted run--> sorted_input_runs --call merge sort--> merge_stream --> output
254/// ```
255pub struct WindowedSortStream {
256    /// Memory pool for this stream
257    memory_pool: Arc<dyn MemoryPool>,
258    /// RecordBatches currently being assembled; they are moved into `sorted_input_runs` once a run is complete
259    in_progress: Vec<DfRecordBatch>,
260    /// Last `Timestamp` of the last input RecordBatch in `in_progress`, used to find the partial sorted run's boundary
261    last_value: Option<Timestamp>,
262    /// Current working set of `PartitionRange` sorted RecordBatches
263    sorted_input_runs: Vec<DfSendableRecordBatchStream>,
264    /// Merge-sorted result streams, which should be polled to the end before starting a new merge sort
265    merge_stream: VecDeque<DfSendableRecordBatchStream>,
266    /// The number of times merge sort has been called
267    merge_count: usize,
268    /// Index into current `working_range` in `all_avail_working_range`
269    working_idx: usize,
270    /// Input stream, assumed to be read in the order of `PartitionRange`s
271    input: DfSendableRecordBatchStream,
272    /// Whether this stream is terminated, e.g. because the fetch limit is reached or the input stream is done
273    is_terminated: bool,
274    /// Output Schema, which is the same as input schema, since this is a sort plan
275    schema: SchemaRef,
276    /// Physical sort expression (that is, sort by timestamp)
277    expression: PhysicalSortExpr,
278    /// Optional number of rows to fetch. Stops producing rows after this fetch
279    fetch: Option<usize>,
280    /// number of rows produced
281    produced: usize,
282    /// Resulting Stream(`merge_stream`)'s batch size, merely a suggestion
283    batch_size: usize,
284    /// All available working ranges and their corresponding working set
285    ///
286    /// A working range promises that once the input stream yields a value outside the current range, future values will never fall back into this range
287    all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
288    /// The input partition ranges
289    #[allow(dead_code)] // this is used under #[debug_assertions]
290    ranges: Vec<PartitionRange>,
291    /// Execution metrics
292    metrics: BaselineMetrics,
293}
294
295impl WindowedSortStream {
296    pub fn new(
297        context: Arc<TaskContext>,
298        exec: &WindowedSortExec,
299        input: DfSendableRecordBatchStream,
300        partition: usize,
301    ) -> Self {
302        Self {
303            memory_pool: context.runtime_env().memory_pool.clone(),
304            in_progress: Vec::new(),
305            last_value: None,
306            sorted_input_runs: Vec::new(),
307            merge_stream: VecDeque::new(),
308            merge_count: 0,
309            working_idx: 0,
310            schema: input.schema(),
311            input,
312            is_terminated: false,
313            expression: exec.expression.clone(),
314            fetch: exec.fetch,
315            produced: 0,
316            batch_size: context.session_config().batch_size(),
317            all_avail_working_range: exec.all_avail_working_range[partition].clone(),
318            ranges: exec.ranges[partition].clone(),
319            metrics: BaselineMetrics::new(&exec.metrics, partition),
320        }
321    }
322}
323
324impl WindowedSortStream {
325    #[cfg(debug_assertions)]
326    fn check_subset_ranges(&self, cur_range: &TimeRange) {
327        let cur_is_subset_to = self
328            .ranges
329            .iter()
330            .filter(|r| cur_range.is_subset(&TimeRange::from(*r)))
331            .collect_vec();
332        if cur_is_subset_to.is_empty() {
333            error!("Current range is not a subset of any PartitionRange");
334            // find all ranges that are subsets of the current range
335            let subset_ranges = self
336                .ranges
337                .iter()
338                .filter(|r| TimeRange::from(*r).is_subset(cur_range))
339                .collect_vec();
340            let only_overlap = self
341                .ranges
342                .iter()
343                .filter(|r| {
344                    let r = TimeRange::from(*r);
345                    r.is_overlapping(cur_range) && !r.is_subset(cur_range)
346                })
347                .collect_vec();
348            error!(
349                "Bad input, found {} ranges that are subset of current range, also found {} ranges that only overlap, subset ranges are: {:?}; overlap ranges are: {:?}",
350                subset_ranges.len(),
351                only_overlap.len(),
352                subset_ranges,
353                only_overlap
354            );
355        } else {
356            let only_overlap = self
357                .ranges
358                .iter()
359                .filter(|r| {
360                    let r = TimeRange::from(*r);
361                    r.is_overlapping(cur_range) && !cur_range.is_subset(&r)
362                })
363                .collect_vec();
364            error!(
365                "Found current range to be subset of {} ranges, also found {} ranges that only overlap, of subset ranges are:{:?}; overlap ranges are: {:?}",
366                cur_is_subset_to.len(),
367                only_overlap.len(),
368                cur_is_subset_to,
369                only_overlap
370            );
371        }
372        let all_overlap_working_range = self
373            .all_avail_working_range
374            .iter()
375            .filter(|(range, _)| range.is_overlapping(cur_range))
376            .map(|(range, _)| range)
377            .collect_vec();
378        error!(
379            "Found {} working ranges that overlap with current range: {:?}",
380            all_overlap_working_range.len(),
381            all_overlap_working_range
382        );
383    }
384
385    /// Poll the next RecordBatch from the merge-sort's output stream
386    fn poll_result_stream(
387        &mut self,
388        cx: &mut Context<'_>,
389    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
390        while let Some(merge_stream) = &mut self.merge_stream.front_mut() {
391            match merge_stream.as_mut().poll_next(cx) {
392                Poll::Ready(Some(Ok(batch))) => {
393                    let ret = if let Some(remaining) = self.remaining_fetch() {
394                        if remaining == 0 {
395                            self.is_terminated = true;
396                            None
397                        } else if remaining < batch.num_rows() {
398                            self.produced += remaining;
399                            Some(Ok(batch.slice(0, remaining)))
400                        } else {
401                            self.produced += batch.num_rows();
402                            Some(Ok(batch))
403                        }
404                    } else {
405                        self.produced += batch.num_rows();
406                        Some(Ok(batch))
407                    };
408                    return Poll::Ready(ret);
409                }
410                Poll::Ready(Some(Err(e))) => {
411                    return Poll::Ready(Some(Err(e)));
412                }
413                Poll::Ready(None) => {
414                    // current merge stream is done, we can start polling the next one
415
416                    self.merge_stream.pop_front();
417                    continue;
418                }
419                Poll::Pending => {
420                    return Poll::Pending;
421                }
422            }
423        }
424        // if no output stream is available
425        Poll::Ready(None)
426    }
427
428    /// The core logic of merge-sorting multiple sorted ranges
429    ///
430    /// We try to maximize the number of sorted runs we can merge in one go, while emitting the result as soon as possible.
431    pub fn poll_next_inner(
432        mut self: Pin<&mut Self>,
433        cx: &mut Context<'_>,
434    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
435        // first check and send out the merge result
436        match self.poll_result_stream(cx) {
437            Poll::Ready(None) => {
438                if self.is_terminated {
439                    return Poll::Ready(None);
440                }
441            }
442            x => return x,
443        };
444
445        // consume input stream
446        while !self.is_terminated {
447            // then we get a new RecordBatch from input stream
448            let new_input_rbs = match self.input.as_mut().poll_next(cx) {
449                Poll::Ready(Some(Ok(batch))) => {
450                    Some(split_batch_to_sorted_run(batch, &self.expression)?)
451                }
452                Poll::Ready(Some(Err(e))) => {
453                    return Poll::Ready(Some(Err(e)));
454                }
455                Poll::Ready(None) => {
456                    // input stream is done, we need to merge sort the remaining working set
457                    self.is_terminated = true;
458                    None
459                }
460                Poll::Pending => return Poll::Pending,
461            };
462
463            let Some(SortedRunSet {
464                runs_with_batch,
465                sort_column,
466            }) = new_input_rbs
467            else {
468                // input stream is done, we need to merge sort the remaining working set
469                self.build_sorted_stream()?;
470                self.start_new_merge_sort()?;
471                continue;
472            };
473            // The core logic to eagerly merge sort the working set
474
475            // compare with last_value to find the boundary, then merge runs if needed
476
477            // iterate over runs_with_batch to merge sort; this might create zero or more streams to put into `sorted_input_runs`
478            let mut last_remaining = None;
479            let mut run_iter = runs_with_batch.into_iter();
480            loop {
481                let Some((sorted_rb, run_info)) = last_remaining.take().or_else(|| run_iter.next()) else {
482                    break;
483                };
484                if sorted_rb.num_rows() == 0 {
485                    continue;
486                }
487                // determine if this batch is in current working range
488                let Some(cur_range) = run_info.get_time_range() else {
489                    internal_err!("Found NULL in time index column")?
490                };
491                let Some(working_range) = self.get_working_range() else {
492                    internal_err!("No working range found")?
493                };
494
495                // ensure the current batch is in the working range
496                if sort_column.options.unwrap_or_default().descending {
497                    if cur_range.end > working_range.end {
498                        error!("Invalid range: {:?} > {:?}", cur_range, working_range);
499                        #[cfg(debug_assertions)]
500                        self.check_subset_ranges(&cur_range);
501                        internal_err!("Current batch have data on the right side of working range, something is very wrong")?;
502                    }
503                } else if cur_range.start < working_range.start {
504                    error!("Invalid range: {:?} < {:?}", cur_range, working_range);
505                    #[cfg(debug_assertions)]
506                    self.check_subset_ranges(&cur_range);
507                    internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
508                }
509
510                if cur_range.is_subset(&working_range) {
511                    // data still in range, can't merge sort yet
512                    // see if can concat entire sorted rb, merge sort need to wait
513                    self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?;
514                } else if let Some(intersection) = cur_range.intersection(&working_range) {
515                    // slice rb by intersection and concat it then merge sort
516                    let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len);
517                    let (offset, len) = find_slice_from_range(
518                        &SortColumn {
519                            values: cur_sort_column.clone(),
520                            options: sort_column.options,
521                        },
522                        &intersection,
523                    )?;
524
525                    if offset != 0 {
526                        internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
527                    }
528
529                    let sliced_rb = sorted_rb.slice(offset, len);
530
531                    // try to concat the sliced input batch to the current `in_progress` run
532                    self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?;
533                    // since no more sorted data in this range will come in now, build stream now
534                    self.build_sorted_stream()?;
535
536                    // since we are crossing working range, we need to merge sort the working set
537                    self.start_new_merge_sort()?;
538
539                    let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len);
540                    if r_len != 0 {
541                        // we have remaining data in this batch, put it back to input queue
542                        let remaining_rb = sorted_rb.slice(r_offset, r_len);
543                        let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?;
544                        let new_run_info = SucRun {
545                            offset: run_info.offset + r_offset,
546                            len: r_len,
547                            first_val: new_first_val,
548                            last_val: run_info.last_val,
549                        };
550                        last_remaining = Some((remaining_rb, new_run_info));
551                    }
552                    // deal with the case where the remaining batch crosses working ranges
553                    // i.e. this example requires more slicing, and we are currently at point A
554                    // |---1---|       |---3---|
555                    // |-------A--2------------|
556                    // put the remaining batch back into the iteration and deal with it in the next loop
557                } else {
558                    // no overlap, we can merge sort the working set
559
560                    self.build_sorted_stream()?;
561                    self.start_new_merge_sort()?;
562
563                    // always put it back to input queue until some batch is in working range
564                    last_remaining = Some((sorted_rb, run_info));
565                }
566            }
567        }
568        // emit the merge result
569        self.poll_result_stream(cx)
570    }
571
572    fn push_batch(&mut self, batch: DfRecordBatch) {
573        self.in_progress.push(batch);
574    }
575
576    /// Try to concat the input batch to the current `in_progress` run
577    ///
578    /// If the input batch does not continue the current run, build the old run into a stream and start a new run with the new batch
579    fn try_concat_batch(
580        &mut self,
581        batch: DfRecordBatch,
582        run_info: &SucRun<Timestamp>,
583        opt: Option<SortOptions>,
584    ) -> datafusion_common::Result<()> {
585        let is_ok_to_concat =
586            cmp_with_opts(&self.last_value, &run_info.first_val, &opt) <= std::cmp::Ordering::Equal;
587
588        if is_ok_to_concat {
589            self.push_batch(batch);
590            // the next input batch might still be in order, so don't build the stream yet
591        } else {
592            // no more sorted data, build stream now
593            self.build_sorted_stream()?;
594            self.push_batch(batch);
595        }
596        self.last_value = run_info.last_val;
597        Ok(())
598    }
599
600    /// Get the current working range
601    fn get_working_range(&self) -> Option<TimeRange> {
602        self.all_avail_working_range
603            .get(self.working_idx)
604            .map(|(range, _)| *range)
605    }
606
607    /// Set current working range to the next working range
608    fn set_next_working_range(&mut self) {
609        self.working_idx += 1;
610    }
611
612    /// Turn `in_progress` into a new `DfSendableRecordBatchStream` and push it into `sorted_input_runs`
613    fn build_sorted_stream(&mut self) -> datafusion_common::Result<()> {
614        if self.in_progress.is_empty() {
615            return Ok(());
616        }
617        let data = std::mem::take(&mut self.in_progress);
618
619        let new_stream = MemoryStream::try_new(data, self.schema(), None)?;
620        self.sorted_input_runs.push(Box::pin(new_stream));
621        Ok(())
622    }
623
624    /// Start merge-sorting the current working set
625    fn start_new_merge_sort(&mut self) -> datafusion_common::Result<()> {
626        if !self.in_progress.is_empty() {
627            return internal_err!("Starting a merge sort when in_progress is not empty")?;
628        }
629
630        self.set_next_working_range();
631
632        let streams = std::mem::take(&mut self.sorted_input_runs);
633        if streams.is_empty() {
634            return Ok(());
635        } else if streams.len() == 1 {
636            self.merge_stream
637                .push_back(streams.into_iter().next().unwrap());
638            return Ok(());
639        }
640
641        let fetch = self.remaining_fetch();
642        let reservation = MemoryConsumer::new(format!("WindowedSortStream[{}]", self.merge_count))
643            .register(&self.memory_pool);
644        self.merge_count += 1;
645        let lex_ordering = LexOrdering::new(vec![self.expression.clone()]);
646
647        let resulting_stream = StreamingMergeBuilder::new()
648            .with_streams(streams)
649            .with_schema(self.schema())
650            .with_expressions(&lex_ordering)
651            .with_metrics(self.metrics.clone())
652            .with_batch_size(self.batch_size)
653            .with_fetch(fetch)
654            .with_reservation(reservation)
655            .build()?;
656        self.merge_stream.push_back(resulting_stream);
657        // this working range is done, move to next working range
658        Ok(())
659    }
660
661    /// Remaining number of rows to fetch; returns `None` if there is no fetch limit
662    /// and `Some(0)` if the fetch limit has been reached
663    fn remaining_fetch(&self) -> Option<usize> {
664        let total_now = self.produced;
665        self.fetch.map(|p| p.saturating_sub(total_now))
666    }
667}
668
669impl Stream for WindowedSortStream {
670    type Item = datafusion_common::Result<DfRecordBatch>;
671
672    fn poll_next(
673        mut self: Pin<&mut Self>,
674        cx: &mut Context<'_>,
675    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
676        let result = self.as_mut().poll_next_inner(cx);
677        self.metrics.record_poll(result)
678    }
679}
680
681impl RecordBatchStream for WindowedSortStream {
682    fn schema(&self) -> SchemaRef {
683        self.schema.clone()
684    }
685}
686
687/// Split a batch into sorted runs
688fn split_batch_to_sorted_run(
689    batch: DfRecordBatch,
690    expression: &PhysicalSortExpr,
691) -> datafusion_common::Result<SortedRunSet<Timestamp>> {
692    // split input rb to sorted runs
693    let sort_column = expression.evaluate_to_sort_column(&batch)?;
694    let sorted_runs_offset = get_sorted_runs(sort_column.clone())?;
695    if let Some(run) = sorted_runs_offset.first()
696        && sorted_runs_offset.len() == 1
697    {
698        if !(run.offset == 0 && run.len == batch.num_rows()) {
699            internal_err!(
700                "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
701                run.offset,
702                run.len,
703                batch.num_rows()
704            )?;
705        }
706        // input rb is already sorted, we can emit it directly
707        Ok(SortedRunSet {
708            runs_with_batch: vec![(batch, run.clone())],
709            sort_column,
710        })
711    } else {
712        // these slices should be zero-copy, so no new memory reservation should be needed
713        let mut ret = Vec::with_capacity(sorted_runs_offset.len());
714        for run in sorted_runs_offset {
715            if run.offset + run.len > batch.num_rows() {
716                internal_err!(
717                    "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
718                    run.offset,
719                    run.len,
720                    batch.num_rows()
721                )?;
722            }
723            let new_rb = batch.slice(run.offset, run.len);
724            ret.push((new_rb, run));
725        }
726        Ok(SortedRunSet {
727            runs_with_batch: ret,
728            sort_column,
729        })
730    }
731}
732
733/// Downcast a temporal array to a specific type
734///
735/// Usage is similar to `downcast_primitive!` in the `arrow-array` crate
736#[macro_export]
737macro_rules! downcast_ts_array {
738    ($data_type:expr => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) =>
739    {
740        match $data_type {
741            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
742                $m!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $args)*)
743            }
744            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
745                $m!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $args)*)
746            }
747            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
748                $m!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $args)*)
749            }
750            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
751                $m!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $args)*)
752            }
753            $($p => $fallback,)*
754        }
755    };
756}
757
758/// Find the slice (where `start <= data < end`, ordered according to `sort_column.options`) corresponding to the given range
759///
760/// Returns the offset and length of the slice
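///
/// # Example
///
/// An illustrative sketch added for clarity (not part of the original docs):
///
/// ```ignore
/// // ascending millisecond column [1, 2, 3, 5]; the range [2ms, 5ms) selects offset 1, len 2 -> [2, 3]
/// let values = Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 5])) as ArrayRef;
/// let col = SortColumn { values, options: None };
/// let range = TimeRange::new(Timestamp::new_millisecond(2), Timestamp::new_millisecond(5));
/// assert_eq!(find_slice_from_range(&col, &range)?, (1, 2));
/// ```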
761fn find_slice_from_range(
762    sort_column: &SortColumn,
763    range: &TimeRange,
764) -> datafusion_common::Result<(usize, usize)> {
765    let ty = sort_column.values.data_type();
766    let time_unit = if let DataType::Timestamp(unit, _) = ty {
767        unit
768    } else {
769        return Err(DataFusionError::Internal(format!(
770            "Unsupported sort column type: {}",
771            sort_column.values.data_type()
772        )));
773    };
774    let array = &sort_column.values;
775    let opt = &sort_column.options.unwrap_or_default();
776    let descending = opt.descending;
777
778    let typed_sorted_range = [range.start, range.end]
779        .iter()
780        .map(|t| {
781            t.convert_to(time_unit.into())
782                .ok_or_else(|| {
783                    DataFusionError::Internal(format!(
784                        "Failed to convert timestamp from {:?} to {:?}",
785                        t.unit(),
786                        time_unit
787                    ))
788                })
789                .and_then(|typed_ts| {
790                    let value = Value::Timestamp(typed_ts);
791                    value
792                        .try_to_scalar_value(&value.data_type())
793                        .map_err(|e| DataFusionError::External(Box::new(e) as _))
794                })
795        })
796        .try_collect::<_, Vec<_>, _>()?;
797
798    let (min_val, max_val) = (typed_sorted_range[0].clone(), typed_sorted_range[1].clone());
799
800    // get slice that in which all data that `min_val<=data<max_val`
801    let (start, end) = if descending {
802        // note that `data < max_val`
803        // i,e, for max_val = 4, array = [5,3,2] should be start=1
804        // max_val = 4, array = [5, 4, 3, 2] should be start= 2
805        let start = bisect::<false>(from_ref(array), from_ref(&max_val), &[*opt])?;
806        // min_val = 1, array = [3, 2, 1, 0], end = 3
807        // min_val = 1, array = [3, 2, 0], end = 2
808        let end = bisect::<false>(from_ref(array), from_ref(&min_val), &[*opt])?;
809        (start, end)
810    } else {
811        // min_val = 1, array = [1, 2, 3], start = 0
812        // min_val = 1, array = [0, 2, 3], start = 1
813        let start = bisect::<true>(from_ref(array), from_ref(&min_val), &[*opt])?;
814        // max_val = 3, array = [1, 3, 4], end = 1
815        // max_val = 3, array = [1, 2, 4], end = 2
816        let end = bisect::<true>(from_ref(array), from_ref(&max_val), &[*opt])?;
817        (start, end)
818    };
819
820    Ok((start, end - start))
821}
822
823/// Get an iterator from a primitive array.
824///
825/// Used with `downcast_ts_array`. The returned iter is wrapped with `.enumerate()`.
826#[macro_export]
827macro_rules! array_iter_helper {
828    ($t:ty, $unit:expr, $arr:expr) => {{
829        let typed = $arr
830            .as_any()
831            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
832            .unwrap();
833        let iter = typed.iter().enumerate();
834        Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
835    }};
836}
837
838/// Compare two values with sort options; note that `None` is treated as NULL here
839///
840/// Defaults to nulls first
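///
/// # Example
///
/// An illustrative sketch added for clarity (not part of the original docs):
///
/// ```ignore
/// // default options: ascending, nulls first
/// let asc = Some(SortOptions::default());
/// assert_eq!(cmp_with_opts(&Some(1), &Some(2), &asc), std::cmp::Ordering::Less);
/// assert_eq!(cmp_with_opts(&None, &Some(2), &asc), std::cmp::Ordering::Less);
/// // descending reverses the comparison of non-null values
/// let desc = Some(SortOptions { descending: true, ..SortOptions::default() });
/// assert_eq!(cmp_with_opts(&Some(1), &Some(2), &desc), std::cmp::Ordering::Greater);
/// ```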
841fn cmp_with_opts<T: Ord>(
842    a: &Option<T>,
843    b: &Option<T>,
844    opt: &Option<SortOptions>,
845) -> std::cmp::Ordering {
846    let opt = opt.unwrap_or_default();
847
848    if let (Some(a), Some(b)) = (a, b) {
849        if opt.descending {
850            b.cmp(a)
851        } else {
852            a.cmp(b)
853        }
854    } else if opt.nulls_first {
855        // now we know at least one of them is None
856        // in Rust, None < Some(_)
857        a.cmp(b)
858    } else {
859        match (a, b) {
860            (Some(a), Some(b)) => a.cmp(b),
861            (Some(_), None) => std::cmp::Ordering::Less,
862            (None, Some(_)) => std::cmp::Ordering::Greater,
863            (None, None) => std::cmp::Ordering::Equal,
864        }
865    }
866}
867
868#[derive(Debug, Clone)]
869struct SortedRunSet<N: Ord> {
870    /// sorted runs with the batches corresponding to them
871    runs_with_batch: Vec<(DfRecordBatch, SucRun<N>)>,
872    /// sort column from evaluating the sort expression
873    sort_column: SortColumn,
874}
875
876/// A struct to represent a successive run in the input iterator
877#[derive(Debug, Clone, PartialEq)]
878struct SucRun<N: Ord> {
879    /// offset of the first element in the run
880    offset: usize,
881    /// length of the run
882    len: usize,
883    /// first non-null value in the run
884    first_val: Option<N>,
885    /// last non-null value in the run
886    last_val: Option<N>,
887}
888
889impl SucRun<Timestamp> {
890    /// Get the time range of the run, which is `[min_val, max_val + 1)`
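    ///
    /// An illustrative sketch added for clarity (not part of the original docs):
    ///
    /// ```ignore
    /// // a descending run from 3ms down to 1ms covers the half-open range [1ms, 4ms)
    /// let run = SucRun {
    ///     offset: 0,
    ///     len: 3,
    ///     first_val: Some(Timestamp::new_millisecond(3)),
    ///     last_val: Some(Timestamp::new_millisecond(1)),
    /// };
    /// let expected = TimeRange::new(Timestamp::new_millisecond(1), Timestamp::new_millisecond(4));
    /// assert_eq!(run.get_time_range(), Some(expected));
    /// ```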
891    fn get_time_range(&self) -> Option<TimeRange> {
892        let start = self.first_val.min(self.last_val);
893        let end = self
894            .first_val
895            .max(self.last_val)
896            .map(|i| Timestamp::new(i.value() + 1, i.unit()));
897        start.zip(end).map(|(s, e)| TimeRange::new(s, e))
898    }
899}
900
901/// Find all successive (sorted) runs in the input iterator
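///
/// # Example
///
/// An illustrative sketch added for clarity (not part of the original docs):
///
/// ```ignore
/// // with default (ascending) options, [1, 3, 2, 4] splits into two runs: [1, 3] and [2, 4]
/// let iter = vec![1i64, 3, 2, 4].into_iter().map(Some).enumerate();
/// let runs = find_successive_runs(iter, &None);
/// assert_eq!(runs.len(), 2);
/// assert_eq!((runs[0].offset, runs[0].len), (0, 2));
/// assert_eq!((runs[1].offset, runs[1].len), (2, 2));
/// ```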
902fn find_successive_runs<T: Iterator<Item = (usize, Option<N>)>, N: Ord + Copy>(
903    iter: T,
904    sort_opts: &Option<SortOptions>,
905) -> Vec<SucRun<N>> {
906    let mut runs = Vec::new();
907    let mut last_value = None;
908    let mut iter_len = None;
909
910    let mut last_offset = 0;
911    let mut first_val: Option<N> = None;
912    let mut last_val: Option<N> = None;
913
914    for (idx, t) in iter {
915        if let Some(last_value) = &last_value {
916            if cmp_with_opts(last_value, &t, sort_opts) == std::cmp::Ordering::Greater {
917                // we found a boundary
918                let len = idx - last_offset;
919                let run = SucRun {
920                    offset: last_offset,
921                    len,
922                    first_val,
923                    last_val,
924                };
925                runs.push(run);
926                first_val = None;
927                last_val = None;
928
929                last_offset = idx;
930            }
931        }
932        last_value = Some(t);
933        if let Some(t) = t {
934            first_val = first_val.or(Some(t));
935            last_val = Some(t);
936        }
937        iter_len = Some(idx);
938    }
939    let run = SucRun {
940        offset: last_offset,
941        len: iter_len.map(|l| l - last_offset + 1).unwrap_or(0),
942        first_val,
943        last_val,
944    };
945    runs.push(run);
946
947    runs
948}
949
950/// Return a list of non-overlapping (offset, length) pairs representing sorted runs, which
951/// can be used with [`DfRecordBatch::slice`] to get the sorted runs
952/// Returned runs are as long as possible and do not overlap with each other
953fn get_sorted_runs(sort_column: SortColumn) -> datafusion_common::Result<Vec<SucRun<Timestamp>>> {
954    let ty = sort_column.values.data_type();
955    if let DataType::Timestamp(unit, _) = ty {
956        let array = &sort_column.values;
957        let iter = downcast_ts_array!(
958            array.data_type() => (array_iter_helper, array),
959            _ => internal_err!("Unsupported sort column type: {ty}")?
960        );
961
962        let raw = find_successive_runs(iter, &sort_column.options);
963        let ts_runs = raw
964            .into_iter()
965            .map(|run| SucRun {
966                offset: run.offset,
967                len: run.len,
968                first_val: run.first_val.map(|v| Timestamp::new(v, unit.into())),
969                last_val: run.last_val.map(|v| Timestamp::new(v, unit.into())),
970            })
971            .collect_vec();
972        Ok(ts_runs)
973    } else {
974        Err(DataFusionError::Internal(format!(
975            "Unsupported sort column type: {ty}"
976        )))
977    }
978}
979
980/// Left (`start`) inclusive, right (`end`) exclusive
981///
982/// This is just a tuple with extra methods
983#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
984struct TimeRange {
985    start: Timestamp,
986    end: Timestamp,
987}
988
989impl From<&PartitionRange> for TimeRange {
990    fn from(range: &PartitionRange) -> Self {
991        Self::new(range.start, range.end)
992    }
993}
994
995impl From<(Timestamp, Timestamp)> for TimeRange {
996    fn from(range: (Timestamp, Timestamp)) -> Self {
997        Self::new(range.0, range.1)
998    }
999}
1000
1001impl From<&(Timestamp, Timestamp)> for TimeRange {
1002    fn from(range: &(Timestamp, Timestamp)) -> Self {
1003        Self::new(range.0, range.1)
1004    }
1005}
1006
1007impl TimeRange {
1008    /// Create a new TimeRange; if start is greater than end, swap them
1009    fn new(start: Timestamp, end: Timestamp) -> Self {
1010        if start > end {
1011            Self {
1012                start: end,
1013                end: start,
1014            }
1015        } else {
1016            Self { start, end }
1017        }
1018    }
1019
1020    fn is_subset(&self, other: &Self) -> bool {
1021        self.start >= other.start && self.end <= other.end
1022    }
1023
1024    /// Check if two ranges overlap; boundaries are exclusive (ranges that only touch at a boundary do not overlap)
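    ///
    /// An illustrative sketch added for clarity (not part of the original docs):
    ///
    /// ```ignore
    /// // [1s, 2s) and [2s, 3s) only touch at the boundary, so they do not overlap
    /// let a = TimeRange::new(Timestamp::new_second(1), Timestamp::new_second(2));
    /// let b = TimeRange::new(Timestamp::new_second(2), Timestamp::new_second(3));
    /// assert!(!a.is_overlapping(&b));
    /// assert!(a.is_overlapping(&TimeRange::new(Timestamp::new_second(1), Timestamp::new_second(3))));
    /// ```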
1025    fn is_overlapping(&self, other: &Self) -> bool {
1026        !(self.start >= other.end || self.end <= other.start)
1027    }
1028
1029    fn intersection(&self, other: &Self) -> Option<Self> {
1030        if self.is_overlapping(other) {
1031            Some(Self::new(
1032                self.start.max(other.start),
1033                self.end.min(other.end),
1034            ))
1035        } else {
1036            None
1037        }
1038    }
1039
1040    fn difference(&self, other: &Self) -> Vec<Self> {
1041        if !self.is_overlapping(other) {
1042            vec![*self]
1043        } else {
1044            let mut ret = Vec::new();
1045            if self.start < other.start && self.end > other.end {
1046                ret.push(Self::new(self.start, other.start));
1047                ret.push(Self::new(other.end, self.end));
1048            } else if self.start < other.start {
1049                ret.push(Self::new(self.start, other.start));
1050            } else if self.end > other.end {
1051                ret.push(Self::new(other.end, self.end));
1052            }
1053            ret
1054        }
1055    }
1056}
1057
1058/// Split the input range by the `split_by` range into one, two, or three parts.
1059fn split_range_by(
1060    input_range: &TimeRange,
1061    input_parts: &[usize],
1062    split_by: &TimeRange,
1063    split_idx: usize,
1064) -> Vec<Action> {
1065    let mut ret = Vec::new();
1066    if input_range.is_overlapping(split_by) {
1067        let input_parts = input_parts.to_vec();
1068        let new_parts = {
1069            let mut new_parts = input_parts.clone();
1070            new_parts.push(split_idx);
1071            new_parts
1072        };
1073
1074        ret.push(Action::Pop(*input_range));
1075        if let Some(intersection) = input_range.intersection(split_by) {
1076            ret.push(Action::Push(intersection, new_parts.clone()));
1077        }
1078        for diff in input_range.difference(split_by) {
1079            ret.push(Action::Push(diff, input_parts.clone()));
1080        }
1081    }
1082    ret
1083}
1084
1085#[derive(Debug, Clone, PartialEq, Eq)]
1086enum Action {
1087    Pop(TimeRange),
1088    Push(TimeRange, Vec<usize>),
1089}
1090
1091/// Compute all working ranges and corresponding working sets from the `overlap_counts` computed by `split_overlapping_ranges`
1092///
1093/// A working range promises that once the input stream yields a value outside the current range, future values will never fall back into this range,
1094///
1095/// hence we can merge sort the current working range once that happens
1096///
1097/// If `descending` is true, the working ranges will be in descending order
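///
/// # Example
///
/// An illustrative sketch added for clarity (not part of the original docs):
///
/// ```ignore
/// // partition 0 covers [0s, 10s) and partition 1 covers [0s, 5s), giving overlap counts
/// // [0s, 5s) -> [0, 1] and [5s, 10s) -> [0]; since partition 0 still has data past 5s,
/// // the two pieces are merged into a single working range [0s, 10s) with working set {0, 1}
/// let overlaps = BTreeMap::from([
///     ((Timestamp::new_second(0), Timestamp::new_second(5)).into(), vec![0, 1]),
///     ((Timestamp::new_second(5), Timestamp::new_second(10)).into(), vec![0]),
/// ]);
/// let working = compute_all_working_ranges(&overlaps, false);
/// assert_eq!(working.len(), 1);
/// assert_eq!(working[0].1, BTreeSet::from([0, 1]));
/// ```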
1098fn compute_all_working_ranges(
1099    overlap_counts: &BTreeMap<TimeRange, Vec<usize>>,
1100    descending: bool,
1101) -> Vec<(TimeRange, BTreeSet<usize>)> {
1102    let mut ret = Vec::new();
1103    let mut cur_range_set: Option<(TimeRange, BTreeSet<usize>)> = None;
1104    let overlap_iter: Box<dyn Iterator<Item = (&TimeRange, &Vec<usize>)>> = if descending {
1105        Box::new(overlap_counts.iter().rev()) as _
1106    } else {
1107        Box::new(overlap_counts.iter()) as _
1108    };
1109    for (range, set) in overlap_iter {
1110        match &mut cur_range_set {
1111            None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))),
1112            Some((working_range, working_set)) => {
1113                // if the next overlapping range has a partition that is not the last one in `working_set` (hence it needs
1114                // to be read before merge sorting), and `working_set` has more than one element,
1115                // we have to expand the current working range to cover it (and add its `set` to `working_set`)
1116                // so that merge sort is possible
1117                let need_expand = {
1118                    let last_part = working_set.last();
1119                    let inter: BTreeSet<usize> = working_set
1120                        .intersection(&BTreeSet::from_iter(set.iter().cloned()))
1121                        .cloned()
1122                        .collect();
1123                    if let Some(one) = inter.first()
1124                        && inter.len() == 1
1125                        && Some(one) == last_part
1126                    {
1127                        // only the last PartitionRange of the current working set overlaps, so we may not need to expand the working range
1128                        if set.iter().all(|p| Some(p) >= last_part) {
1129                            // all PartitionRanges in the next overlapping range come after the last one in the current working set, so we can just emit the current working set
1130                            false
1131                        } else {
1132                            // otherwise, we need to expand the working range to include the next overlapping range
1133                            true
1134                        }
1135                    } else if inter.is_empty() {
1136                        // no PartitionRange is shared between the current working set and the next overlapping range, so we can just emit the current working set
1137                        false
1138                    } else {
1139                        // there are multiple shared partitions, or the shared partition is not the last one; either way we need to expand the working range to include the next overlapping range
1140                        true
1140                        true
1141                    }
1142                };
1143
1144                if need_expand {
1145                    if descending {
1146                        working_range.start = range.start;
1147                    } else {
1148                        working_range.end = range.end;
1149                    }
1150                    working_set.extend(set.iter().cloned());
1151                } else {
1152                    ret.push((*working_range, std::mem::take(working_set)));
1153                    cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned())));
1154                }
1155            }
1156        }
1157    }
1158
1159    if let Some(cur_range_set) = cur_range_set {
1160        ret.push(cur_range_set)
1161    }
1162
1163    ret
1164}
1165
1166/// Return a map from non-overlapping ranges to the indices
1167/// (positions in the input array, not `PartitionRange.identifier`) of the input `PartitionRange`s that cover those ranges
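///
/// # Example
///
/// An illustrative sketch added for clarity (not part of the original docs); it assumes
/// `PartitionRange`'s `start`/`end`/`num_rows`/`identifier` fields:
///
/// ```ignore
/// // r0 = [0s, 10s) and r1 = [5s, 15s) split into [0s, 5s) -> [0], [5s, 10s) -> [0, 1], [10s, 15s) -> [1]
/// let r0 = PartitionRange { start: Timestamp::new_second(0), end: Timestamp::new_second(10), num_rows: 4, identifier: 0 };
/// let r1 = PartitionRange { start: Timestamp::new_second(5), end: Timestamp::new_second(15), num_rows: 4, identifier: 1 };
/// let split = split_overlapping_ranges(&[r0, r1]);
/// assert_eq!(split.len(), 3);
/// let key = TimeRange::new(Timestamp::new_second(5), Timestamp::new_second(10));
/// assert_eq!(split[&key], vec![0, 1]);
/// ```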
1168fn split_overlapping_ranges(ranges: &[PartitionRange]) -> BTreeMap<TimeRange, Vec<usize>> {
1169    // invariant: the key ranges do not overlap with each other, by the definition of `is_overlapping`
1170    let mut ret: BTreeMap<TimeRange, Vec<usize>> = BTreeMap::new();
1171    for (idx, range) in ranges.iter().enumerate() {
1172        let key: TimeRange = (range.start, range.end).into();
1173        let mut actions = Vec::new();
1174        let mut untouched = vec![key];
1175        // create forward and backward iterators to find all overlapping ranges;
1176        // given that the keys are sorted in lexicographical order and promised not to overlap,
1177        // we can stop as soon as we find a non-overlapping range
1178        let forward_iter = ret
1179            .range(key..)
1180            .take_while(|(range, _)| range.is_overlapping(&key));
1181        let backward_iter = ret
1182            .range(..key)
1183            .rev()
1184            .take_while(|(range, _)| range.is_overlapping(&key));
1185
1186        for (range, parts) in forward_iter.chain(backward_iter) {
1187            untouched = untouched.iter().flat_map(|r| r.difference(range)).collect();
1188            let act = split_range_by(range, parts, &key, idx);
1189            actions.extend(act.into_iter());
1190        }
1191
1192        for action in actions {
1193            match action {
1194                Action::Pop(range) => {
1195                    ret.remove(&range);
1196                }
1197                Action::Push(range, parts) => {
1198                    ret.insert(range, parts);
1199                }
1200            }
1201        }
1202
1203        // insert untouched ranges
1204        for range in untouched {
1205            ret.insert(range, vec![idx]);
1206        }
1207    }
1208    ret
1209}
1210
1211/// Get timestamp from array at offset
1212fn get_timestamp_from_idx(
1213    array: &ArrayRef,
1214    offset: usize,
1215) -> datafusion_common::Result<Option<Timestamp>> {
1216    let time_unit = if let DataType::Timestamp(unit, _) = array.data_type() {
1217        unit
1218    } else {
1219        return Err(DataFusionError::Internal(format!(
1220            "Unsupported sort column type: {}",
1221            array.data_type()
1222        )));
1223    };
1224    let ty = array.data_type();
1225    let array = array.slice(offset, 1);
1226    let mut iter = downcast_ts_array!(
1227        array.data_type() => (array_iter_helper, array),
1228        _ => internal_err!("Unsupported sort column type: {ty}")?
1229    );
1230    let (_idx, val) = iter.next().ok_or_else(|| {
1231        DataFusionError::Internal("Empty array in get_timestamp_from_idx".to_string())
1232    })?;
1233    let val = if let Some(val) = val {
1234        val
1235    } else {
1236        return Ok(None);
1237    };
1238    let gt_timestamp = Timestamp::new(val, time_unit.into());
1239    Ok(Some(gt_timestamp))
1240}
1241
1242#[cfg(test)]
1243mod test {
1244    use std::io::Write;
1245    use std::sync::Arc;
1246
1247    use arrow::array::{ArrayRef, TimestampMillisecondArray};
1248    use arrow::compute::concat_batches;
1249    use arrow::json::ArrayWriter;
1250    use arrow_schema::{Field, Schema, TimeUnit};
1251    use futures::StreamExt;
1252    use pretty_assertions::assert_eq;
1253    use serde_json::json;
1254
1255    use super::*;
1256    use crate::test_util::{new_ts_array, MockInputExec};
1257
1258    #[test]
1259    fn test_overlapping() {
1260        let testcases = [
1261            (
1262                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1263                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1264                false,
1265            ),
1266            (
1267                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1268                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1269                true,
1270            ),
1271            (
1272                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1273                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1274                true,
1275            ),
1276            (
1277                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1278                (
1279                    Timestamp::new_millisecond(1000),
1280                    Timestamp::new_millisecond(1002),
1281                ),
1282                true,
1283            ),
1284            (
1285                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1286                (
1287                    Timestamp::new_millisecond(1001),
1288                    Timestamp::new_millisecond(1002),
1289                ),
1290                false,
1291            ),
1292            (
1293                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1294                (
1295                    Timestamp::new_millisecond(1002),
1296                    Timestamp::new_millisecond(1003),
1297                ),
1298                false,
1299            ),
1300        ];
1301
1302        for (range1, range2, expected) in testcases.iter() {
1303            assert_eq!(
1304                TimeRange::from(range1).is_overlapping(&range2.into()),
1305                *expected,
1306                "range1: {:?}, range2: {:?}",
1307                range1,
1308                range2
1309            );
1310        }
1311    }
1312
1313    #[test]
1314    fn test_split() {
1315        let testcases = [
1316            // no split
1317            (
1318                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1319                vec![0],
1320                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1321                1,
1322                vec![],
1323            ),
1324            // one part
1325            (
1326                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1327                vec![0],
1328                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1329                1,
1330                vec![
1331                    Action::Pop(
1332                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1333                    ),
1334                    Action::Push(
1335                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1336                        vec![0, 1],
1337                    ),
1338                ],
1339            ),
1340            (
1341                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1342                vec![0],
1343                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1344                1,
1345                vec![
1346                    Action::Pop(
1347                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1348                    ),
1349                    Action::Push(
1350                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1351                        vec![0, 1],
1352                    ),
1353                ],
1354            ),
1355            (
1356                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1357                vec![0],
1358                (
1359                    Timestamp::new_millisecond(1000),
1360                    Timestamp::new_millisecond(1002),
1361                ),
1362                1,
1363                vec![
1364                    Action::Pop(
1365                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1366                    ),
1367                    Action::Push(
1368                        (
1369                            Timestamp::new_millisecond(1000),
1370                            Timestamp::new_millisecond(1001),
1371                        )
1372                            .into(),
1373                        vec![0, 1],
1374                    ),
1375                ],
1376            ),
1377            // two part
1378            (
1379                (Timestamp::new_second(1), Timestamp::new_millisecond(1002)),
1380                vec![0],
1381                (
1382                    Timestamp::new_millisecond(1001),
1383                    Timestamp::new_millisecond(1002),
1384                ),
1385                1,
1386                vec![
1387                    Action::Pop(
1388                        (Timestamp::new_second(1), Timestamp::new_millisecond(1002)).into(),
1389                    ),
1390                    Action::Push(
1391                        (
1392                            Timestamp::new_millisecond(1001),
1393                            Timestamp::new_millisecond(1002),
1394                        )
1395                            .into(),
1396                        vec![0, 1],
1397                    ),
1398                    Action::Push(
1399                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1400                        vec![0],
1401                    ),
1402                ],
1403            ),
1404            // three parts
1405            (
1406                (Timestamp::new_second(1), Timestamp::new_millisecond(1004)),
1407                vec![0],
1408                (
1409                    Timestamp::new_millisecond(1001),
1410                    Timestamp::new_millisecond(1002),
1411                ),
1412                1,
1413                vec![
1414                    Action::Pop(
1415                        (Timestamp::new_second(1), Timestamp::new_millisecond(1004)).into(),
1416                    ),
1417                    Action::Push(
1418                        (
1419                            Timestamp::new_millisecond(1001),
1420                            Timestamp::new_millisecond(1002),
1421                        )
1422                            .into(),
1423                        vec![0, 1],
1424                    ),
1425                    Action::Push(
1426                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1427                        vec![0],
1428                    ),
1429                    Action::Push(
1430                        (
1431                            Timestamp::new_millisecond(1002),
1432                            Timestamp::new_millisecond(1004),
1433                        )
1434                            .into(),
1435                        vec![0],
1436                    ),
1437                ],
1438            ),
1439        ];
1440
1441        for (range, parts, split_by, split_idx, expected) in testcases.iter() {
1442            assert_eq!(
1443                split_range_by(&(*range).into(), parts, &split_by.into(), *split_idx),
1444                *expected,
1445                "range: {:?}, parts: {:?}, split_by: {:?}, split_idx: {}",
1446                range,
1447                parts,
1448                split_by,
1449                split_idx
1450            );
1451        }
1452    }
1453
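    // The `_rev` variant below drives `compute_all_working_ranges` with its boolean flag
    // set to `true`, which (judging from the expected output) corresponds to the descending
    // case: working ranges are emitted from the latest time range to the earliest, while
    // overlapping ranges still collapse into a single working range.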
1454    #[test]
1455    fn test_compute_working_ranges_rev() {
1456        let testcases = vec![
1457            (
1458                BTreeMap::from([(
1459                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1460                    vec![0],
1461                )]),
1462                vec![(
1463                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1464                    BTreeSet::from([0]),
1465                )],
1466            ),
1467            (
1468                BTreeMap::from([(
1469                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1470                    vec![0, 1],
1471                )]),
1472                vec![(
1473                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1474                    BTreeSet::from([0, 1]),
1475                )],
1476            ),
1477            (
1478                BTreeMap::from([
1479                    (
1480                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1481                        vec![0],
1482                    ),
1483                    (
1484                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1485                        vec![0, 1],
1486                    ),
1487                ]),
1488                vec![
1489                    (
1490                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1491                        BTreeSet::from([0]),
1492                    ),
1493                    (
1494                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1495                        BTreeSet::from([0, 1]),
1496                    ),
1497                ],
1498            ),
1499            (
1500                BTreeMap::from([
1501                    (
1502                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1503                        vec![0, 1],
1504                    ),
1505                    (
1506                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1507                        vec![1],
1508                    ),
1509                ]),
1510                vec![
1511                    (
1512                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1513                        BTreeSet::from([0, 1]),
1514                    ),
1515                    (
1516                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1517                        BTreeSet::from([1]),
1518                    ),
1519                ],
1520            ),
1521            (
1522                BTreeMap::from([
1523                    (
1524                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1525                        vec![0],
1526                    ),
1527                    (
1528                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1529                        vec![0, 1],
1530                    ),
1531                    (
1532                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1533                        vec![1],
1534                    ),
1535                ]),
1536                vec![
1537                    (
1538                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1539                        BTreeSet::from([0]),
1540                    ),
1541                    (
1542                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1543                        BTreeSet::from([0, 1]),
1544                    ),
1545                    (
1546                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1547                        BTreeSet::from([1]),
1548                    ),
1549                ],
1550            ),
1551            (
1552                BTreeMap::from([
1553                    (
1554                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1555                        vec![0, 2],
1556                    ),
1557                    (
1558                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1559                        vec![0, 1, 2],
1560                    ),
1561                    (
1562                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1563                        vec![1, 2],
1564                    ),
1565                ]),
1566                vec![(
1567                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1568                    BTreeSet::from([0, 1, 2]),
1569                )],
1570            ),
1571            (
1572                BTreeMap::from([
1573                    (
1574                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1575                        vec![0, 2],
1576                    ),
1577                    (
1578                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1579                        vec![1, 2],
1580                    ),
1581                ]),
1582                vec![(
1583                    (Timestamp::new_second(1), Timestamp::new_second(3)),
1584                    BTreeSet::from([0, 1, 2]),
1585                )],
1586            ),
1587            (
1588                BTreeMap::from([
1589                    (
1590                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1591                        vec![0, 1],
1592                    ),
1593                    (
1594                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1595                        vec![0, 1, 2],
1596                    ),
1597                    (
1598                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1599                        vec![1, 2],
1600                    ),
1601                ]),
1602                vec![(
1603                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1604                    BTreeSet::from([0, 1, 2]),
1605                )],
1606            ),
1607            (
1608                BTreeMap::from([
1609                    (
1610                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1611                        vec![0, 1],
1612                    ),
1613                    (
1614                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1615                        vec![1, 2],
1616                    ),
1617                ]),
1618                vec![
1619                    (
1620                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1621                        BTreeSet::from([0, 1]),
1622                    ),
1623                    (
1624                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1625                        BTreeSet::from([1, 2]),
1626                    ),
1627                ],
1628            ),
1629            // non-overlapping
1630            (
1631                BTreeMap::from([
1632                    (
1633                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1634                        vec![0],
1635                    ),
1636                    (
1637                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1638                        vec![1, 2],
1639                    ),
1640                ]),
1641                vec![
1642                    (
1643                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1644                        BTreeSet::from([0]),
1645                    ),
1646                    (
1647                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1648                        BTreeSet::from([1, 2]),
1649                    ),
1650                ],
1651            ),
1652        ];
1653
1654        for (input, expected) in testcases {
1655            let expected = expected
1656                .into_iter()
1657                .map(|(r, s)| (r.into(), s))
1658                .collect_vec();
1659            let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1660            assert_eq!(
1661                compute_all_working_ranges(&input, true),
1662                expected,
1663                "input: {:?}",
1664                input
1665            );
1666        }
1667    }
1668
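    // Ascending counterpart of the test above (flag set to `false`). For example, the case
    // with ranges (1s,2s)->{0,2}, (2s,3s)->{0,1,2} and (3s,4s)->{1,2} collapses into the
    // single working range (1s,4s)->{0,1,2}, presumably because part 2 spans all three time
    // ranges and they therefore cannot be handled independently.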
1669    #[test]
1670    fn test_compute_working_ranges() {
1671        let testcases = vec![
1672            (
1673                BTreeMap::from([(
1674                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1675                    vec![0],
1676                )]),
1677                vec![(
1678                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1679                    BTreeSet::from([0]),
1680                )],
1681            ),
1682            (
1683                BTreeMap::from([(
1684                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1685                    vec![0, 1],
1686                )]),
1687                vec![(
1688                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1689                    BTreeSet::from([0, 1]),
1690                )],
1691            ),
1692            (
1693                BTreeMap::from([
1694                    (
1695                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1696                        vec![0, 1],
1697                    ),
1698                    (
1699                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1700                        vec![1],
1701                    ),
1702                ]),
1703                vec![
1704                    (
1705                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1706                        BTreeSet::from([0, 1]),
1707                    ),
1708                    (
1709                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1710                        BTreeSet::from([1]),
1711                    ),
1712                ],
1713            ),
1714            (
1715                BTreeMap::from([
1716                    (
1717                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1718                        vec![0],
1719                    ),
1720                    (
1721                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1722                        vec![0, 1],
1723                    ),
1724                ]),
1725                vec![
1726                    (
1727                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1728                        BTreeSet::from([0]),
1729                    ),
1730                    (
1731                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1732                        BTreeSet::from([0, 1]),
1733                    ),
1734                ],
1735            ),
1736            // test if only one count in working set gets its own working range
1737            (
1738                BTreeMap::from([
1739                    (
1740                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1741                        vec![0],
1742                    ),
1743                    (
1744                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1745                        vec![0, 1],
1746                    ),
1747                    (
1748                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1749                        vec![1],
1750                    ),
1751                ]),
1752                vec![
1753                    (
1754                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1755                        BTreeSet::from([0]),
1756                    ),
1757                    (
1758                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1759                        BTreeSet::from([0, 1]),
1760                    ),
1761                    (
1762                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1763                        BTreeSet::from([1]),
1764                    ),
1765                ],
1766            ),
1767            (
1768                BTreeMap::from([
1769                    (
1770                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1771                        vec![0, 2],
1772                    ),
1773                    (
1774                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1775                        vec![0, 1, 2],
1776                    ),
1777                    (
1778                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1779                        vec![1, 2],
1780                    ),
1781                ]),
1782                vec![(
1783                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1784                    BTreeSet::from([0, 1, 2]),
1785                )],
1786            ),
1787            (
1788                BTreeMap::from([
1789                    (
1790                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1791                        vec![0, 2],
1792                    ),
1793                    (
1794                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1795                        vec![1, 2],
1796                    ),
1797                ]),
1798                vec![(
1799                    (Timestamp::new_second(1), Timestamp::new_second(3)),
1800                    BTreeSet::from([0, 1, 2]),
1801                )],
1802            ),
1803            (
1804                BTreeMap::from([
1805                    (
1806                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1807                        vec![0, 1],
1808                    ),
1809                    (
1810                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1811                        vec![0, 1, 2],
1812                    ),
1813                    (
1814                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1815                        vec![1, 2],
1816                    ),
1817                ]),
1818                vec![(
1819                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1820                    BTreeSet::from([0, 1, 2]),
1821                )],
1822            ),
1823            (
1824                BTreeMap::from([
1825                    (
1826                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1827                        vec![0, 1],
1828                    ),
1829                    (
1830                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1831                        vec![1, 2],
1832                    ),
1833                ]),
1834                vec![
1835                    (
1836                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1837                        BTreeSet::from([0, 1]),
1838                    ),
1839                    (
1840                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1841                        BTreeSet::from([1, 2]),
1842                    ),
1843                ],
1844            ),
1845            // non-overlapping
1846            (
1847                BTreeMap::from([
1848                    (
1849                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1850                        vec![0, 1],
1851                    ),
1852                    (
1853                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1854                        vec![2],
1855                    ),
1856                ]),
1857                vec![
1858                    (
1859                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1860                        BTreeSet::from([0, 1]),
1861                    ),
1862                    (
1863                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1864                        BTreeSet::from([2]),
1865                    ),
1866                ],
1867            ),
1868        ];
1869
1870        for (input, expected) in testcases {
1871            let expected = expected
1872                .into_iter()
1873                .map(|(r, s)| (r.into(), s))
1874                .collect_vec();
1875            let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1876            assert_eq!(
1877                compute_all_working_ranges(&input, false),
1878                expected,
1879                "input: {:?}",
1880                input
1881            );
1882        }
1883    }
1884
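    // `split_overlapping_ranges` flattens a list of (possibly overlapping) `PartitionRange`s
    // into non-overlapping time ranges, each mapped to the identifiers of the input ranges
    // that cover it; e.g. [1s,3s) id=0 and [2s,4s) id=1 become
    // [1s,2s)->[0], [2s,3s)->[0,1], [3s,4s)->[1], as the cases below exercise.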
1885    #[test]
1886    fn test_split_overlap_range() {
1887        let testcases = vec![
1888            // simple case: a single range
1889            (
1890                vec![PartitionRange {
1891                    start: Timestamp::new_second(1),
1892                    end: Timestamp::new_second(2),
1893                    num_rows: 2,
1894                    identifier: 0,
1895                }],
1896                BTreeMap::from_iter(
1897                    vec![(
1898                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1899                        vec![0],
1900                    )]
1901                    .into_iter(),
1902                ),
1903            ),
1904            // two overlapping ranges
1905            (
1906                vec![
1907                    PartitionRange {
1908                        start: Timestamp::new_second(1),
1909                        end: Timestamp::new_second(2),
1910                        num_rows: 2,
1911                        identifier: 0,
1912                    },
1913                    PartitionRange {
1914                        start: Timestamp::new_second(1),
1915                        end: Timestamp::new_second(2),
1916                        num_rows: 2,
1917                        identifier: 1,
1918                    },
1919                ],
1920                BTreeMap::from_iter(
1921                    vec![(
1922                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1923                        vec![0, 1],
1924                    )]
1925                    .into_iter(),
1926                ),
1927            ),
1928            (
1929                vec![
1930                    PartitionRange {
1931                        start: Timestamp::new_second(1),
1932                        end: Timestamp::new_second(3),
1933                        num_rows: 2,
1934                        identifier: 0,
1935                    },
1936                    PartitionRange {
1937                        start: Timestamp::new_second(2),
1938                        end: Timestamp::new_second(4),
1939                        num_rows: 2,
1940                        identifier: 1,
1941                    },
1942                ],
1943                BTreeMap::from_iter(
1944                    vec![
1945                        (
1946                            (Timestamp::new_second(1), Timestamp::new_second(2)),
1947                            vec![0],
1948                        ),
1949                        (
1950                            (Timestamp::new_second(2), Timestamp::new_second(3)),
1951                            vec![0, 1],
1952                        ),
1953                        (
1954                            (Timestamp::new_second(3), Timestamp::new_second(4)),
1955                            vec![1],
1956                        ),
1957                    ]
1958                    .into_iter(),
1959                ),
1960            ),
1961            // three or more overlapping ranges
1962            (
1963                vec![
1964                    PartitionRange {
1965                        start: Timestamp::new_second(1),
1966                        end: Timestamp::new_second(3),
1967                        num_rows: 2,
1968                        identifier: 0,
1969                    },
1970                    PartitionRange {
1971                        start: Timestamp::new_second(2),
1972                        end: Timestamp::new_second(4),
1973                        num_rows: 2,
1974                        identifier: 1,
1975                    },
1976                    PartitionRange {
1977                        start: Timestamp::new_second(1),
1978                        end: Timestamp::new_second(4),
1979                        num_rows: 2,
1980                        identifier: 2,
1981                    },
1982                ],
1983                BTreeMap::from_iter(
1984                    vec![
1985                        (
1986                            (Timestamp::new_second(1), Timestamp::new_second(2)),
1987                            vec![0, 2],
1988                        ),
1989                        (
1990                            (Timestamp::new_second(2), Timestamp::new_second(3)),
1991                            vec![0, 1, 2],
1992                        ),
1993                        (
1994                            (Timestamp::new_second(3), Timestamp::new_second(4)),
1995                            vec![1, 2],
1996                        ),
1997                    ]
1998                    .into_iter(),
1999                ),
2000            ),
2001            (
2002                vec![
2003                    PartitionRange {
2004                        start: Timestamp::new_second(1),
2005                        end: Timestamp::new_second(3),
2006                        num_rows: 2,
2007                        identifier: 0,
2008                    },
2009                    PartitionRange {
2010                        start: Timestamp::new_second(1),
2011                        end: Timestamp::new_second(4),
2012                        num_rows: 2,
2013                        identifier: 1,
2014                    },
2015                    PartitionRange {
2016                        start: Timestamp::new_second(2),
2017                        end: Timestamp::new_second(4),
2018                        num_rows: 2,
2019                        identifier: 2,
2020                    },
2021                ],
2022                BTreeMap::from_iter(
2023                    vec![
2024                        (
2025                            (Timestamp::new_second(1), Timestamp::new_second(2)),
2026                            vec![0, 1],
2027                        ),
2028                        (
2029                            (Timestamp::new_second(2), Timestamp::new_second(3)),
2030                            vec![0, 1, 2],
2031                        ),
2032                        (
2033                            (Timestamp::new_second(3), Timestamp::new_second(4)),
2034                            vec![1, 2],
2035                        ),
2036                    ]
2037                    .into_iter(),
2038                ),
2039            ),
2040        ];
2041
2042        for (input, expected) in testcases {
2043            let expected = expected.into_iter().map(|(r, s)| (r.into(), s)).collect();
2044            assert_eq!(split_overlapping_ranges(&input), expected);
2045        }
2046    }
2047
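    // Tuple shorthand used by the run-detection test below:
    // (offset, len, first_val, last_val) of a successive run.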
2048    impl From<(i32, i32, Option<i32>, Option<i32>)> for SucRun<i32> {
2049        fn from((offset, len, min_val, max_val): (i32, i32, Option<i32>, Option<i32>)) -> Self {
2050            Self {
2051                offset: offset as usize,
2052                len: len as usize,
2053                first_val: min_val,
2054                last_val: max_val,
2055            }
2056        }
2057    }
2058
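    // Judging from the expectations below, `find_successive_runs` splits the input into
    // maximal runs that are already ordered per the given `SortOptions`; nulls are folded
    // into a neighboring run according to `nulls_first` and do not affect the recorded
    // first/last values of that run.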
2059    #[test]
2060    fn test_find_successive_runs() {
2061        let testcases = vec![
2062            (
2063                vec![Some(1), Some(1), Some(2), Some(1), Some(3)],
2064                Some(SortOptions {
2065                    descending: false,
2066                    nulls_first: false,
2067                }),
2068                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2069            ),
2070            (
2071                vec![Some(1), Some(2), Some(2), Some(1), Some(3)],
2072                Some(SortOptions {
2073                    descending: false,
2074                    nulls_first: false,
2075                }),
2076                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2077            ),
2078            (
2079                vec![Some(1), Some(2), None, None, Some(1), Some(3)],
2080                Some(SortOptions {
2081                    descending: false,
2082                    nulls_first: false,
2083                }),
2084                vec![(0, 4, Some(1), Some(2)), (4, 2, Some(1), Some(3))],
2085            ),
2086            (
2087                vec![Some(1), Some(2), Some(1), Some(3)],
2088                Some(SortOptions {
2089                    descending: false,
2090                    nulls_first: false,
2091                }),
2092                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(1), Some(3))],
2093            ),
2094            (
2095                vec![Some(1), Some(2), Some(1), Some(3)],
2096                Some(SortOptions {
2097                    descending: true,
2098                    nulls_first: false,
2099                }),
2100                vec![
2101                    (0, 1, Some(1), Some(1)),
2102                    (1, 2, Some(2), Some(1)),
2103                    (3, 1, Some(3), Some(3)),
2104                ],
2105            ),
2106            (
2107                vec![Some(1), Some(2), None, Some(3)],
2108                Some(SortOptions {
2109                    descending: false,
2110                    nulls_first: true,
2111                }),
2112                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(3), Some(3))],
2113            ),
2114            (
2115                vec![Some(1), Some(2), None, Some(3)],
2116                Some(SortOptions {
2117                    descending: false,
2118                    nulls_first: false,
2119                }),
2120                vec![(0, 3, Some(1), Some(2)), (3, 1, Some(3), Some(3))],
2121            ),
2122            (
2123                vec![Some(2), Some(1), None, Some(3)],
2124                Some(SortOptions {
2125                    descending: true,
2126                    nulls_first: true,
2127                }),
2128                vec![(0, 2, Some(2), Some(1)), (2, 2, Some(3), Some(3))],
2129            ),
2130            (
2131                vec![],
2132                Some(SortOptions {
2133                    descending: false,
2134                    nulls_first: true,
2135                }),
2136                vec![(0, 0, None, None)],
2137            ),
2138            (
2139                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2140                Some(SortOptions {
2141                    descending: true,
2142                    nulls_first: true,
2143                }),
2144                vec![(0, 5, Some(2), Some(1)), (5, 2, Some(5), Some(4))],
2145            ),
2146            (
2147                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2148                Some(SortOptions {
2149                    descending: true,
2150                    nulls_first: false,
2151                }),
2152                vec![
2153                    (0, 2, None, None),
2154                    (2, 3, Some(2), Some(1)),
2155                    (5, 2, Some(5), Some(4)),
2156                ],
2157            ),
2158        ];
2159        for (input, sort_opts, expected) in testcases {
2160            let ret = find_successive_runs(input.clone().into_iter().enumerate(), &sort_opts);
2161            let expected = expected.into_iter().map(SucRun::<i32>::from).collect_vec();
2162            assert_eq!(
2163                ret, expected,
2164                "input: {:?}, opt: {:?}, expected: {:?}",
2165                input, sort_opts, expected
2166            );
2167        }
2168    }
2169
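    // `cmp_with_opts` compares two optional values under `SortOptions`: `descending` flips
    // the ordering of present values, `nulls_first` decides whether `None` sorts before or
    // after `Some(_)`, and two `None`s always compare equal, as the cases below spell out.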
2170    #[test]
2171    fn test_cmp_with_opts() {
2172        let testcases = vec![
2173            (
2174                Some(1),
2175                Some(2),
2176                Some(SortOptions {
2177                    descending: false,
2178                    nulls_first: false,
2179                }),
2180                std::cmp::Ordering::Less,
2181            ),
2182            (
2183                Some(1),
2184                Some(2),
2185                Some(SortOptions {
2186                    descending: true,
2187                    nulls_first: false,
2188                }),
2189                std::cmp::Ordering::Greater,
2190            ),
2191            (
2192                Some(1),
2193                None,
2194                Some(SortOptions {
2195                    descending: false,
2196                    nulls_first: true,
2197                }),
2198                std::cmp::Ordering::Greater,
2199            ),
2200            (
2201                Some(1),
2202                None,
2203                Some(SortOptions {
2204                    descending: true,
2205                    nulls_first: true,
2206                }),
2207                std::cmp::Ordering::Greater,
2208            ),
2209            (
2210                Some(1),
2211                None,
2212                Some(SortOptions {
2213                    descending: true,
2214                    nulls_first: false,
2215                }),
2216                std::cmp::Ordering::Less,
2217            ),
2218            (
2219                Some(1),
2220                None,
2221                Some(SortOptions {
2222                    descending: false,
2223                    nulls_first: false,
2224                }),
2225                std::cmp::Ordering::Less,
2226            ),
2227            (
2228                None,
2229                None,
2230                Some(SortOptions {
2231                    descending: true,
2232                    nulls_first: true,
2233                }),
2234                std::cmp::Ordering::Equal,
2235            ),
2236            (
2237                None,
2238                None,
2239                Some(SortOptions {
2240                    descending: false,
2241                    nulls_first: true,
2242                }),
2243                std::cmp::Ordering::Equal,
2244            ),
2245            (
2246                None,
2247                None,
2248                Some(SortOptions {
2249                    descending: true,
2250                    nulls_first: false,
2251                }),
2252                std::cmp::Ordering::Equal,
2253            ),
2254            (
2255                None,
2256                None,
2257                Some(SortOptions {
2258                    descending: false,
2259                    nulls_first: false,
2260                }),
2261                std::cmp::Ordering::Equal,
2262            ),
2263        ];
2264        for (a, b, opts, expected) in testcases {
2265            assert_eq!(
2266                cmp_with_opts(&a, &b, &opts),
2267                expected,
2268                "a: {:?}, b: {:?}, opts: {:?}",
2269                a,
2270                b,
2271                opts
2272            );
2273        }
2274    }
2275
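    // `find_slice_from_range` locates the (offset, length) of the slice of a sorted
    // timestamp column that falls inside the half-open `TimeRange` [start, end); e.g.
    // [1, 2, 3, 4, 5] with [2ms, 4ms) yields (1, 2), i.e. the values 2 and 3. The
    // descending cases use the same convention on reverse-sorted columns.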
2276    #[test]
2277    fn test_find_slice_from_range() {
2278        let test_cases = vec![
2279            // test for off-by-one cases
2280            (
2281                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2282                false,
2283                TimeRange {
2284                    start: Timestamp::new_millisecond(2),
2285                    end: Timestamp::new_millisecond(4),
2286                },
2287                Ok((1, 2)),
2288            ),
2289            (
2290                Arc::new(TimestampMillisecondArray::from_iter_values([
2291                    -2, -1, 0, 1, 2, 3, 4, 5,
2292                ])) as ArrayRef,
2293                false,
2294                TimeRange {
2295                    start: Timestamp::new_millisecond(-1),
2296                    end: Timestamp::new_millisecond(4),
2297                },
2298                Ok((1, 5)),
2299            ),
2300            (
2301                Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 6])) as ArrayRef,
2302                false,
2303                TimeRange {
2304                    start: Timestamp::new_millisecond(2),
2305                    end: Timestamp::new_millisecond(5),
2306                },
2307                Ok((1, 2)),
2308            ),
2309            (
2310                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 6])) as ArrayRef,
2311                false,
2312                TimeRange {
2313                    start: Timestamp::new_millisecond(2),
2314                    end: Timestamp::new_millisecond(5),
2315                },
2316                Ok((1, 3)),
2317            ),
2318            (
2319                Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 5, 6])) as ArrayRef,
2320                false,
2321                TimeRange {
2322                    start: Timestamp::new_millisecond(2),
2323                    end: Timestamp::new_millisecond(5),
2324                },
2325                Ok((1, 2)),
2326            ),
2327            (
2328                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2329                false,
2330                TimeRange {
2331                    start: Timestamp::new_millisecond(6),
2332                    end: Timestamp::new_millisecond(7),
2333                },
2334                Ok((5, 0)),
2335            ),
2336            // descending off-by-one cases
2337            (
2338                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 1])) as ArrayRef,
2339                true,
2340                TimeRange {
2341                    end: Timestamp::new_millisecond(4),
2342                    start: Timestamp::new_millisecond(1),
2343                },
2344                Ok((1, 3)),
2345            ),
2346            (
2347                Arc::new(TimestampMillisecondArray::from_iter_values([
2348                    5, 4, 3, 2, 1, 0,
2349                ])) as ArrayRef,
2350                true,
2351                TimeRange {
2352                    end: Timestamp::new_millisecond(4),
2353                    start: Timestamp::new_millisecond(1),
2354                },
2355                Ok((2, 3)),
2356            ),
2357            (
2358                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 0])) as ArrayRef,
2359                true,
2360                TimeRange {
2361                    end: Timestamp::new_millisecond(4),
2362                    start: Timestamp::new_millisecond(1),
2363                },
2364                Ok((1, 2)),
2365            ),
2366            (
2367                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 0])) as ArrayRef,
2368                true,
2369                TimeRange {
2370                    end: Timestamp::new_millisecond(4),
2371                    start: Timestamp::new_millisecond(1),
2372                },
2373                Ok((2, 2)),
2374            ),
2375            (
2376                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 1])) as ArrayRef,
2377                true,
2378                TimeRange {
2379                    end: Timestamp::new_millisecond(5),
2380                    start: Timestamp::new_millisecond(2),
2381                },
2382                Ok((1, 3)),
2383            ),
2384            (
2385                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 1])) as ArrayRef,
2386                true,
2387                TimeRange {
2388                    end: Timestamp::new_millisecond(5),
2389                    start: Timestamp::new_millisecond(2),
2390                },
2391                Ok((1, 2)),
2392            ),
2393            (
2394                Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 2, 1])) as ArrayRef,
2395                true,
2396                TimeRange {
2397                    end: Timestamp::new_millisecond(5),
2398                    start: Timestamp::new_millisecond(2),
2399                },
2400                Ok((1, 3)),
2401            ),
2402            (
2403                Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 1])) as ArrayRef,
2404                true,
2405                TimeRange {
2406                    end: Timestamp::new_millisecond(5),
2407                    start: Timestamp::new_millisecond(2),
2408                },
2409                Ok((1, 2)),
2410            ),
2411            (
2412                Arc::new(TimestampMillisecondArray::from_iter_values([
2413                    10, 9, 8, 7, 6,
2414                ])) as ArrayRef,
2415                true,
2416                TimeRange {
2417                    end: Timestamp::new_millisecond(5),
2418                    start: Timestamp::new_millisecond(2),
2419                },
2420                Ok((5, 0)),
2421            ),
2422            // test off-by-one cases
2423            (
2424                Arc::new(TimestampMillisecondArray::from_iter_values([3, 2, 1, 0])) as ArrayRef,
2425                true,
2426                TimeRange {
2427                    end: Timestamp::new_millisecond(4),
2428                    start: Timestamp::new_millisecond(3),
2429                },
2430                Ok((0, 1)),
2431            ),
2432            (
2433                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2])) as ArrayRef,
2434                true,
2435                TimeRange {
2436                    end: Timestamp::new_millisecond(4),
2437                    start: Timestamp::new_millisecond(3),
2438                },
2439                Ok((1, 1)),
2440            ),
2441            (
2442                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2])) as ArrayRef,
2443                true,
2444                TimeRange {
2445                    end: Timestamp::new_millisecond(4),
2446                    start: Timestamp::new_millisecond(3),
2447                },
2448                Ok((2, 1)),
2449            ),
2450        ];
2451
2452        for (sort_vals, descending, range, expected) in test_cases {
2453            let sort_column = SortColumn {
2454                values: sort_vals,
2455                options: Some(SortOptions {
2456                    descending,
2457                    ..Default::default()
2458                }),
2459            };
2460            let ret = find_slice_from_range(&sort_column, &range);
2461            match (ret, expected) {
2462                (Ok(ret), Ok(expected)) => {
2463                    assert_eq!(
2464                        ret, expected,
2465                        "sort_vals: {:?}, range: {:?}",
2466                        sort_column, range
2467                    )
2468                }
2469                (Err(err), Err(expected)) => {
2470                    let expected: &str = expected;
2471                    assert!(
2472                        err.to_string().contains(expected),
2473                        "err: {:?}, expected: {:?}",
2474                        err,
2475                        expected
2476                    );
2477                }
2478                (r, e) => panic!("unexpected result: {:?}, expected: {:?}", r, e),
2479            }
2480        }
2481    }
2482
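    // End-to-end harness: `TestStream` pairs each input `PartitionRange` with the record
    // batch it supposedly covers, pipes the batches through a `MockInputExec` into a
    // `WindowedSortExec`, and collects the output so it can be checked against the
    // expected batches.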
2483    #[derive(Debug)]
2484    struct TestStream {
2485        expression: PhysicalSortExpr,
2486        fetch: Option<usize>,
2487        input: Vec<(PartitionRange, DfRecordBatch)>,
2488        output: Vec<DfRecordBatch>,
2489        schema: SchemaRef,
2490    }
2491    use datafusion::physical_plan::expressions::Column;
2492    impl TestStream {
2493        fn new(
2494            ts_col: Column,
2495            opt: SortOptions,
2496            fetch: Option<usize>,
2497            schema: impl Into<arrow_schema::Fields>,
2498            input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2499            expected: Vec<Vec<ArrayRef>>,
2500        ) -> Self {
2501            let expression = PhysicalSortExpr {
2502                expr: Arc::new(ts_col),
2503                options: opt,
2504            };
2505            let schema = Schema::new(schema.into());
2506            let schema = Arc::new(schema);
2507            let input = input
2508                .into_iter()
2509                .map(|(k, v)| (k, DfRecordBatch::try_new(schema.clone(), v).unwrap()))
2510                .collect_vec();
2511            let output_batches = expected
2512                .into_iter()
2513                .map(|v| DfRecordBatch::try_new(schema.clone(), v).unwrap())
2514                .collect_vec();
2515            Self {
2516                expression,
2517                fetch,
2518                input,
2519                output: output_batches,
2520                schema,
2521            }
2522        }
2523
2524        async fn run_test(&self) -> Vec<DfRecordBatch> {
2525            let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip();
2526
2527            let mock_input = MockInputExec::new(batches, self.schema.clone());
2528
2529            let exec = WindowedSortExec::try_new(
2530                self.expression.clone(),
2531                self.fetch,
2532                vec![ranges],
2533                Arc::new(mock_input),
2534            )
2535            .unwrap();
2536
2537            let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
2538
2539            let real_output = exec_stream.collect::<Vec<_>>().await;
2540            let real_output: Vec<_> = real_output.into_iter().try_collect().unwrap();
2541            real_output
2542        }
2543    }
2544
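    // The stream-level cases below cover: empty inputs, values that could belong to either
    // of two overlapping ranges, direct emission when no merge is needed, batches that
    // intersect across working ranges, and the `fetch` limit.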
2545    #[tokio::test]
2546    async fn test_window_sort_stream() {
2547        let test_cases = [
2548            TestStream::new(
2549                Column::new("ts", 0),
2550                SortOptions {
2551                    descending: false,
2552                    nulls_first: true,
2553                },
2554                None,
2555                vec![Field::new(
2556                    "ts",
2557                    DataType::Timestamp(TimeUnit::Millisecond, None),
2558                    false,
2559                )],
2560                vec![],
2561                vec![],
2562            ),
2563            TestStream::new(
2564                Column::new("ts", 0),
2565                SortOptions {
2566                    descending: false,
2567                    nulls_first: true,
2568                },
2569                None,
2570                vec![Field::new(
2571                    "ts",
2572                    DataType::Timestamp(TimeUnit::Millisecond, None),
2573                    false,
2574                )],
2575                vec![
2576                    // test one empty
2577                    (
2578                        PartitionRange {
2579                            start: Timestamp::new_millisecond(1),
2580                            end: Timestamp::new_millisecond(2),
2581                            num_rows: 1,
2582                            identifier: 0,
2583                        },
2584                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2585                    ),
2586                    (
2587                        PartitionRange {
2588                            start: Timestamp::new_millisecond(1),
2589                            end: Timestamp::new_millisecond(3),
2590                            num_rows: 1,
2591                            identifier: 0,
2592                        },
2593                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2594                    ),
2595                ],
2596                vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2597                    [2],
2598                ))]],
2599            ),
2600            TestStream::new(
2601                Column::new("ts", 0),
2602                SortOptions {
2603                    descending: false,
2604                    nulls_first: true,
2605                },
2606                None,
2607                vec![Field::new(
2608                    "ts",
2609                    DataType::Timestamp(TimeUnit::Millisecond, None),
2610                    false,
2611                )],
2612                vec![
2613                    // test both inputs empty
2614                    (
2615                        PartitionRange {
2616                            start: Timestamp::new_millisecond(1),
2617                            end: Timestamp::new_millisecond(2),
2618                            num_rows: 1,
2619                            identifier: 0,
2620                        },
2621                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2622                    ),
2623                    (
2624                        PartitionRange {
2625                            start: Timestamp::new_millisecond(1),
2626                            end: Timestamp::new_millisecond(3),
2627                            num_rows: 1,
2628                            identifier: 0,
2629                        },
2630                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2631                    ),
2632                ],
2633                vec![],
2634            ),
2635            TestStream::new(
2636                Column::new("ts", 0),
2637                SortOptions {
2638                    descending: false,
2639                    nulls_first: true,
2640                },
2641                None,
2642                vec![Field::new(
2643                    "ts",
2644                    DataType::Timestamp(TimeUnit::Millisecond, None),
2645                    false,
2646                )],
2647                vec![
2648                    // test indistinguishable case
2649                    // we can't tell which range `2` belongs to
2650                    (
2651                        PartitionRange {
2652                            start: Timestamp::new_millisecond(1),
2653                            end: Timestamp::new_millisecond(2),
2654                            num_rows: 1,
2655                            identifier: 0,
2656                        },
2657                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2658                    ),
2659                    (
2660                        PartitionRange {
2661                            start: Timestamp::new_millisecond(1),
2662                            end: Timestamp::new_millisecond(3),
2663                            num_rows: 1,
2664                            identifier: 0,
2665                        },
2666                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2667                    ),
2668                ],
2669                vec![
2670                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2671                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2672                ],
2673            ),
2674            TestStream::new(
2675                Column::new("ts", 0),
2676                SortOptions {
2677                    descending: false,
2678                    nulls_first: true,
2679                },
2680                None,
2681                vec![Field::new(
2682                    "ts",
2683                    DataType::Timestamp(TimeUnit::Millisecond, None),
2684                    false,
2685                )],
2686                vec![
2687                    // test direct emit
2688                    (
2689                        PartitionRange {
2690                            start: Timestamp::new_millisecond(1),
2691                            end: Timestamp::new_millisecond(3),
2692                            num_rows: 1,
2693                            identifier: 0,
2694                        },
2695                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2696                            1, 2,
2697                        ]))],
2698                    ),
2699                    (
2700                        PartitionRange {
2701                            start: Timestamp::new_millisecond(1),
2702                            end: Timestamp::new_millisecond(4),
2703                            num_rows: 1,
2704                            identifier: 0,
2705                        },
2706                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2707                            2, 3,
2708                        ]))],
2709                    ),
2710                ],
2711                vec![
2712                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2713                        1, 2,
2714                    ]))],
2715                    // no merge sort/concat is triggered here, so the batches are emitted as-is
2716                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2717                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2718                ],
2719            ),
2720            TestStream::new(
2721                Column::new("ts", 0),
2722                SortOptions {
2723                    descending: false,
2724                    nulls_first: true,
2725                },
2726                None,
2727                vec![Field::new(
2728                    "ts",
2729                    DataType::Timestamp(TimeUnit::Millisecond, None),
2730                    false,
2731                )],
2732                vec![
2733                    // test batches that intersect across working ranges
2734                    (
2735                        PartitionRange {
2736                            start: Timestamp::new_millisecond(1),
2737                            end: Timestamp::new_millisecond(3),
2738                            num_rows: 1,
2739                            identifier: 0,
2740                        },
2741                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2742                            1, 2,
2743                        ]))],
2744                    ),
2745                    (
2746                        PartitionRange {
2747                            start: Timestamp::new_millisecond(1),
2748                            end: Timestamp::new_millisecond(4),
2749                            num_rows: 1,
2750                            identifier: 1,
2751                        },
2752                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2753                            1, 2, 3,
2754                        ]))],
2755                    ),
2756                ],
2757                vec![
2758                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2759                        1, 1, 2, 2,
2760                    ]))],
2761                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2762                ],
2763            ),
2764            TestStream::new(
2765                Column::new("ts", 0),
2766                SortOptions {
2767                    descending: false,
2768                    nulls_first: true,
2769                },
2770                None,
2771                vec![Field::new(
2772                    "ts",
2773                    DataType::Timestamp(TimeUnit::Millisecond, None),
2774                    false,
2775                )],
2776                vec![
2777                    // no overlap, empty intersection batch case
2778                    (
2779                        PartitionRange {
2780                            start: Timestamp::new_millisecond(1),
2781                            end: Timestamp::new_millisecond(3),
2782                            num_rows: 1,
2783                            identifier: 0,
2784                        },
2785                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2786                            1, 2,
2787                        ]))],
2788                    ),
2789                    (
2790                        PartitionRange {
2791                            start: Timestamp::new_millisecond(1),
2792                            end: Timestamp::new_millisecond(4),
2793                            num_rows: 1,
2794                            identifier: 1,
2795                        },
2796                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2797                            1, 2, 3,
2798                        ]))],
2799                    ),
2800                    (
2801                        PartitionRange {
2802                            start: Timestamp::new_millisecond(4),
2803                            end: Timestamp::new_millisecond(6),
2804                            num_rows: 1,
2805                            identifier: 1,
2806                        },
2807                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2808                            4, 5,
2809                        ]))],
2810                    ),
2811                ],
2812                vec![
2813                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2814                        1, 1, 2, 2,
2815                    ]))],
2816                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2817                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2818                        4, 5,
2819                    ]))],
2820                ],
2821            ),
2822            // test fetch
2823            TestStream::new(
2824                Column::new("ts", 0),
2825                SortOptions {
2826                    descending: false,
2827                    nulls_first: true,
2828                },
2829                Some(6),
2830                vec![Field::new(
2831                    "ts",
2832                    DataType::Timestamp(TimeUnit::Millisecond, None),
2833                    false,
2834                )],
2835                vec![
2836                    // ranges overlap only at [3, 4), where the last batch has no rows; also exercises the fetch limit
2837                    (
2838                        PartitionRange {
2839                            start: Timestamp::new_millisecond(1),
2840                            end: Timestamp::new_millisecond(3),
2841                            num_rows: 1,
2842                            identifier: 0,
2843                        },
2844                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2845                            1, 2,
2846                        ]))],
2847                    ),
2848                    (
2849                        PartitionRange {
2850                            start: Timestamp::new_millisecond(1),
2851                            end: Timestamp::new_millisecond(4),
2852                            num_rows: 1,
2853                            identifier: 1,
2854                        },
2855                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2856                            1, 2, 3,
2857                        ]))],
2858                    ),
2859                    (
2860                        PartitionRange {
2861                            start: Timestamp::new_millisecond(3),
2862                            end: Timestamp::new_millisecond(6),
2863                            num_rows: 1,
2864                            identifier: 1,
2865                        },
2866                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2867                            4, 5,
2868                        ]))],
2869                    ),
2870                ],
2871                vec![
2872                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2873                        1, 1, 2, 2,
2874                    ]))],
2875                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2876                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([4]))],
2877                ],
2878            ),
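            // test fetch that truncates the output to 3 rows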
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: false,
                    nulls_first: true,
                },
                Some(3),
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // no overlap, empty intersection batch case
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(3),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(4),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2, 3,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(3),
                            end: Timestamp::new_millisecond(6),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            4, 5,
                        ]))],
                    ),
                ],
                vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
                    [1, 1, 2],
                ))]],
            ),
            // rev case
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: true,
                    nulls_first: true,
                },
                None,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // reverse order
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(3),
                            end: Timestamp::new_millisecond(6),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            5, 4,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(4),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            3, 2, 1,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(3),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            2, 1,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        5, 4,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        2, 2, 1, 1,
                    ]))],
                ],
            ),
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: false,
                    nulls_first: true,
                },
                None,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // a long range that contains a shorter overlapping range
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(10),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 5, 9,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(3),
                            end: Timestamp::new_millisecond(7),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            3, 4, 5, 6,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        3, 4, 5, 5, 6, 9,
                    ]))],
                ],
            ),
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: false,
                    nulls_first: true,
                },
                None,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // complex overlap
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(3),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(10),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 3, 4, 5, 6, 8,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(7),
                            end: Timestamp::new_millisecond(10),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            7, 8, 9,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        1, 1, 2,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        3, 4, 5, 6,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        7, 8, 8, 9,
                    ]))],
                ],
            ),
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: false,
                    nulls_first: true,
                },
                None,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // a contained sub-range that shares datapoints with the outer range
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(11),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(5),
                            end: Timestamp::new_millisecond(7),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            5, 6,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        1, 2, 3, 4,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        5, 5, 6, 6, 7, 8, 9, 10,
                    ]))],
                ],
            ),
        ];

        let indexed_test_cases = test_cases.iter().enumerate().collect_vec();

        for (idx, testcase) in &indexed_test_cases {
            let output = testcase.run_test().await;
            assert_eq!(output, testcase.output, "case {idx} failed.");
        }
    }

    #[tokio::test]
    async fn fuzzy_ish_test_window_sort_stream() {
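        // upper bounds for the randomly generated dimensions: number of cases, ranges per case,
        // range width, offset between consecutive ranges, datapoints per range and fetch limit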
        let test_cnt = 100;
        let part_cnt_bound = 100;
        let range_size_bound = 100;
        let range_offset_bound = 100;
        let in_range_datapoint_cnt_bound = 100;
        let fetch_bound = 100;

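        // draw a random seed and re-seed the rng with it so a failing case can be reproduced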
        let mut rng = fastrand::Rng::new();
        let rng_seed = rng.u64(..);
        rng.seed(rng_seed);
        let mut bound_val = None;
        // construct testcases
        type CmpFn<T> = Box<dyn FnMut(&T, &T) -> std::cmp::Ordering>;
        let mut full_testcase_list = Vec::new();
        for _case_id in 0..test_cnt {
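            // every case randomly picks a sort direction, a timestamp unit and an optional fetch limit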
            let descending = rng.bool();
            fn ret_cmp_fn<T: Ord>(descending: bool) -> CmpFn<T> {
                if descending {
                    return Box::new(|a: &T, b: &T| b.cmp(a));
                }
                Box::new(|a: &T, b: &T| a.cmp(b))
            }
            // pick a random time unit (0..4 so that every match arm, including nanosecond, is reachable)
            let unit = match rng.u8(0..4) {
                0 => TimeUnit::Second,
                1 => TimeUnit::Millisecond,
                2 => TimeUnit::Microsecond,
                _ => TimeUnit::Nanosecond,
            };
            let fetch = if rng.bool() {
                Some(rng.usize(0..fetch_bound))
            } else {
                None
            };

            let mut input_ranged_data = vec![];
            let mut output_data: Vec<i64> = vec![];
            // generate input data
            for part_id in 0..rng.usize(0..part_cnt_bound) {
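                // each new range is offset from the previous one by at most `range_offset_bound`,
                // so ranges follow the sort direction overall but may still overlap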
                let (start, end) = if descending {
                    let end = bound_val
                        .map(|i| i - rng.i64(0..range_offset_bound))
                        .unwrap_or_else(|| rng.i64(..));
                    bound_val = Some(end);
                    let start = end - rng.i64(1..range_size_bound);
                    let start = Timestamp::new(start, unit.into());
                    let end = Timestamp::new(end, unit.into());
                    (start, end)
                } else {
                    let start = bound_val
                        .map(|i| i + rng.i64(0..range_offset_bound))
                        .unwrap_or_else(|| rng.i64(..));
                    bound_val = Some(start);
                    let end = start + rng.i64(1..range_size_bound);
                    let start = Timestamp::new(start, unit.into());
                    let end = Timestamp::new(end, unit.into());
                    (start, end)
                };

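                // fill the range with random timestamps sorted in the chosen direction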
                let iter = 0..rng.usize(0..in_range_datapoint_cnt_bound);
                let data_gen = iter
                    .map(|_| rng.i64(start.value()..end.value()))
                    .sorted_by(ret_cmp_fn(descending))
                    .collect_vec();
                output_data.extend(data_gen.clone());
                let arr = new_ts_array(unit, data_gen);
                let range = PartitionRange {
                    start,
                    end,
                    num_rows: arr.len(),
                    identifier: part_id,
                };
                input_ranged_data.push((range, vec![arr]));
            }

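            // the expected output is all generated datapoints globally sorted, truncated to `fetch`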
            output_data.sort_by(ret_cmp_fn(descending));
            if let Some(fetch) = fetch {
                output_data.truncate(fetch);
            }
            let output_arr = new_ts_array(unit, output_data);

            let test_stream = TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending,
                    nulls_first: true,
                },
                fetch,
                vec![Field::new("ts", DataType::Timestamp(unit, None), false)],
                input_ranged_data.clone(),
                vec![vec![output_arr]],
            );
            full_testcase_list.push(test_stream);
        }

        for (case_id, test_stream) in full_testcase_list.into_iter().enumerate() {
            let res = test_stream.run_test().await;
            let res_concat = concat_batches(&test_stream.schema, &res).unwrap();
            let expected = test_stream.output;
            let expected_concat = concat_batches(&test_stream.schema, &expected).unwrap();

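            // on mismatch, dump the input ranges plus the actual and expected output as JSON to
            // stderr before panicking, so the failing case can be inspected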
            if res_concat != expected_concat {
                {
                    let mut f_input = std::io::stderr();
                    f_input.write_all(b"[").unwrap();
                    for (input_range, input_arr) in test_stream.input {
                        let range_json = json!({
                            "start": input_range.start.to_chrono_datetime().unwrap().to_string(),
                            "end": input_range.end.to_chrono_datetime().unwrap().to_string(),
                            "num_rows": input_range.num_rows,
                            "identifier": input_range.identifier,
                        });
                        let buf = Vec::new();
                        let mut input_writer = ArrayWriter::new(buf);
                        input_writer.write(&input_arr).unwrap();
                        input_writer.finish().unwrap();
                        let res_str =
                            String::from_utf8_lossy(&input_writer.into_inner()).to_string();
                        let whole_json =
                            format!(r#"{{"range": {}, "data": {}}},"#, range_json, res_str);
                        f_input.write_all(whole_json.as_bytes()).unwrap();
                    }
                    f_input.write_all(b"]").unwrap();
                }
                {
                    let mut f_res = std::io::stderr();
                    f_res.write_all(b"[").unwrap();
                    for batch in &res {
                        let mut res_writer = ArrayWriter::new(f_res);
                        res_writer.write(batch).unwrap();
                        res_writer.finish().unwrap();
                        f_res = res_writer.into_inner();
                        f_res.write_all(b",").unwrap();
                    }
                    f_res.write_all(b"]").unwrap();

                    let f_res_concat = std::io::stderr();
                    let mut res_writer = ArrayWriter::new(f_res_concat);
                    res_writer.write(&res_concat).unwrap();
                    res_writer.finish().unwrap();

                    let f_expected = std::io::stderr();
                    let mut expected_writer = ArrayWriter::new(f_expected);
                    expected_writer.write(&expected_concat).unwrap();
                    expected_writer.finish().unwrap();
                }
                panic!(
                    "case failed, case id: {}, rng seed: {}; input, actual and expected output were dumped to stderr",
                    case_id, rng_seed
                );
            }
            assert_eq!(
                res_concat, expected_concat,
                "case failed, case id: {}, rng seed: {}",
                case_id, rng_seed
            );
        }
    }
}