query/
window_sort.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A physical plan for window sort(Which is sorting multiple sorted ranges according to input `PartitionRange`).
16//!
17
18use std::any::Any;
19use std::collections::{BTreeMap, BTreeSet, VecDeque};
20use std::pin::Pin;
21use std::slice::from_ref;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use arrow::array::{Array, ArrayRef};
26use arrow::compute::SortColumn;
27use arrow_schema::{DataType, SchemaRef, SortOptions};
28use common_error::ext::{BoxedError, PlainError};
29use common_error::status_code::StatusCode;
30use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
31use common_telemetry::error;
32use common_time::Timestamp;
33use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
34use datafusion::execution::{RecordBatchStream, TaskContext};
35use datafusion::physical_plan::memory::MemoryStream;
36use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
37use datafusion::physical_plan::sorts::streaming_merge::StreamingMergeBuilder;
38use datafusion::physical_plan::{
39    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
40};
41use datafusion_common::utils::bisect;
42use datafusion_common::{internal_err, DataFusionError};
43use datafusion_physical_expr::PhysicalSortExpr;
44use datatypes::value::Value;
45use futures::Stream;
46use itertools::Itertools;
47use snafu::ResultExt;
48use store_api::region_engine::PartitionRange;
49
50use crate::error::{QueryExecutionSnafu, Result};
51
52/// A complex stream sort execution plan which accepts a list of `PartitionRange` and
53/// merge sort them whenever possible, and emit the sorted result as soon as possible.
54/// This sorting plan only accept sort by ts and will not sort by other fields.
55///
56/// internally, it call [`StreamingMergeBuilder`] multiple times to merge multiple sorted "working ranges"
57///
58/// # Invariant Promise on Input Stream
59/// 1. The input stream must be sorted by timestamp and
60/// 2. in the order of `PartitionRange` in `ranges`
61/// 3. Each `PartitionRange` is sorted within itself(ascending or descending) but no need to be sorted across ranges
62/// 4. There can't be any RecordBatch that is cross multiple `PartitionRange` in the input stream
63///
64///  TODO(discord9): fix item 4, but since only use `PartSort` as input, this might not be a problem
65
66#[derive(Debug, Clone)]
67pub struct WindowedSortExec {
68    /// Physical sort expressions(that is, sort by timestamp)
69    expression: PhysicalSortExpr,
70    /// Optional number of rows to fetch. Stops producing rows after this fetch
71    fetch: Option<usize>,
72    /// The input ranges indicate input stream will be composed of those ranges in given order.
73    ///
74    /// Each partition has one vector of `PartitionRange`.
75    ranges: Vec<Vec<PartitionRange>>,
76    /// All available working ranges and their corresponding working set
77    ///
78    /// working ranges promise once input stream get a value out of current range, future values will never
79    /// be in this range. Each partition has one vector of ranges.
80    all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
81    input: Arc<dyn ExecutionPlan>,
82    /// Execution metrics
83    metrics: ExecutionPlanMetricsSet,
84    properties: PlanProperties,
85}
86
87fn check_partition_range_monotonicity(
88    ranges: &[Vec<PartitionRange>],
89    descending: bool,
90) -> Result<()> {
91    let is_valid = ranges.iter().all(|r| {
92        if descending {
93            r.windows(2).all(|w| w[0].end >= w[1].end)
94        } else {
95            r.windows(2).all(|w| w[0].start <= w[1].start)
96        }
97    });
98
99    if !is_valid {
100        let msg = if descending {
101            "Input `PartitionRange`s's upper bound is not monotonic non-increase"
102        } else {
103            "Input `PartitionRange`s's lower bound is not monotonic non-decrease"
104        };
105        let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
106        Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
107    } else {
108        Ok(())
109    }
110}
111
112impl WindowedSortExec {
113    pub fn try_new(
114        expression: PhysicalSortExpr,
115        fetch: Option<usize>,
116        ranges: Vec<Vec<PartitionRange>>,
117        input: Arc<dyn ExecutionPlan>,
118    ) -> Result<Self> {
119        check_partition_range_monotonicity(&ranges, expression.options.descending)?;
120
121        let mut eq_properties = input.equivalence_properties().clone();
122        eq_properties.reorder(vec![expression.clone()])?;
123
124        let properties = input.properties();
125        let properties = PlanProperties::new(
126            eq_properties,
127            input.output_partitioning().clone(),
128            properties.emission_type,
129            properties.boundedness,
130        );
131
132        let mut all_avail_working_range = Vec::with_capacity(ranges.len());
133        for r in &ranges {
134            let overlap_counts = split_overlapping_ranges(r);
135            let working_ranges =
136                compute_all_working_ranges(&overlap_counts, expression.options.descending);
137            all_avail_working_range.push(working_ranges);
138        }
139
140        Ok(Self {
141            expression,
142            fetch,
143            ranges,
144            all_avail_working_range,
145            input,
146            metrics: ExecutionPlanMetricsSet::new(),
147            properties,
148        })
149    }
150
151    /// During receiving partial-sorted RecordBatch, we need to update the working set which is the
152    /// `PartitionRange` we think those RecordBatch belongs to. And when we receive something outside
153    /// of working set, we can merge results before whenever possible.
154    pub fn to_stream(
155        &self,
156        context: Arc<TaskContext>,
157        partition: usize,
158    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
159        let input_stream: DfSendableRecordBatchStream =
160            self.input.execute(partition, context.clone())?;
161
162        let df_stream = Box::pin(WindowedSortStream::new(
163            context,
164            self,
165            input_stream,
166            partition,
167        )) as _;
168
169        Ok(df_stream)
170    }
171}
172
173impl DisplayAs for WindowedSortExec {
174    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        write!(
176            f,
177            "WindowedSortExec: expr={} num_ranges={}",
178            self.expression,
179            self.ranges.len()
180        )?;
181        if let Some(fetch) = self.fetch {
182            write!(f, " fetch={}", fetch)?;
183        }
184        Ok(())
185    }
186}
187
188impl ExecutionPlan for WindowedSortExec {
189    fn as_any(&self) -> &dyn Any {
190        self
191    }
192
193    fn schema(&self) -> SchemaRef {
194        self.input.schema()
195    }
196
197    fn properties(&self) -> &PlanProperties {
198        &self.properties
199    }
200
201    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
202        vec![&self.input]
203    }
204
205    fn with_new_children(
206        self: Arc<Self>,
207        children: Vec<Arc<dyn ExecutionPlan>>,
208    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
209        let new_input = if let Some(first) = children.first() {
210            first
211        } else {
212            internal_err!("No children found")?
213        };
214        let new = Self::try_new(
215            self.expression.clone(),
216            self.fetch,
217            self.ranges.clone(),
218            new_input.clone(),
219        )?;
220        Ok(Arc::new(new))
221    }
222
223    fn execute(
224        &self,
225        partition: usize,
226        context: Arc<TaskContext>,
227    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
228        self.to_stream(context, partition)
229    }
230
231    fn metrics(&self) -> Option<MetricsSet> {
232        Some(self.metrics.clone_inner())
233    }
234
235    /// # Explain
236    ///
237    /// This plan needs to be executed on each partition independently,
238    /// and is expected to run directly on storage engine's output
239    /// distribution / partition.
240    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
241        vec![false]
242    }
243
244    fn name(&self) -> &str {
245        "WindowedSortExec"
246    }
247}
248
249/// The core logic of merging sort multiple sorted ranges
250///
251/// the flow of data is:
252/// ```md
253/// input --check if sorted--> in_progress --find sorted run--> sorted_input_runs --call merge sort--> merge_stream --> output
254/// ```
255pub struct WindowedSortStream {
256    /// Memory pool for this stream
257    memory_pool: Arc<dyn MemoryPool>,
258    /// currently assembling RecordBatches, will be put to `sort_partition_rbs` when it's done
259    in_progress: Vec<DfRecordBatch>,
260    /// last `Timestamp` of the last input RecordBatch in `in_progress`, use to found partial sorted run's boundary
261    last_value: Option<Timestamp>,
262    /// Current working set of `PartitionRange` sorted RecordBatches
263    sorted_input_runs: Vec<DfSendableRecordBatchStream>,
264    /// Merge-sorted result streams, should be polled to end before start a new merge sort again
265    merge_stream: VecDeque<DfSendableRecordBatchStream>,
266    /// The number of times merge sort has been called
267    merge_count: usize,
268    /// Index into current `working_range` in `all_avail_working_range`
269    working_idx: usize,
270    /// input stream assumed reading in order of `PartitionRange`
271    input: DfSendableRecordBatchStream,
272    /// Whether this stream is terminated. For reasons like limit reached or input stream is done.
273    is_terminated: bool,
274    /// Output Schema, which is the same as input schema, since this is a sort plan
275    schema: SchemaRef,
276    /// Physical sort expressions(that is, sort by timestamp)
277    expression: PhysicalSortExpr,
278    /// Optional number of rows to fetch. Stops producing rows after this fetch
279    fetch: Option<usize>,
280    /// number of rows produced
281    produced: usize,
282    /// Resulting Stream(`merge_stream`)'s batch size, merely a suggestion
283    batch_size: usize,
284    /// All available working ranges and their corresponding working set
285    ///
286    /// working ranges promise once input stream get a value out of current range, future values will never be in this range
287    all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
288    /// The input partition ranges
289    #[allow(dead_code)] // this is used under #[debug_assertions]
290    ranges: Vec<PartitionRange>,
291    /// Execution metrics
292    metrics: BaselineMetrics,
293}
294
295impl WindowedSortStream {
296    pub fn new(
297        context: Arc<TaskContext>,
298        exec: &WindowedSortExec,
299        input: DfSendableRecordBatchStream,
300        partition: usize,
301    ) -> Self {
302        Self {
303            memory_pool: context.runtime_env().memory_pool.clone(),
304            in_progress: Vec::new(),
305            last_value: None,
306            sorted_input_runs: Vec::new(),
307            merge_stream: VecDeque::new(),
308            merge_count: 0,
309            working_idx: 0,
310            schema: input.schema(),
311            input,
312            is_terminated: false,
313            expression: exec.expression.clone(),
314            fetch: exec.fetch,
315            produced: 0,
316            batch_size: context.session_config().batch_size(),
317            all_avail_working_range: exec.all_avail_working_range[partition].clone(),
318            ranges: exec.ranges[partition].clone(),
319            metrics: BaselineMetrics::new(&exec.metrics, partition),
320        }
321    }
322}
323
324impl WindowedSortStream {
325    #[cfg(debug_assertions)]
326    fn check_subset_ranges(&self, cur_range: &TimeRange) {
327        let cur_is_subset_to = self
328            .ranges
329            .iter()
330            .filter(|r| cur_range.is_subset(&TimeRange::from(*r)))
331            .collect_vec();
332        if cur_is_subset_to.is_empty() {
333            error!("Current range is not a subset of any PartitionRange");
334            // found in all ranges that are subset of current range
335            let subset_ranges = self
336                .ranges
337                .iter()
338                .filter(|r| TimeRange::from(*r).is_subset(cur_range))
339                .collect_vec();
340            let only_overlap = self
341                .ranges
342                .iter()
343                .filter(|r| {
344                    let r = TimeRange::from(*r);
345                    r.is_overlapping(cur_range) && !r.is_subset(cur_range)
346                })
347                .collect_vec();
348            error!(
349                "Bad input, found {} ranges that are subset of current range, also found {} ranges that only overlap, subset ranges are: {:?}; overlap ranges are: {:?}",
350                subset_ranges.len(),
351                only_overlap.len(),
352                subset_ranges,
353                only_overlap
354            );
355        } else {
356            let only_overlap = self
357                .ranges
358                .iter()
359                .filter(|r| {
360                    let r = TimeRange::from(*r);
361                    r.is_overlapping(cur_range) && !cur_range.is_subset(&r)
362                })
363                .collect_vec();
364            error!(
365                "Found current range to be subset of {} ranges, also found {} ranges that only overlap, of subset ranges are:{:?}; overlap ranges are: {:?}",
366                cur_is_subset_to.len(),
367                only_overlap.len(),
368                cur_is_subset_to,
369                only_overlap
370            );
371        }
372        let all_overlap_working_range = self
373            .all_avail_working_range
374            .iter()
375            .filter(|(range, _)| range.is_overlapping(cur_range))
376            .map(|(range, _)| range)
377            .collect_vec();
378        error!(
379            "Found {} working ranges that overlap with current range: {:?}",
380            all_overlap_working_range.len(),
381            all_overlap_working_range
382        );
383    }
384
385    /// Poll the next RecordBatch from the merge-sort's output stream
386    fn poll_result_stream(
387        &mut self,
388        cx: &mut Context<'_>,
389    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
390        while let Some(merge_stream) = &mut self.merge_stream.front_mut() {
391            match merge_stream.as_mut().poll_next(cx) {
392                Poll::Ready(Some(Ok(batch))) => {
393                    let ret = if let Some(remaining) = self.remaining_fetch() {
394                        if remaining == 0 {
395                            self.is_terminated = true;
396                            None
397                        } else if remaining < batch.num_rows() {
398                            self.produced += remaining;
399                            Some(Ok(batch.slice(0, remaining)))
400                        } else {
401                            self.produced += batch.num_rows();
402                            Some(Ok(batch))
403                        }
404                    } else {
405                        self.produced += batch.num_rows();
406                        Some(Ok(batch))
407                    };
408                    return Poll::Ready(ret);
409                }
410                Poll::Ready(Some(Err(e))) => {
411                    return Poll::Ready(Some(Err(e)));
412                }
413                Poll::Ready(None) => {
414                    // current merge stream is done, we can start polling the next one
415
416                    self.merge_stream.pop_front();
417                    continue;
418                }
419                Poll::Pending => {
420                    return Poll::Pending;
421                }
422            }
423        }
424        // if no output stream is available
425        Poll::Ready(None)
426    }
427
428    /// The core logic of merging sort multiple sorted ranges
429    ///
430    /// We try to maximize the number of sorted runs we can merge in one go, while emit the result as soon as possible.
431    pub fn poll_next_inner(
432        mut self: Pin<&mut Self>,
433        cx: &mut Context<'_>,
434    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
435        // first check and send out the merge result
436        match self.poll_result_stream(cx) {
437            Poll::Ready(None) => {
438                if self.is_terminated {
439                    return Poll::Ready(None);
440                }
441            }
442            x => return x,
443        };
444
445        // consume input stream
446        while !self.is_terminated {
447            // then we get a new RecordBatch from input stream
448            let SortedRunSet {
449                runs_with_batch,
450                sort_column,
451            } = match self.input.as_mut().poll_next(cx) {
452                Poll::Ready(Some(Ok(batch))) => split_batch_to_sorted_run(batch, &self.expression)?,
453                Poll::Ready(Some(Err(e))) => {
454                    return Poll::Ready(Some(Err(e)));
455                }
456                Poll::Ready(None) => {
457                    // input stream is done, we need to merge sort the remaining working set
458                    self.is_terminated = true;
459                    self.build_sorted_stream()?;
460                    self.start_new_merge_sort()?;
461                    break;
462                }
463                Poll::Pending => return Poll::Pending,
464            };
465
466            // The core logic to eargerly merge sort the working set
467
468            // compare with last_value to find boundary, then merge runs if needed
469
470            // iterate over runs_with_batch to merge sort, might create zero or more stream to put to `sort_partition_rbs`
471            let mut last_remaining = None;
472            let mut run_iter = runs_with_batch.into_iter();
473            loop {
474                let Some((sorted_rb, run_info)) = last_remaining.take().or(run_iter.next()) else {
475                    break;
476                };
477                if sorted_rb.num_rows() == 0 {
478                    continue;
479                }
480                // determine if this batch is in current working range
481                let Some(cur_range) = run_info.get_time_range() else {
482                    internal_err!("Found NULL in time index column")?
483                };
484                let Some(working_range) = self.get_working_range() else {
485                    internal_err!("No working range found")?
486                };
487
488                // ensure the current batch is in the working range
489                if sort_column.options.unwrap_or_default().descending {
490                    if cur_range.end > working_range.end {
491                        error!("Invalid range: {:?} > {:?}", cur_range, working_range);
492                        #[cfg(debug_assertions)]
493                        self.check_subset_ranges(&cur_range);
494                        internal_err!("Current batch have data on the right side of working range, something is very wrong")?;
495                    }
496                } else if cur_range.start < working_range.start {
497                    error!("Invalid range: {:?} < {:?}", cur_range, working_range);
498                    #[cfg(debug_assertions)]
499                    self.check_subset_ranges(&cur_range);
500                    internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
501                }
502
503                if cur_range.is_subset(&working_range) {
504                    // data still in range, can't merge sort yet
505                    // see if can concat entire sorted rb, merge sort need to wait
506                    self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?;
507                } else if let Some(intersection) = cur_range.intersection(&working_range) {
508                    // slice rb by intersection and concat it then merge sort
509                    let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len);
510                    let (offset, len) = find_slice_from_range(
511                        &SortColumn {
512                            values: cur_sort_column.clone(),
513                            options: sort_column.options,
514                        },
515                        &intersection,
516                    )?;
517
518                    if offset != 0 {
519                        internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
520                    }
521
522                    let sliced_rb = sorted_rb.slice(offset, len);
523
524                    // try to concat the sliced input batch to the current `in_progress` run
525                    self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?;
526                    // since no more sorted data in this range will come in now, build stream now
527                    self.build_sorted_stream()?;
528
529                    // since we are crossing working range, we need to merge sort the working set
530                    self.start_new_merge_sort()?;
531
532                    let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len);
533                    if r_len != 0 {
534                        // we have remaining data in this batch, put it back to input queue
535                        let remaining_rb = sorted_rb.slice(r_offset, r_len);
536                        let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?;
537                        let new_run_info = SucRun {
538                            offset: run_info.offset + r_offset,
539                            len: r_len,
540                            first_val: new_first_val,
541                            last_val: run_info.last_val,
542                        };
543                        last_remaining = Some((remaining_rb, new_run_info));
544                    }
545                    // deal with remaining batch cross working range problem
546                    // i.e: this example require more slice, and we are currently at point A
547                    // |---1---|       |---3---|
548                    // |-------A--2------------|
549                    //  put the remaining batch back to iter and deal it in next loop
550                } else {
551                    // no overlap, we can merge sort the working set
552
553                    self.build_sorted_stream()?;
554                    self.start_new_merge_sort()?;
555
556                    // always put it back to input queue until some batch is in working range
557                    last_remaining = Some((sorted_rb, run_info));
558                }
559            }
560
561            // poll result stream again to see if we can emit more results
562            match self.poll_result_stream(cx) {
563                Poll::Ready(None) => {
564                    if self.is_terminated {
565                        return Poll::Ready(None);
566                    }
567                }
568                x => return x,
569            };
570        }
571        // emit the merge result after terminated(all input stream is done)
572        self.poll_result_stream(cx)
573    }
574
575    fn push_batch(&mut self, batch: DfRecordBatch) {
576        self.in_progress.push(batch);
577    }
578
579    /// Try to concat the input batch to the current `in_progress` run
580    ///
581    /// if the input batch is not sorted, build old run to stream and start a new run with new batch
582    fn try_concat_batch(
583        &mut self,
584        batch: DfRecordBatch,
585        run_info: &SucRun<Timestamp>,
586        opt: Option<SortOptions>,
587    ) -> datafusion_common::Result<()> {
588        let is_ok_to_concat =
589            cmp_with_opts(&self.last_value, &run_info.first_val, &opt) <= std::cmp::Ordering::Equal;
590
591        if is_ok_to_concat {
592            self.push_batch(batch);
593            // next time we get input batch might still be ordered, so not build stream yet
594        } else {
595            // no more sorted data, build stream now
596            self.build_sorted_stream()?;
597            self.push_batch(batch);
598        }
599        self.last_value = run_info.last_val;
600        Ok(())
601    }
602
603    /// Get the current working range
604    fn get_working_range(&self) -> Option<TimeRange> {
605        self.all_avail_working_range
606            .get(self.working_idx)
607            .map(|(range, _)| *range)
608    }
609
610    /// Set current working range to the next working range
611    fn set_next_working_range(&mut self) {
612        self.working_idx += 1;
613    }
614
615    /// make `in_progress` as a new `DfSendableRecordBatchStream` and put into `sorted_input_runs`
616    fn build_sorted_stream(&mut self) -> datafusion_common::Result<()> {
617        if self.in_progress.is_empty() {
618            return Ok(());
619        }
620        let data = std::mem::take(&mut self.in_progress);
621
622        let new_stream = MemoryStream::try_new(data, self.schema(), None)?;
623        self.sorted_input_runs.push(Box::pin(new_stream));
624        Ok(())
625    }
626
627    /// Start merging sort the current working set
628    fn start_new_merge_sort(&mut self) -> datafusion_common::Result<()> {
629        if !self.in_progress.is_empty() {
630            return internal_err!("Starting a merge sort when in_progress is not empty")?;
631        }
632
633        self.set_next_working_range();
634
635        let streams = std::mem::take(&mut self.sorted_input_runs);
636        if streams.is_empty() {
637            return Ok(());
638        } else if streams.len() == 1 {
639            self.merge_stream
640                .push_back(streams.into_iter().next().unwrap());
641            return Ok(());
642        }
643
644        let fetch = self.remaining_fetch();
645        let reservation = MemoryConsumer::new(format!("WindowedSortStream[{}]", self.merge_count))
646            .register(&self.memory_pool);
647        self.merge_count += 1;
648
649        let resulting_stream = StreamingMergeBuilder::new()
650            .with_streams(streams)
651            .with_schema(self.schema())
652            .with_expressions(&[self.expression.clone()].into())
653            .with_metrics(self.metrics.clone())
654            .with_batch_size(self.batch_size)
655            .with_fetch(fetch)
656            .with_reservation(reservation)
657            .build()?;
658        self.merge_stream.push_back(resulting_stream);
659        // this working range is done, move to next working range
660        Ok(())
661    }
662
663    /// Remaining number of rows to fetch, if no fetch limit, return None
664    /// if fetch limit is reached, return Some(0)
665    fn remaining_fetch(&self) -> Option<usize> {
666        let total_now = self.produced;
667        self.fetch.map(|p| p.saturating_sub(total_now))
668    }
669}
670
671impl Stream for WindowedSortStream {
672    type Item = datafusion_common::Result<DfRecordBatch>;
673
674    fn poll_next(
675        mut self: Pin<&mut Self>,
676        cx: &mut Context<'_>,
677    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
678        let result = self.as_mut().poll_next_inner(cx);
679        self.metrics.record_poll(result)
680    }
681}
682
683impl RecordBatchStream for WindowedSortStream {
684    fn schema(&self) -> SchemaRef {
685        self.schema.clone()
686    }
687}
688
689/// split batch to sorted runs
690fn split_batch_to_sorted_run(
691    batch: DfRecordBatch,
692    expression: &PhysicalSortExpr,
693) -> datafusion_common::Result<SortedRunSet<Timestamp>> {
694    // split input rb to sorted runs
695    let sort_column = expression.evaluate_to_sort_column(&batch)?;
696    let sorted_runs_offset = get_sorted_runs(sort_column.clone())?;
697    if let Some(run) = sorted_runs_offset.first()
698        && sorted_runs_offset.len() == 1
699    {
700        if !(run.offset == 0 && run.len == batch.num_rows()) {
701            internal_err!(
702                "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
703                run.offset,
704                run.len,
705                batch.num_rows()
706            )?;
707        }
708        // input rb is already sorted, we can emit it directly
709        Ok(SortedRunSet {
710            runs_with_batch: vec![(batch, run.clone())],
711            sort_column,
712        })
713    } else {
714        // those slice should be zero copy, so supposedly no new reservation needed
715        let mut ret = Vec::with_capacity(sorted_runs_offset.len());
716        for run in sorted_runs_offset {
717            if run.offset + run.len > batch.num_rows() {
718                internal_err!(
719                    "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
720                    run.offset,
721                    run.len,
722                    batch.num_rows()
723                )?;
724            }
725            let new_rb = batch.slice(run.offset, run.len);
726            ret.push((new_rb, run));
727        }
728        Ok(SortedRunSet {
729            runs_with_batch: ret,
730            sort_column,
731        })
732    }
733}
734
735/// Downcast a temporal array to a specific type
736///
737/// usage similar to `downcast_primitive!` in `arrow-array` crate
738#[macro_export]
739macro_rules! downcast_ts_array {
740    ($data_type:expr => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) =>
741    {
742        match $data_type {
743            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
744                $m!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $args)*)
745            }
746            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
747                $m!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $args)*)
748            }
749            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
750                $m!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $args)*)
751            }
752            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
753                $m!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $args)*)
754            }
755            $($p => $fallback,)*
756        }
757    };
758}
759
760/// Find the slice(where start <= data < end and sort by `sort_column.options`) from the given range
761///
762/// Return the offset and length of the slice
763fn find_slice_from_range(
764    sort_column: &SortColumn,
765    range: &TimeRange,
766) -> datafusion_common::Result<(usize, usize)> {
767    let ty = sort_column.values.data_type();
768    let time_unit = if let DataType::Timestamp(unit, _) = ty {
769        unit
770    } else {
771        return Err(DataFusionError::Internal(format!(
772            "Unsupported sort column type: {}",
773            sort_column.values.data_type()
774        )));
775    };
776    let array = &sort_column.values;
777    let opt = &sort_column.options.unwrap_or_default();
778    let descending = opt.descending;
779
780    let typed_sorted_range = [range.start, range.end]
781        .iter()
782        .map(|t| {
783            t.convert_to(time_unit.into())
784                .ok_or_else(|| {
785                    DataFusionError::Internal(format!(
786                        "Failed to convert timestamp from {:?} to {:?}",
787                        t.unit(),
788                        time_unit
789                    ))
790                })
791                .and_then(|typed_ts| {
792                    let value = Value::Timestamp(typed_ts);
793                    value
794                        .try_to_scalar_value(&value.data_type())
795                        .map_err(|e| DataFusionError::External(Box::new(e) as _))
796                })
797        })
798        .try_collect::<_, Vec<_>, _>()?;
799
800    let (min_val, max_val) = (typed_sorted_range[0].clone(), typed_sorted_range[1].clone());
801
802    // get slice that in which all data that `min_val<=data<max_val`
803    let (start, end) = if descending {
804        // note that `data < max_val`
805        // i,e, for max_val = 4, array = [5,3,2] should be start=1
806        // max_val = 4, array = [5, 4, 3, 2] should be start= 2
807        let start = bisect::<false>(from_ref(array), from_ref(&max_val), &[*opt])?;
808        // min_val = 1, array = [3, 2, 1, 0], end = 3
809        // min_val = 1, array = [3, 2, 0], end = 2
810        let end = bisect::<false>(from_ref(array), from_ref(&min_val), &[*opt])?;
811        (start, end)
812    } else {
813        // min_val = 1, array = [1, 2, 3], start = 0
814        // min_val = 1, array = [0, 2, 3], start = 1
815        let start = bisect::<true>(from_ref(array), from_ref(&min_val), &[*opt])?;
816        // max_val = 3, array = [1, 3, 4], end = 1
817        // max_val = 3, array = [1, 2, 4], end = 2
818        let end = bisect::<true>(from_ref(array), from_ref(&max_val), &[*opt])?;
819        (start, end)
820    };
821
822    Ok((start, end - start))
823}
824
825/// Get an iterator from a primitive array.
826///
827/// Used with `downcast_ts_array`. The returned iter is wrapped with `.enumerate()`.
828#[macro_export]
829macro_rules! array_iter_helper {
830    ($t:ty, $unit:expr, $arr:expr) => {{
831        let typed = $arr
832            .as_any()
833            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
834            .unwrap();
835        let iter = typed.iter().enumerate();
836        Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
837    }};
838}
839
840/// Compare with options, note None is considered as NULL here
841///
842/// default to null first
843fn cmp_with_opts<T: Ord>(
844    a: &Option<T>,
845    b: &Option<T>,
846    opt: &Option<SortOptions>,
847) -> std::cmp::Ordering {
848    let opt = opt.unwrap_or_default();
849
850    if let (Some(a), Some(b)) = (a, b) {
851        if opt.descending {
852            b.cmp(a)
853        } else {
854            a.cmp(b)
855        }
856    } else if opt.nulls_first {
857        // now we know at leatst one of them is None
858        // in rust None < Some(_)
859        a.cmp(b)
860    } else {
861        match (a, b) {
862            (Some(a), Some(b)) => a.cmp(b),
863            (Some(_), None) => std::cmp::Ordering::Less,
864            (None, Some(_)) => std::cmp::Ordering::Greater,
865            (None, None) => std::cmp::Ordering::Equal,
866        }
867    }
868}
869
870#[derive(Debug, Clone)]
871struct SortedRunSet<N: Ord> {
872    /// sorted runs with batch corresponding to them
873    runs_with_batch: Vec<(DfRecordBatch, SucRun<N>)>,
874    /// sorted column from eval sorting expr
875    sort_column: SortColumn,
876}
877
878/// A struct to represent a successive run in the input iterator
879#[derive(Debug, Clone, PartialEq)]
880struct SucRun<N: Ord> {
881    /// offset of the first element in the run
882    offset: usize,
883    /// length of the run
884    len: usize,
885    /// first non-null value in the run
886    first_val: Option<N>,
887    /// last non-null value in the run
888    last_val: Option<N>,
889}
890
891impl SucRun<Timestamp> {
892    /// Get the time range of the run, which is [min_val, max_val + 1)
893    fn get_time_range(&self) -> Option<TimeRange> {
894        let start = self.first_val.min(self.last_val);
895        let end = self
896            .first_val
897            .max(self.last_val)
898            .map(|i| Timestamp::new(i.value() + 1, i.unit()));
899        start.zip(end).map(|(s, e)| TimeRange::new(s, e))
900    }
901}
902
903/// find all successive runs in the input iterator
904fn find_successive_runs<T: Iterator<Item = (usize, Option<N>)>, N: Ord + Copy>(
905    iter: T,
906    sort_opts: &Option<SortOptions>,
907) -> Vec<SucRun<N>> {
908    let mut runs = Vec::new();
909    let mut last_value = None;
910    let mut iter_len = None;
911
912    let mut last_offset = 0;
913    let mut first_val: Option<N> = None;
914    let mut last_val: Option<N> = None;
915
916    for (idx, t) in iter {
917        if let Some(last_value) = &last_value {
918            if cmp_with_opts(last_value, &t, sort_opts) == std::cmp::Ordering::Greater {
919                // we found a boundary
920                let len = idx - last_offset;
921                let run = SucRun {
922                    offset: last_offset,
923                    len,
924                    first_val,
925                    last_val,
926                };
927                runs.push(run);
928                first_val = None;
929                last_val = None;
930
931                last_offset = idx;
932            }
933        }
934        last_value = Some(t);
935        if let Some(t) = t {
936            first_val = first_val.or(Some(t));
937            last_val = Some(t).or(last_val);
938        }
939        iter_len = Some(idx);
940    }
941    let run = SucRun {
942        offset: last_offset,
943        len: iter_len.map(|l| l - last_offset + 1).unwrap_or(0),
944        first_val,
945        last_val,
946    };
947    runs.push(run);
948
949    runs
950}
951
952/// return a list of non-overlapping (offset, length) which represent sorted runs, and
953/// can be used to call [`DfRecordBatch::slice`] to get sorted runs
954/// Returned runs will be as long as possible, and will not overlap with each other
955fn get_sorted_runs(sort_column: SortColumn) -> datafusion_common::Result<Vec<SucRun<Timestamp>>> {
956    let ty = sort_column.values.data_type();
957    if let DataType::Timestamp(unit, _) = ty {
958        let array = &sort_column.values;
959        let iter = downcast_ts_array!(
960            array.data_type() => (array_iter_helper, array),
961            _ => internal_err!("Unsupported sort column type: {ty}")?
962        );
963
964        let raw = find_successive_runs(iter, &sort_column.options);
965        let ts_runs = raw
966            .into_iter()
967            .map(|run| SucRun {
968                offset: run.offset,
969                len: run.len,
970                first_val: run.first_val.map(|v| Timestamp::new(v, unit.into())),
971                last_val: run.last_val.map(|v| Timestamp::new(v, unit.into())),
972            })
973            .collect_vec();
974        Ok(ts_runs)
975    } else {
976        Err(DataFusionError::Internal(format!(
977            "Unsupported sort column type: {ty}"
978        )))
979    }
980}
981
982/// Left(`start`) inclusive right(`end`) exclusive,
983///
984/// This is just tuple with extra methods
985#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
986struct TimeRange {
987    start: Timestamp,
988    end: Timestamp,
989}
990
991impl From<&PartitionRange> for TimeRange {
992    fn from(range: &PartitionRange) -> Self {
993        Self::new(range.start, range.end)
994    }
995}
996
997impl From<(Timestamp, Timestamp)> for TimeRange {
998    fn from(range: (Timestamp, Timestamp)) -> Self {
999        Self::new(range.0, range.1)
1000    }
1001}
1002
1003impl From<&(Timestamp, Timestamp)> for TimeRange {
1004    fn from(range: &(Timestamp, Timestamp)) -> Self {
1005        Self::new(range.0, range.1)
1006    }
1007}
1008
1009impl TimeRange {
1010    /// Create a new TimeRange, if start is greater than end, swap them
1011    fn new(start: Timestamp, end: Timestamp) -> Self {
1012        if start > end {
1013            Self {
1014                start: end,
1015                end: start,
1016            }
1017        } else {
1018            Self { start, end }
1019        }
1020    }
1021
1022    fn is_subset(&self, other: &Self) -> bool {
1023        self.start >= other.start && self.end <= other.end
1024    }
1025
1026    /// Check if two ranges are overlapping, exclusive(meaning if only boundary is overlapped then range is not overlapping)
1027    fn is_overlapping(&self, other: &Self) -> bool {
1028        !(self.start >= other.end || self.end <= other.start)
1029    }
1030
1031    fn intersection(&self, other: &Self) -> Option<Self> {
1032        if self.is_overlapping(other) {
1033            Some(Self::new(
1034                self.start.max(other.start),
1035                self.end.min(other.end),
1036            ))
1037        } else {
1038            None
1039        }
1040    }
1041
1042    fn difference(&self, other: &Self) -> Vec<Self> {
1043        if !self.is_overlapping(other) {
1044            vec![*self]
1045        } else {
1046            let mut ret = Vec::new();
1047            if self.start < other.start && self.end > other.end {
1048                ret.push(Self::new(self.start, other.start));
1049                ret.push(Self::new(other.end, self.end));
1050            } else if self.start < other.start {
1051                ret.push(Self::new(self.start, other.start));
1052            } else if self.end > other.end {
1053                ret.push(Self::new(other.end, self.end));
1054            }
1055            ret
1056        }
1057    }
1058}
1059
1060/// split input range by `split_by` range to one, two or three parts.
1061fn split_range_by(
1062    input_range: &TimeRange,
1063    input_parts: &[usize],
1064    split_by: &TimeRange,
1065    split_idx: usize,
1066) -> Vec<Action> {
1067    let mut ret = Vec::new();
1068    if input_range.is_overlapping(split_by) {
1069        let input_parts = input_parts.to_vec();
1070        let new_parts = {
1071            let mut new_parts = input_parts.clone();
1072            new_parts.push(split_idx);
1073            new_parts
1074        };
1075
1076        ret.push(Action::Pop(*input_range));
1077        if let Some(intersection) = input_range.intersection(split_by) {
1078            ret.push(Action::Push(intersection, new_parts.clone()));
1079        }
1080        for diff in input_range.difference(split_by) {
1081            ret.push(Action::Push(diff, input_parts.clone()));
1082        }
1083    }
1084    ret
1085}
1086
1087#[derive(Debug, Clone, PartialEq, Eq)]
1088enum Action {
1089    Pop(TimeRange),
1090    Push(TimeRange, Vec<usize>),
1091}
1092
1093/// Compute all working ranges and corresponding working sets from given `overlap_counts` computed from `split_overlapping_ranges`
1094///
1095/// working ranges promise once input stream get a value out of current range, future values will never be in this range
1096///
1097/// hence we can merge sort current working range once that happens
1098///
1099/// if `descending` is true, the working ranges will be in descending order
1100fn compute_all_working_ranges(
1101    overlap_counts: &BTreeMap<TimeRange, Vec<usize>>,
1102    descending: bool,
1103) -> Vec<(TimeRange, BTreeSet<usize>)> {
1104    let mut ret = Vec::new();
1105    let mut cur_range_set: Option<(TimeRange, BTreeSet<usize>)> = None;
1106    let overlap_iter: Box<dyn Iterator<Item = (&TimeRange, &Vec<usize>)>> = if descending {
1107        Box::new(overlap_counts.iter().rev()) as _
1108    } else {
1109        Box::new(overlap_counts.iter()) as _
1110    };
1111    for (range, set) in overlap_iter {
1112        match &mut cur_range_set {
1113            None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))),
1114            Some((working_range, working_set)) => {
1115                // if next overlap range have Partition that's is not last one in `working_set`(hence need
1116                // to be read before merge sorting), and `working_set` have >1 count
1117                // we have to expand current working range to cover it(and add it's `set` to `working_set`)
1118                // so that merge sort is possible
1119                let need_expand = {
1120                    let last_part = working_set.last();
1121                    let inter: BTreeSet<usize> = working_set
1122                        .intersection(&BTreeSet::from_iter(set.iter().cloned()))
1123                        .cloned()
1124                        .collect();
1125                    if let Some(one) = inter.first()
1126                        && inter.len() == 1
1127                        && Some(one) == last_part
1128                    {
1129                        // if only the last PartitionRange in current working set, we can just emit it so no need to expand working range
1130                        if set.iter().all(|p| Some(p) >= last_part) {
1131                            // if all PartitionRange in next overlap range is after the last one in current working set, we can just emit current working set
1132                            false
1133                        } else {
1134                            // elsewise, we need to expand working range to include next overlap range
1135                            true
1136                        }
1137                    } else if inter.is_empty() {
1138                        // if no common PartitionRange in current working set and next overlap range, we can just emit current working set
1139                        false
1140                    } else {
1141                        // have multiple intersection or intersection is not the last part, either way we need to expand working range to include next overlap range
1142                        true
1143                    }
1144                };
1145
1146                if need_expand {
1147                    if descending {
1148                        working_range.start = range.start;
1149                    } else {
1150                        working_range.end = range.end;
1151                    }
1152                    working_set.extend(set.iter().cloned());
1153                } else {
1154                    ret.push((*working_range, std::mem::take(working_set)));
1155                    cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned())));
1156                }
1157            }
1158        }
1159    }
1160
1161    if let Some(cur_range_set) = cur_range_set {
1162        ret.push(cur_range_set)
1163    }
1164
1165    ret
1166}
1167
1168/// return a map of non-overlapping ranges and their corresponding index
1169/// (not `PartitionRange.identifier` but position in array) in the input `PartitionRange`s that is in those ranges
1170fn split_overlapping_ranges(ranges: &[PartitionRange]) -> BTreeMap<TimeRange, Vec<usize>> {
1171    // invariant: the key ranges should not overlapping with each other by definition from `is_overlapping`
1172    let mut ret: BTreeMap<TimeRange, Vec<usize>> = BTreeMap::new();
1173    for (idx, range) in ranges.iter().enumerate() {
1174        let key: TimeRange = (range.start, range.end).into();
1175        let mut actions = Vec::new();
1176        let mut untouched = vec![key];
1177        // create a forward and backward iterator to find all overlapping ranges
1178        // given that tuple is sorted in lexicographical order and promise to not overlap,
1179        // since range is sorted that way, we can stop when we find a non-overlapping range
1180        let forward_iter = ret
1181            .range(key..)
1182            .take_while(|(range, _)| range.is_overlapping(&key));
1183        let backward_iter = ret
1184            .range(..key)
1185            .rev()
1186            .take_while(|(range, _)| range.is_overlapping(&key));
1187
1188        for (range, parts) in forward_iter.chain(backward_iter) {
1189            untouched = untouched.iter().flat_map(|r| r.difference(range)).collect();
1190            let act = split_range_by(range, parts, &key, idx);
1191            actions.extend(act.into_iter());
1192        }
1193
1194        for action in actions {
1195            match action {
1196                Action::Pop(range) => {
1197                    ret.remove(&range);
1198                }
1199                Action::Push(range, parts) => {
1200                    ret.insert(range, parts);
1201                }
1202            }
1203        }
1204
1205        // insert untouched ranges
1206        for range in untouched {
1207            ret.insert(range, vec![idx]);
1208        }
1209    }
1210    ret
1211}
1212
1213/// Get timestamp from array at offset
1214fn get_timestamp_from_idx(
1215    array: &ArrayRef,
1216    offset: usize,
1217) -> datafusion_common::Result<Option<Timestamp>> {
1218    let time_unit = if let DataType::Timestamp(unit, _) = array.data_type() {
1219        unit
1220    } else {
1221        return Err(DataFusionError::Internal(format!(
1222            "Unsupported sort column type: {}",
1223            array.data_type()
1224        )));
1225    };
1226    let ty = array.data_type();
1227    let array = array.slice(offset, 1);
1228    let mut iter = downcast_ts_array!(
1229        array.data_type() => (array_iter_helper, array),
1230        _ => internal_err!("Unsupported sort column type: {ty}")?
1231    );
1232    let (_idx, val) = iter.next().ok_or_else(|| {
1233        DataFusionError::Internal("Empty array in get_timestamp_from".to_string())
1234    })?;
1235    let val = if let Some(val) = val {
1236        val
1237    } else {
1238        return Ok(None);
1239    };
1240    let gt_timestamp = Timestamp::new(val, time_unit.into());
1241    Ok(Some(gt_timestamp))
1242}
1243
1244#[cfg(test)]
1245mod test {
1246    use std::io::Write;
1247    use std::sync::Arc;
1248
1249    use arrow::array::{ArrayRef, TimestampMillisecondArray};
1250    use arrow::compute::concat_batches;
1251    use arrow::json::ArrayWriter;
1252    use arrow_schema::{Field, Schema, TimeUnit};
1253    use futures::StreamExt;
1254    use pretty_assertions::assert_eq;
1255    use serde_json::json;
1256
1257    use super::*;
1258    use crate::test_util::{new_ts_array, MockInputExec};
1259
1260    #[test]
1261    fn test_overlapping() {
1262        let testcases = [
1263            (
1264                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1265                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1266                false,
1267            ),
1268            (
1269                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1270                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1271                true,
1272            ),
1273            (
1274                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1275                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1276                true,
1277            ),
1278            (
1279                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1280                (
1281                    Timestamp::new_millisecond(1000),
1282                    Timestamp::new_millisecond(1002),
1283                ),
1284                true,
1285            ),
1286            (
1287                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1288                (
1289                    Timestamp::new_millisecond(1001),
1290                    Timestamp::new_millisecond(1002),
1291                ),
1292                false,
1293            ),
1294            (
1295                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1296                (
1297                    Timestamp::new_millisecond(1002),
1298                    Timestamp::new_millisecond(1003),
1299                ),
1300                false,
1301            ),
1302        ];
1303
1304        for (range1, range2, expected) in testcases.iter() {
1305            assert_eq!(
1306                TimeRange::from(range1).is_overlapping(&range2.into()),
1307                *expected,
1308                "range1: {:?}, range2: {:?}",
1309                range1,
1310                range2
1311            );
1312        }
1313    }
1314
1315    #[test]
1316    fn test_split() {
1317        let testcases = [
1318            // no split
1319            (
1320                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1321                vec![0],
1322                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1323                1,
1324                vec![],
1325            ),
1326            // one part
1327            (
1328                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1329                vec![0],
1330                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1331                1,
1332                vec![
1333                    Action::Pop(
1334                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1335                    ),
1336                    Action::Push(
1337                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1338                        vec![0, 1],
1339                    ),
1340                ],
1341            ),
1342            (
1343                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1344                vec![0],
1345                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1346                1,
1347                vec![
1348                    Action::Pop(
1349                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1350                    ),
1351                    Action::Push(
1352                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1353                        vec![0, 1],
1354                    ),
1355                ],
1356            ),
1357            (
1358                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1359                vec![0],
1360                (
1361                    Timestamp::new_millisecond(1000),
1362                    Timestamp::new_millisecond(1002),
1363                ),
1364                1,
1365                vec![
1366                    Action::Pop(
1367                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1368                    ),
1369                    Action::Push(
1370                        (
1371                            Timestamp::new_millisecond(1000),
1372                            Timestamp::new_millisecond(1001),
1373                        )
1374                            .into(),
1375                        vec![0, 1],
1376                    ),
1377                ],
1378            ),
1379            // two part
1380            (
1381                (Timestamp::new_second(1), Timestamp::new_millisecond(1002)),
1382                vec![0],
1383                (
1384                    Timestamp::new_millisecond(1001),
1385                    Timestamp::new_millisecond(1002),
1386                ),
1387                1,
1388                vec![
1389                    Action::Pop(
1390                        (Timestamp::new_second(1), Timestamp::new_millisecond(1002)).into(),
1391                    ),
1392                    Action::Push(
1393                        (
1394                            Timestamp::new_millisecond(1001),
1395                            Timestamp::new_millisecond(1002),
1396                        )
1397                            .into(),
1398                        vec![0, 1],
1399                    ),
1400                    Action::Push(
1401                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1402                        vec![0],
1403                    ),
1404                ],
1405            ),
1406            // three part
1407            (
1408                (Timestamp::new_second(1), Timestamp::new_millisecond(1004)),
1409                vec![0],
1410                (
1411                    Timestamp::new_millisecond(1001),
1412                    Timestamp::new_millisecond(1002),
1413                ),
1414                1,
1415                vec![
1416                    Action::Pop(
1417                        (Timestamp::new_second(1), Timestamp::new_millisecond(1004)).into(),
1418                    ),
1419                    Action::Push(
1420                        (
1421                            Timestamp::new_millisecond(1001),
1422                            Timestamp::new_millisecond(1002),
1423                        )
1424                            .into(),
1425                        vec![0, 1],
1426                    ),
1427                    Action::Push(
1428                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1429                        vec![0],
1430                    ),
1431                    Action::Push(
1432                        (
1433                            Timestamp::new_millisecond(1002),
1434                            Timestamp::new_millisecond(1004),
1435                        )
1436                            .into(),
1437                        vec![0],
1438                    ),
1439                ],
1440            ),
1441        ];
1442
1443        for (range, parts, split_by, split_idx, expected) in testcases.iter() {
1444            assert_eq!(
1445                split_range_by(&(*range).into(), parts, &split_by.into(), *split_idx),
1446                *expected,
1447                "range: {:?}, parts: {:?}, split_by: {:?}, split_idx: {}",
1448                range,
1449                parts,
1450                split_by,
1451                split_idx
1452            );
1453        }
1454    }
1455
1456    #[test]
1457    fn test_compute_working_ranges_rev() {
1458        let testcases = vec![
1459            (
1460                BTreeMap::from([(
1461                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1462                    vec![0],
1463                )]),
1464                vec![(
1465                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1466                    BTreeSet::from([0]),
1467                )],
1468            ),
1469            (
1470                BTreeMap::from([(
1471                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1472                    vec![0, 1],
1473                )]),
1474                vec![(
1475                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1476                    BTreeSet::from([0, 1]),
1477                )],
1478            ),
1479            (
1480                BTreeMap::from([
1481                    (
1482                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1483                        vec![0],
1484                    ),
1485                    (
1486                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1487                        vec![0, 1],
1488                    ),
1489                ]),
1490                vec![
1491                    (
1492                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1493                        BTreeSet::from([0]),
1494                    ),
1495                    (
1496                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1497                        BTreeSet::from([0, 1]),
1498                    ),
1499                ],
1500            ),
1501            (
1502                BTreeMap::from([
1503                    (
1504                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1505                        vec![0, 1],
1506                    ),
1507                    (
1508                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1509                        vec![1],
1510                    ),
1511                ]),
1512                vec![
1513                    (
1514                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1515                        BTreeSet::from([0, 1]),
1516                    ),
1517                    (
1518                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1519                        BTreeSet::from([1]),
1520                    ),
1521                ],
1522            ),
1523            (
1524                BTreeMap::from([
1525                    (
1526                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1527                        vec![0],
1528                    ),
1529                    (
1530                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1531                        vec![0, 1],
1532                    ),
1533                    (
1534                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1535                        vec![1],
1536                    ),
1537                ]),
1538                vec![
1539                    (
1540                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1541                        BTreeSet::from([0]),
1542                    ),
1543                    (
1544                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1545                        BTreeSet::from([0, 1]),
1546                    ),
1547                    (
1548                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1549                        BTreeSet::from([1]),
1550                    ),
1551                ],
1552            ),
1553            (
1554                BTreeMap::from([
1555                    (
1556                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1557                        vec![0, 2],
1558                    ),
1559                    (
1560                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1561                        vec![0, 1, 2],
1562                    ),
1563                    (
1564                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1565                        vec![1, 2],
1566                    ),
1567                ]),
1568                vec![(
1569                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1570                    BTreeSet::from([0, 1, 2]),
1571                )],
1572            ),
1573            (
1574                BTreeMap::from([
1575                    (
1576                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1577                        vec![0, 2],
1578                    ),
1579                    (
1580                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1581                        vec![1, 2],
1582                    ),
1583                ]),
1584                vec![(
1585                    (Timestamp::new_second(1), Timestamp::new_second(3)),
1586                    BTreeSet::from([0, 1, 2]),
1587                )],
1588            ),
1589            (
1590                BTreeMap::from([
1591                    (
1592                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1593                        vec![0, 1],
1594                    ),
1595                    (
1596                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1597                        vec![0, 1, 2],
1598                    ),
1599                    (
1600                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1601                        vec![1, 2],
1602                    ),
1603                ]),
1604                vec![(
1605                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1606                    BTreeSet::from([0, 1, 2]),
1607                )],
1608            ),
1609            (
1610                BTreeMap::from([
1611                    (
1612                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1613                        vec![0, 1],
1614                    ),
1615                    (
1616                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1617                        vec![1, 2],
1618                    ),
1619                ]),
1620                vec![
1621                    (
1622                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1623                        BTreeSet::from([0, 1]),
1624                    ),
1625                    (
1626                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1627                        BTreeSet::from([1, 2]),
1628                    ),
1629                ],
1630            ),
1631            // non-overlapping
1632            (
1633                BTreeMap::from([
1634                    (
1635                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1636                        vec![0],
1637                    ),
1638                    (
1639                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1640                        vec![1, 2],
1641                    ),
1642                ]),
1643                vec![
1644                    (
1645                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1646                        BTreeSet::from([0]),
1647                    ),
1648                    (
1649                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1650                        BTreeSet::from([1, 2]),
1651                    ),
1652                ],
1653            ),
1654        ];
1655
1656        for (input, expected) in testcases {
1657            let expected = expected
1658                .into_iter()
1659                .map(|(r, s)| (r.into(), s))
1660                .collect_vec();
1661            let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1662            assert_eq!(
1663                compute_all_working_ranges(&input, true),
1664                expected,
1665                "input: {:?}",
1666                input
1667            );
1668        }
1669    }
1670
1671    #[test]
1672    fn test_compute_working_ranges() {
1673        let testcases = vec![
1674            (
1675                BTreeMap::from([(
1676                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1677                    vec![0],
1678                )]),
1679                vec![(
1680                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1681                    BTreeSet::from([0]),
1682                )],
1683            ),
1684            (
1685                BTreeMap::from([(
1686                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1687                    vec![0, 1],
1688                )]),
1689                vec![(
1690                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1691                    BTreeSet::from([0, 1]),
1692                )],
1693            ),
1694            (
1695                BTreeMap::from([
1696                    (
1697                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1698                        vec![0, 1],
1699                    ),
1700                    (
1701                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1702                        vec![1],
1703                    ),
1704                ]),
1705                vec![
1706                    (
1707                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1708                        BTreeSet::from([0, 1]),
1709                    ),
1710                    (
1711                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1712                        BTreeSet::from([1]),
1713                    ),
1714                ],
1715            ),
1716            (
1717                BTreeMap::from([
1718                    (
1719                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1720                        vec![0],
1721                    ),
1722                    (
1723                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1724                        vec![0, 1],
1725                    ),
1726                ]),
1727                vec![
1728                    (
1729                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1730                        BTreeSet::from([0]),
1731                    ),
1732                    (
1733                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1734                        BTreeSet::from([0, 1]),
1735                    ),
1736                ],
1737            ),
1738            // test if only one count in working set get it's own working range
1739            (
1740                BTreeMap::from([
1741                    (
1742                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1743                        vec![0],
1744                    ),
1745                    (
1746                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1747                        vec![0, 1],
1748                    ),
1749                    (
1750                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1751                        vec![1],
1752                    ),
1753                ]),
1754                vec![
1755                    (
1756                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1757                        BTreeSet::from([0]),
1758                    ),
1759                    (
1760                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1761                        BTreeSet::from([0, 1]),
1762                    ),
1763                    (
1764                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1765                        BTreeSet::from([1]),
1766                    ),
1767                ],
1768            ),
1769            (
1770                BTreeMap::from([
1771                    (
1772                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1773                        vec![0, 2],
1774                    ),
1775                    (
1776                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1777                        vec![0, 1, 2],
1778                    ),
1779                    (
1780                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1781                        vec![1, 2],
1782                    ),
1783                ]),
1784                vec![(
1785                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1786                    BTreeSet::from([0, 1, 2]),
1787                )],
1788            ),
1789            (
1790                BTreeMap::from([
1791                    (
1792                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1793                        vec![0, 2],
1794                    ),
1795                    (
1796                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1797                        vec![1, 2],
1798                    ),
1799                ]),
1800                vec![(
1801                    (Timestamp::new_second(1), Timestamp::new_second(3)),
1802                    BTreeSet::from([0, 1, 2]),
1803                )],
1804            ),
1805            (
1806                BTreeMap::from([
1807                    (
1808                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1809                        vec![0, 1],
1810                    ),
1811                    (
1812                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1813                        vec![0, 1, 2],
1814                    ),
1815                    (
1816                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1817                        vec![1, 2],
1818                    ),
1819                ]),
1820                vec![(
1821                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1822                    BTreeSet::from([0, 1, 2]),
1823                )],
1824            ),
1825            (
1826                BTreeMap::from([
1827                    (
1828                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1829                        vec![0, 1],
1830                    ),
1831                    (
1832                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1833                        vec![1, 2],
1834                    ),
1835                ]),
1836                vec![
1837                    (
1838                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1839                        BTreeSet::from([0, 1]),
1840                    ),
1841                    (
1842                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1843                        BTreeSet::from([1, 2]),
1844                    ),
1845                ],
1846            ),
1847            // non-overlapping
1848            (
1849                BTreeMap::from([
1850                    (
1851                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1852                        vec![0, 1],
1853                    ),
1854                    (
1855                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1856                        vec![2],
1857                    ),
1858                ]),
1859                vec![
1860                    (
1861                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1862                        BTreeSet::from([0, 1]),
1863                    ),
1864                    (
1865                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1866                        BTreeSet::from([2]),
1867                    ),
1868                ],
1869            ),
1870        ];
1871
1872        for (input, expected) in testcases {
1873            let expected = expected
1874                .into_iter()
1875                .map(|(r, s)| (r.into(), s))
1876                .collect_vec();
1877            let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1878            assert_eq!(
1879                compute_all_working_ranges(&input, false),
1880                expected,
1881                "input: {:?}",
1882                input
1883            );
1884        }
1885    }
1886
1887    #[test]
1888    fn test_split_overlap_range() {
1889        let testcases = vec![
1890            // simple one range
1891            (
1892                vec![PartitionRange {
1893                    start: Timestamp::new_second(1),
1894                    end: Timestamp::new_second(2),
1895                    num_rows: 2,
1896                    identifier: 0,
1897                }],
1898                BTreeMap::from_iter(
1899                    vec![(
1900                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1901                        vec![0],
1902                    )]
1903                    .into_iter(),
1904                ),
1905            ),
1906            // two overlapping range
1907            (
1908                vec![
1909                    PartitionRange {
1910                        start: Timestamp::new_second(1),
1911                        end: Timestamp::new_second(2),
1912                        num_rows: 2,
1913                        identifier: 0,
1914                    },
1915                    PartitionRange {
1916                        start: Timestamp::new_second(1),
1917                        end: Timestamp::new_second(2),
1918                        num_rows: 2,
1919                        identifier: 1,
1920                    },
1921                ],
1922                BTreeMap::from_iter(
1923                    vec![(
1924                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1925                        vec![0, 1],
1926                    )]
1927                    .into_iter(),
1928                ),
1929            ),
1930            (
1931                vec![
1932                    PartitionRange {
1933                        start: Timestamp::new_second(1),
1934                        end: Timestamp::new_second(3),
1935                        num_rows: 2,
1936                        identifier: 0,
1937                    },
1938                    PartitionRange {
1939                        start: Timestamp::new_second(2),
1940                        end: Timestamp::new_second(4),
1941                        num_rows: 2,
1942                        identifier: 1,
1943                    },
1944                ],
1945                BTreeMap::from_iter(
1946                    vec![
1947                        (
1948                            (Timestamp::new_second(1), Timestamp::new_second(2)),
1949                            vec![0],
1950                        ),
1951                        (
1952                            (Timestamp::new_second(2), Timestamp::new_second(3)),
1953                            vec![0, 1],
1954                        ),
1955                        (
1956                            (Timestamp::new_second(3), Timestamp::new_second(4)),
1957                            vec![1],
1958                        ),
1959                    ]
1960                    .into_iter(),
1961                ),
1962            ),
1963            // three or more overlapping range
1964            (
1965                vec![
1966                    PartitionRange {
1967                        start: Timestamp::new_second(1),
1968                        end: Timestamp::new_second(3),
1969                        num_rows: 2,
1970                        identifier: 0,
1971                    },
1972                    PartitionRange {
1973                        start: Timestamp::new_second(2),
1974                        end: Timestamp::new_second(4),
1975                        num_rows: 2,
1976                        identifier: 1,
1977                    },
1978                    PartitionRange {
1979                        start: Timestamp::new_second(1),
1980                        end: Timestamp::new_second(4),
1981                        num_rows: 2,
1982                        identifier: 2,
1983                    },
1984                ],
1985                BTreeMap::from_iter(
1986                    vec![
1987                        (
1988                            (Timestamp::new_second(1), Timestamp::new_second(2)),
1989                            vec![0, 2],
1990                        ),
1991                        (
1992                            (Timestamp::new_second(2), Timestamp::new_second(3)),
1993                            vec![0, 1, 2],
1994                        ),
1995                        (
1996                            (Timestamp::new_second(3), Timestamp::new_second(4)),
1997                            vec![1, 2],
1998                        ),
1999                    ]
2000                    .into_iter(),
2001                ),
2002            ),
2003            (
2004                vec![
2005                    PartitionRange {
2006                        start: Timestamp::new_second(1),
2007                        end: Timestamp::new_second(3),
2008                        num_rows: 2,
2009                        identifier: 0,
2010                    },
2011                    PartitionRange {
2012                        start: Timestamp::new_second(1),
2013                        end: Timestamp::new_second(4),
2014                        num_rows: 2,
2015                        identifier: 1,
2016                    },
2017                    PartitionRange {
2018                        start: Timestamp::new_second(2),
2019                        end: Timestamp::new_second(4),
2020                        num_rows: 2,
2021                        identifier: 2,
2022                    },
2023                ],
2024                BTreeMap::from_iter(
2025                    vec![
2026                        (
2027                            (Timestamp::new_second(1), Timestamp::new_second(2)),
2028                            vec![0, 1],
2029                        ),
2030                        (
2031                            (Timestamp::new_second(2), Timestamp::new_second(3)),
2032                            vec![0, 1, 2],
2033                        ),
2034                        (
2035                            (Timestamp::new_second(3), Timestamp::new_second(4)),
2036                            vec![1, 2],
2037                        ),
2038                    ]
2039                    .into_iter(),
2040                ),
2041            ),
2042        ];
2043
2044        for (input, expected) in testcases {
2045            let expected = expected.into_iter().map(|(r, s)| (r.into(), s)).collect();
2046            assert_eq!(split_overlapping_ranges(&input), expected);
2047        }
2048    }
2049
2050    impl From<(i32, i32, Option<i32>, Option<i32>)> for SucRun<i32> {
2051        fn from((offset, len, min_val, max_val): (i32, i32, Option<i32>, Option<i32>)) -> Self {
2052            Self {
2053                offset: offset as usize,
2054                len: len as usize,
2055                first_val: min_val,
2056                last_val: max_val,
2057            }
2058        }
2059    }
2060
2061    #[test]
2062    fn test_find_successive_runs() {
2063        let testcases = vec![
2064            (
2065                vec![Some(1), Some(1), Some(2), Some(1), Some(3)],
2066                Some(SortOptions {
2067                    descending: false,
2068                    nulls_first: false,
2069                }),
2070                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2071            ),
2072            (
2073                vec![Some(1), Some(2), Some(2), Some(1), Some(3)],
2074                Some(SortOptions {
2075                    descending: false,
2076                    nulls_first: false,
2077                }),
2078                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2079            ),
2080            (
2081                vec![Some(1), Some(2), None, None, Some(1), Some(3)],
2082                Some(SortOptions {
2083                    descending: false,
2084                    nulls_first: false,
2085                }),
2086                vec![(0, 4, Some(1), Some(2)), (4, 2, Some(1), Some(3))],
2087            ),
2088            (
2089                vec![Some(1), Some(2), Some(1), Some(3)],
2090                Some(SortOptions {
2091                    descending: false,
2092                    nulls_first: false,
2093                }),
2094                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(1), Some(3))],
2095            ),
2096            (
2097                vec![Some(1), Some(2), Some(1), Some(3)],
2098                Some(SortOptions {
2099                    descending: true,
2100                    nulls_first: false,
2101                }),
2102                vec![
2103                    (0, 1, Some(1), Some(1)),
2104                    (1, 2, Some(2), Some(1)),
2105                    (3, 1, Some(3), Some(3)),
2106                ],
2107            ),
2108            (
2109                vec![Some(1), Some(2), None, Some(3)],
2110                Some(SortOptions {
2111                    descending: false,
2112                    nulls_first: true,
2113                }),
2114                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(3), Some(3))],
2115            ),
2116            (
2117                vec![Some(1), Some(2), None, Some(3)],
2118                Some(SortOptions {
2119                    descending: false,
2120                    nulls_first: false,
2121                }),
2122                vec![(0, 3, Some(1), Some(2)), (3, 1, Some(3), Some(3))],
2123            ),
2124            (
2125                vec![Some(2), Some(1), None, Some(3)],
2126                Some(SortOptions {
2127                    descending: true,
2128                    nulls_first: true,
2129                }),
2130                vec![(0, 2, Some(2), Some(1)), (2, 2, Some(3), Some(3))],
2131            ),
2132            (
2133                vec![],
2134                Some(SortOptions {
2135                    descending: false,
2136                    nulls_first: true,
2137                }),
2138                vec![(0, 0, None, None)],
2139            ),
2140            (
2141                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2142                Some(SortOptions {
2143                    descending: true,
2144                    nulls_first: true,
2145                }),
2146                vec![(0, 5, Some(2), Some(1)), (5, 2, Some(5), Some(4))],
2147            ),
2148            (
2149                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2150                Some(SortOptions {
2151                    descending: true,
2152                    nulls_first: false,
2153                }),
2154                vec![
2155                    (0, 2, None, None),
2156                    (2, 3, Some(2), Some(1)),
2157                    (5, 2, Some(5), Some(4)),
2158                ],
2159            ),
2160        ];
2161        for (input, sort_opts, expected) in testcases {
2162            let ret = find_successive_runs(input.clone().into_iter().enumerate(), &sort_opts);
2163            let expected = expected.into_iter().map(SucRun::<i32>::from).collect_vec();
2164            assert_eq!(
2165                ret, expected,
2166                "input: {:?}, opt: {:?},expected: {:?}",
2167                input, sort_opts, expected
2168            );
2169        }
2170    }
2171
2172    #[test]
2173    fn test_cmp_with_opts() {
2174        let testcases = vec![
2175            (
2176                Some(1),
2177                Some(2),
2178                Some(SortOptions {
2179                    descending: false,
2180                    nulls_first: false,
2181                }),
2182                std::cmp::Ordering::Less,
2183            ),
2184            (
2185                Some(1),
2186                Some(2),
2187                Some(SortOptions {
2188                    descending: true,
2189                    nulls_first: false,
2190                }),
2191                std::cmp::Ordering::Greater,
2192            ),
2193            (
2194                Some(1),
2195                None,
2196                Some(SortOptions {
2197                    descending: false,
2198                    nulls_first: true,
2199                }),
2200                std::cmp::Ordering::Greater,
2201            ),
2202            (
2203                Some(1),
2204                None,
2205                Some(SortOptions {
2206                    descending: true,
2207                    nulls_first: true,
2208                }),
2209                std::cmp::Ordering::Greater,
2210            ),
2211            (
2212                Some(1),
2213                None,
2214                Some(SortOptions {
2215                    descending: true,
2216                    nulls_first: false,
2217                }),
2218                std::cmp::Ordering::Less,
2219            ),
2220            (
2221                Some(1),
2222                None,
2223                Some(SortOptions {
2224                    descending: false,
2225                    nulls_first: false,
2226                }),
2227                std::cmp::Ordering::Less,
2228            ),
2229            (
2230                None,
2231                None,
2232                Some(SortOptions {
2233                    descending: true,
2234                    nulls_first: true,
2235                }),
2236                std::cmp::Ordering::Equal,
2237            ),
2238            (
2239                None,
2240                None,
2241                Some(SortOptions {
2242                    descending: false,
2243                    nulls_first: true,
2244                }),
2245                std::cmp::Ordering::Equal,
2246            ),
2247            (
2248                None,
2249                None,
2250                Some(SortOptions {
2251                    descending: true,
2252                    nulls_first: false,
2253                }),
2254                std::cmp::Ordering::Equal,
2255            ),
2256            (
2257                None,
2258                None,
2259                Some(SortOptions {
2260                    descending: false,
2261                    nulls_first: false,
2262                }),
2263                std::cmp::Ordering::Equal,
2264            ),
2265        ];
2266        for (a, b, opts, expected) in testcases {
2267            assert_eq!(
2268                cmp_with_opts(&a, &b, &opts),
2269                expected,
2270                "a: {:?}, b: {:?}, opts: {:?}",
2271                a,
2272                b,
2273                opts
2274            );
2275        }
2276    }
2277
2278    #[test]
2279    fn test_find_slice_from_range() {
2280        let test_cases = vec![
2281            // test for off by one case
2282            (
2283                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2284                false,
2285                TimeRange {
2286                    start: Timestamp::new_millisecond(2),
2287                    end: Timestamp::new_millisecond(4),
2288                },
2289                Ok((1, 2)),
2290            ),
2291            (
2292                Arc::new(TimestampMillisecondArray::from_iter_values([
2293                    -2, -1, 0, 1, 2, 3, 4, 5,
2294                ])) as ArrayRef,
2295                false,
2296                TimeRange {
2297                    start: Timestamp::new_millisecond(-1),
2298                    end: Timestamp::new_millisecond(4),
2299                },
2300                Ok((1, 5)),
2301            ),
2302            (
2303                Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 6])) as ArrayRef,
2304                false,
2305                TimeRange {
2306                    start: Timestamp::new_millisecond(2),
2307                    end: Timestamp::new_millisecond(5),
2308                },
2309                Ok((1, 2)),
2310            ),
2311            (
2312                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 6])) as ArrayRef,
2313                false,
2314                TimeRange {
2315                    start: Timestamp::new_millisecond(2),
2316                    end: Timestamp::new_millisecond(5),
2317                },
2318                Ok((1, 3)),
2319            ),
2320            (
2321                Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 5, 6])) as ArrayRef,
2322                false,
2323                TimeRange {
2324                    start: Timestamp::new_millisecond(2),
2325                    end: Timestamp::new_millisecond(5),
2326                },
2327                Ok((1, 2)),
2328            ),
2329            (
2330                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2331                false,
2332                TimeRange {
2333                    start: Timestamp::new_millisecond(6),
2334                    end: Timestamp::new_millisecond(7),
2335                },
2336                Ok((5, 0)),
2337            ),
2338            // descending off by one cases
2339            (
2340                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 1])) as ArrayRef,
2341                true,
2342                TimeRange {
2343                    end: Timestamp::new_millisecond(4),
2344                    start: Timestamp::new_millisecond(1),
2345                },
2346                Ok((1, 3)),
2347            ),
2348            (
2349                Arc::new(TimestampMillisecondArray::from_iter_values([
2350                    5, 4, 3, 2, 1, 0,
2351                ])) as ArrayRef,
2352                true,
2353                TimeRange {
2354                    end: Timestamp::new_millisecond(4),
2355                    start: Timestamp::new_millisecond(1),
2356                },
2357                Ok((2, 3)),
2358            ),
2359            (
2360                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 0])) as ArrayRef,
2361                true,
2362                TimeRange {
2363                    end: Timestamp::new_millisecond(4),
2364                    start: Timestamp::new_millisecond(1),
2365                },
2366                Ok((1, 2)),
2367            ),
2368            (
2369                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 0])) as ArrayRef,
2370                true,
2371                TimeRange {
2372                    end: Timestamp::new_millisecond(4),
2373                    start: Timestamp::new_millisecond(1),
2374                },
2375                Ok((2, 2)),
2376            ),
2377            (
2378                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 1])) as ArrayRef,
2379                true,
2380                TimeRange {
2381                    end: Timestamp::new_millisecond(5),
2382                    start: Timestamp::new_millisecond(2),
2383                },
2384                Ok((1, 3)),
2385            ),
2386            (
2387                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 1])) as ArrayRef,
2388                true,
2389                TimeRange {
2390                    end: Timestamp::new_millisecond(5),
2391                    start: Timestamp::new_millisecond(2),
2392                },
2393                Ok((1, 2)),
2394            ),
2395            (
2396                Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 2, 1])) as ArrayRef,
2397                true,
2398                TimeRange {
2399                    end: Timestamp::new_millisecond(5),
2400                    start: Timestamp::new_millisecond(2),
2401                },
2402                Ok((1, 3)),
2403            ),
2404            (
2405                Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 1])) as ArrayRef,
2406                true,
2407                TimeRange {
2408                    end: Timestamp::new_millisecond(5),
2409                    start: Timestamp::new_millisecond(2),
2410                },
2411                Ok((1, 2)),
2412            ),
2413            (
2414                Arc::new(TimestampMillisecondArray::from_iter_values([
2415                    10, 9, 8, 7, 6,
2416                ])) as ArrayRef,
2417                true,
2418                TimeRange {
2419                    end: Timestamp::new_millisecond(5),
2420                    start: Timestamp::new_millisecond(2),
2421                },
2422                Ok((5, 0)),
2423            ),
2424            // test off by one case
2425            (
2426                Arc::new(TimestampMillisecondArray::from_iter_values([3, 2, 1, 0])) as ArrayRef,
2427                true,
2428                TimeRange {
2429                    end: Timestamp::new_millisecond(4),
2430                    start: Timestamp::new_millisecond(3),
2431                },
2432                Ok((0, 1)),
2433            ),
2434            (
2435                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2])) as ArrayRef,
2436                true,
2437                TimeRange {
2438                    end: Timestamp::new_millisecond(4),
2439                    start: Timestamp::new_millisecond(3),
2440                },
2441                Ok((1, 1)),
2442            ),
2443            (
2444                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2])) as ArrayRef,
2445                true,
2446                TimeRange {
2447                    end: Timestamp::new_millisecond(4),
2448                    start: Timestamp::new_millisecond(3),
2449                },
2450                Ok((2, 1)),
2451            ),
2452        ];
2453
2454        for (sort_vals, descending, range, expected) in test_cases {
2455            let sort_column = SortColumn {
2456                values: sort_vals,
2457                options: Some(SortOptions {
2458                    descending,
2459                    ..Default::default()
2460                }),
2461            };
2462            let ret = find_slice_from_range(&sort_column, &range);
2463            match (ret, expected) {
2464                (Ok(ret), Ok(expected)) => {
2465                    assert_eq!(
2466                        ret, expected,
2467                        "sort_vals: {:?}, range: {:?}",
2468                        sort_column, range
2469                    )
2470                }
2471                (Err(err), Err(expected)) => {
2472                    let expected: &str = expected;
2473                    assert!(
2474                        err.to_string().contains(expected),
2475                        "err: {:?}, expected: {:?}",
2476                        err,
2477                        expected
2478                    );
2479                }
2480                (r, e) => panic!("unexpected result: {:?}, expected: {:?}", r, e),
2481            }
2482        }
2483    }
2484
2485    #[derive(Debug)]
2486    struct TestStream {
2487        expression: PhysicalSortExpr,
2488        fetch: Option<usize>,
2489        input: Vec<(PartitionRange, DfRecordBatch)>,
2490        output: Vec<DfRecordBatch>,
2491        schema: SchemaRef,
2492    }
2493    use datafusion::physical_plan::expressions::Column;
2494    impl TestStream {
2495        fn new(
2496            ts_col: Column,
2497            opt: SortOptions,
2498            fetch: Option<usize>,
2499            schema: impl Into<arrow_schema::Fields>,
2500            input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2501            expected: Vec<Vec<ArrayRef>>,
2502        ) -> Self {
2503            let expression = PhysicalSortExpr {
2504                expr: Arc::new(ts_col),
2505                options: opt,
2506            };
2507            let schema = Schema::new(schema.into());
2508            let schema = Arc::new(schema);
2509            let input = input
2510                .into_iter()
2511                .map(|(k, v)| (k, DfRecordBatch::try_new(schema.clone(), v).unwrap()))
2512                .collect_vec();
2513            let output_batchs = expected
2514                .into_iter()
2515                .map(|v| DfRecordBatch::try_new(schema.clone(), v).unwrap())
2516                .collect_vec();
2517            Self {
2518                expression,
2519                fetch,
2520                input,
2521                output: output_batchs,
2522                schema,
2523            }
2524        }
2525
2526        async fn run_test(&self) -> Vec<DfRecordBatch> {
2527            let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip();
2528
2529            let mock_input = MockInputExec::new(batches, self.schema.clone());
2530
2531            let exec = WindowedSortExec::try_new(
2532                self.expression.clone(),
2533                self.fetch,
2534                vec![ranges],
2535                Arc::new(mock_input),
2536            )
2537            .unwrap();
2538
2539            let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
2540
2541            let real_output = exec_stream.collect::<Vec<_>>().await;
2542            let real_output: Vec<_> = real_output.into_iter().try_collect().unwrap();
2543            real_output
2544        }
2545    }
2546
2547    #[tokio::test]
2548    async fn test_window_sort_stream() {
2549        let test_cases = [
2550            TestStream::new(
2551                Column::new("ts", 0),
2552                SortOptions {
2553                    descending: false,
2554                    nulls_first: true,
2555                },
2556                None,
2557                vec![Field::new(
2558                    "ts",
2559                    DataType::Timestamp(TimeUnit::Millisecond, None),
2560                    false,
2561                )],
2562                vec![],
2563                vec![],
2564            ),
2565            TestStream::new(
2566                Column::new("ts", 0),
2567                SortOptions {
2568                    descending: false,
2569                    nulls_first: true,
2570                },
2571                None,
2572                vec![Field::new(
2573                    "ts",
2574                    DataType::Timestamp(TimeUnit::Millisecond, None),
2575                    false,
2576                )],
2577                vec![
2578                    // test one empty
2579                    (
2580                        PartitionRange {
2581                            start: Timestamp::new_millisecond(1),
2582                            end: Timestamp::new_millisecond(2),
2583                            num_rows: 1,
2584                            identifier: 0,
2585                        },
2586                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2587                    ),
2588                    (
2589                        PartitionRange {
2590                            start: Timestamp::new_millisecond(1),
2591                            end: Timestamp::new_millisecond(3),
2592                            num_rows: 1,
2593                            identifier: 0,
2594                        },
2595                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2596                    ),
2597                ],
2598                vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2599                    [2],
2600                ))]],
2601            ),
2602            TestStream::new(
2603                Column::new("ts", 0),
2604                SortOptions {
2605                    descending: false,
2606                    nulls_first: true,
2607                },
2608                None,
2609                vec![Field::new(
2610                    "ts",
2611                    DataType::Timestamp(TimeUnit::Millisecond, None),
2612                    false,
2613                )],
2614                vec![
2615                    // test one empty
2616                    (
2617                        PartitionRange {
2618                            start: Timestamp::new_millisecond(1),
2619                            end: Timestamp::new_millisecond(2),
2620                            num_rows: 1,
2621                            identifier: 0,
2622                        },
2623                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2624                    ),
2625                    (
2626                        PartitionRange {
2627                            start: Timestamp::new_millisecond(1),
2628                            end: Timestamp::new_millisecond(3),
2629                            num_rows: 1,
2630                            identifier: 0,
2631                        },
2632                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2633                    ),
2634                ],
2635                vec![],
2636            ),
2637            TestStream::new(
2638                Column::new("ts", 0),
2639                SortOptions {
2640                    descending: false,
2641                    nulls_first: true,
2642                },
2643                None,
2644                vec![Field::new(
2645                    "ts",
2646                    DataType::Timestamp(TimeUnit::Millisecond, None),
2647                    false,
2648                )],
2649                vec![
2650                    // test indistinguishable case
2651                    // we can't know whether `2` belong to which range
2652                    (
2653                        PartitionRange {
2654                            start: Timestamp::new_millisecond(1),
2655                            end: Timestamp::new_millisecond(2),
2656                            num_rows: 1,
2657                            identifier: 0,
2658                        },
2659                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2660                    ),
2661                    (
2662                        PartitionRange {
2663                            start: Timestamp::new_millisecond(1),
2664                            end: Timestamp::new_millisecond(3),
2665                            num_rows: 1,
2666                            identifier: 0,
2667                        },
2668                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2669                    ),
2670                ],
2671                vec![
2672                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2673                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2674                ],
2675            ),
2676            TestStream::new(
2677                Column::new("ts", 0),
2678                SortOptions {
2679                    descending: false,
2680                    nulls_first: true,
2681                },
2682                None,
2683                vec![Field::new(
2684                    "ts",
2685                    DataType::Timestamp(TimeUnit::Millisecond, None),
2686                    false,
2687                )],
2688                vec![
2689                    // test direct emit
2690                    (
2691                        PartitionRange {
2692                            start: Timestamp::new_millisecond(1),
2693                            end: Timestamp::new_millisecond(3),
2694                            num_rows: 1,
2695                            identifier: 0,
2696                        },
2697                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2698                            1, 2,
2699                        ]))],
2700                    ),
2701                    (
2702                        PartitionRange {
2703                            start: Timestamp::new_millisecond(1),
2704                            end: Timestamp::new_millisecond(4),
2705                            num_rows: 1,
2706                            identifier: 0,
2707                        },
2708                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2709                            2, 3,
2710                        ]))],
2711                    ),
2712                ],
2713                vec![
2714                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2715                        1, 2,
2716                    ]))],
2717                    // didn't trigger a merge sort/concat here so this is it
2718                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2719                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2720                ],
2721            ),
2722            TestStream::new(
2723                Column::new("ts", 0),
2724                SortOptions {
2725                    descending: false,
2726                    nulls_first: true,
2727                },
2728                None,
2729                vec![Field::new(
2730                    "ts",
2731                    DataType::Timestamp(TimeUnit::Millisecond, None),
2732                    false,
2733                )],
2734                vec![
2735                    // test more of cross working range batch intersection
2736                    (
2737                        PartitionRange {
2738                            start: Timestamp::new_millisecond(1),
2739                            end: Timestamp::new_millisecond(3),
2740                            num_rows: 1,
2741                            identifier: 0,
2742                        },
2743                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2744                            1, 2,
2745                        ]))],
2746                    ),
2747                    (
2748                        PartitionRange {
2749                            start: Timestamp::new_millisecond(1),
2750                            end: Timestamp::new_millisecond(4),
2751                            num_rows: 1,
2752                            identifier: 1,
2753                        },
2754                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2755                            1, 2, 3,
2756                        ]))],
2757                    ),
2758                ],
2759                vec![
2760                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2761                        1, 1, 2, 2,
2762                    ]))],
2763                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2764                ],
2765            ),
2766            TestStream::new(
2767                Column::new("ts", 0),
2768                SortOptions {
2769                    descending: false,
2770                    nulls_first: true,
2771                },
2772                None,
2773                vec![Field::new(
2774                    "ts",
2775                    DataType::Timestamp(TimeUnit::Millisecond, None),
2776                    false,
2777                )],
2778                vec![
2779                    // no overlap, empty intersection batch case
2780                    (
2781                        PartitionRange {
2782                            start: Timestamp::new_millisecond(1),
2783                            end: Timestamp::new_millisecond(3),
2784                            num_rows: 1,
2785                            identifier: 0,
2786                        },
2787                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2788                            1, 2,
2789                        ]))],
2790                    ),
2791                    (
2792                        PartitionRange {
2793                            start: Timestamp::new_millisecond(1),
2794                            end: Timestamp::new_millisecond(4),
2795                            num_rows: 1,
2796                            identifier: 1,
2797                        },
2798                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2799                            1, 2, 3,
2800                        ]))],
2801                    ),
2802                    (
2803                        PartitionRange {
2804                            start: Timestamp::new_millisecond(4),
2805                            end: Timestamp::new_millisecond(6),
2806                            num_rows: 1,
2807                            identifier: 1,
2808                        },
2809                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2810                            4, 5,
2811                        ]))],
2812                    ),
2813                ],
2814                vec![
2815                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2816                        1, 1, 2, 2,
2817                    ]))],
2818                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2819                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2820                        4, 5,
2821                    ]))],
2822                ],
2823            ),
2824            // test fetch
2825            TestStream::new(
2826                Column::new("ts", 0),
2827                SortOptions {
2828                    descending: false,
2829                    nulls_first: true,
2830                },
2831                Some(6),
2832                vec![Field::new(
2833                    "ts",
2834                    DataType::Timestamp(TimeUnit::Millisecond, None),
2835                    false,
2836                )],
2837                vec![
2838                    // no overlap, empty intersection batch case
2839                    (
2840                        PartitionRange {
2841                            start: Timestamp::new_millisecond(1),
2842                            end: Timestamp::new_millisecond(3),
2843                            num_rows: 1,
2844                            identifier: 0,
2845                        },
2846                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2847                            1, 2,
2848                        ]))],
2849                    ),
2850                    (
2851                        PartitionRange {
2852                            start: Timestamp::new_millisecond(1),
2853                            end: Timestamp::new_millisecond(4),
2854                            num_rows: 1,
2855                            identifier: 1,
2856                        },
2857                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2858                            1, 2, 3,
2859                        ]))],
2860                    ),
2861                    (
2862                        PartitionRange {
2863                            start: Timestamp::new_millisecond(3),
2864                            end: Timestamp::new_millisecond(6),
2865                            num_rows: 1,
2866                            identifier: 1,
2867                        },
2868                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2869                            4, 5,
2870                        ]))],
2871                    ),
2872                ],
2873                vec![
2874                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2875                        1, 1, 2, 2,
2876                    ]))],
2877                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2878                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([4]))],
2879                ],
2880            ),
2881            TestStream::new(
2882                Column::new("ts", 0),
2883                SortOptions {
2884                    descending: false,
2885                    nulls_first: true,
2886                },
2887                Some(3),
2888                vec![Field::new(
2889                    "ts",
2890                    DataType::Timestamp(TimeUnit::Millisecond, None),
2891                    false,
2892                )],
2893                vec![
2894                    // no overlap, empty intersection batch case
2895                    (
2896                        PartitionRange {
2897                            start: Timestamp::new_millisecond(1),
2898                            end: Timestamp::new_millisecond(3),
2899                            num_rows: 1,
2900                            identifier: 0,
2901                        },
2902                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2903                            1, 2,
2904                        ]))],
2905                    ),
2906                    (
2907                        PartitionRange {
2908                            start: Timestamp::new_millisecond(1),
2909                            end: Timestamp::new_millisecond(4),
2910                            num_rows: 1,
2911                            identifier: 1,
2912                        },
2913                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2914                            1, 2, 3,
2915                        ]))],
2916                    ),
2917                    (
2918                        PartitionRange {
2919                            start: Timestamp::new_millisecond(3),
2920                            end: Timestamp::new_millisecond(6),
2921                            num_rows: 1,
2922                            identifier: 1,
2923                        },
2924                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2925                            4, 5,
2926                        ]))],
2927                    ),
2928                ],
2929                vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2930                    [1, 1, 2],
2931                ))]],
2932            ),
2933            // rev case
2934            TestStream::new(
2935                Column::new("ts", 0),
2936                SortOptions {
2937                    descending: true,
2938                    nulls_first: true,
2939                },
2940                None,
2941                vec![Field::new(
2942                    "ts",
2943                    DataType::Timestamp(TimeUnit::Millisecond, None),
2944                    false,
2945                )],
2946                vec![
2947                    // reverse order
2948                    (
2949                        PartitionRange {
2950                            start: Timestamp::new_millisecond(3),
2951                            end: Timestamp::new_millisecond(6),
2952                            num_rows: 1,
2953                            identifier: 1,
2954                        },
2955                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2956                            5, 4,
2957                        ]))],
2958                    ),
2959                    (
2960                        PartitionRange {
2961                            start: Timestamp::new_millisecond(1),
2962                            end: Timestamp::new_millisecond(4),
2963                            num_rows: 1,
2964                            identifier: 1,
2965                        },
2966                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2967                            3, 2, 1,
2968                        ]))],
2969                    ),
2970                    (
2971                        PartitionRange {
2972                            start: Timestamp::new_millisecond(1),
2973                            end: Timestamp::new_millisecond(3),
2974                            num_rows: 1,
2975                            identifier: 0,
2976                        },
2977                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2978                            2, 1,
2979                        ]))],
2980                    ),
2981                ],
2982                vec![
2983                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2984                        5, 4,
2985                    ]))],
2986                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2987                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2988                        2, 2, 1, 1,
2989                    ]))],
2990                ],
2991            ),
2992            TestStream::new(
2993                Column::new("ts", 0),
2994                SortOptions {
2995                    descending: false,
2996                    nulls_first: true,
2997                },
2998                None,
2999                vec![Field::new(
3000                    "ts",
3001                    DataType::Timestamp(TimeUnit::Millisecond, None),
3002                    false,
3003                )],
3004                vec![
3005                    // long have subset short run case
3006                    (
3007                        PartitionRange {
3008                            start: Timestamp::new_millisecond(1),
3009                            end: Timestamp::new_millisecond(10),
3010                            num_rows: 1,
3011                            identifier: 0,
3012                        },
3013                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3014                            1, 5, 9,
3015                        ]))],
3016                    ),
3017                    (
3018                        PartitionRange {
3019                            start: Timestamp::new_millisecond(3),
3020                            end: Timestamp::new_millisecond(7),
3021                            num_rows: 1,
3022                            identifier: 1,
3023                        },
3024                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3025                            3, 4, 5, 6,
3026                        ]))],
3027                    ),
3028                ],
3029                vec![
3030                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
3031                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3032                        3, 4, 5, 5, 6, 9,
3033                    ]))],
3034                ],
3035            ),
3036            TestStream::new(
3037                Column::new("ts", 0),
3038                SortOptions {
3039                    descending: false,
3040                    nulls_first: true,
3041                },
3042                None,
3043                vec![Field::new(
3044                    "ts",
3045                    DataType::Timestamp(TimeUnit::Millisecond, None),
3046                    false,
3047                )],
3048                vec![
3049                    // complex overlap
3050                    (
3051                        PartitionRange {
3052                            start: Timestamp::new_millisecond(1),
3053                            end: Timestamp::new_millisecond(3),
3054                            num_rows: 1,
3055                            identifier: 0,
3056                        },
3057                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3058                            1, 2,
3059                        ]))],
3060                    ),
3061                    (
3062                        PartitionRange {
3063                            start: Timestamp::new_millisecond(1),
3064                            end: Timestamp::new_millisecond(10),
3065                            num_rows: 1,
3066                            identifier: 1,
3067                        },
3068                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3069                            1, 3, 4, 5, 6, 8,
3070                        ]))],
3071                    ),
3072                    (
3073                        PartitionRange {
3074                            start: Timestamp::new_millisecond(7),
3075                            end: Timestamp::new_millisecond(10),
3076                            num_rows: 1,
3077                            identifier: 1,
3078                        },
3079                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3080                            7, 8, 9,
3081                        ]))],
3082                    ),
3083                ],
3084                vec![
3085                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3086                        1, 1, 2,
3087                    ]))],
3088                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3089                        3, 4, 5, 6,
3090                    ]))],
3091                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3092                        7, 8, 8, 9,
3093                    ]))],
3094                ],
3095            ),
3096            TestStream::new(
3097                Column::new("ts", 0),
3098                SortOptions {
3099                    descending: false,
3100                    nulls_first: true,
3101                },
3102                None,
3103                vec![Field::new(
3104                    "ts",
3105                    DataType::Timestamp(TimeUnit::Millisecond, None),
3106                    false,
3107                )],
3108                vec![
3109                    // complex subset with having same datapoint
3110                    (
3111                        PartitionRange {
3112                            start: Timestamp::new_millisecond(1),
3113                            end: Timestamp::new_millisecond(11),
3114                            num_rows: 1,
3115                            identifier: 0,
3116                        },
3117                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3118                            1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
3119                        ]))],
3120                    ),
3121                    (
3122                        PartitionRange {
3123                            start: Timestamp::new_millisecond(5),
3124                            end: Timestamp::new_millisecond(7),
3125                            num_rows: 1,
3126                            identifier: 1,
3127                        },
3128                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3129                            5, 6,
3130                        ]))],
3131                    ),
3132                ],
3133                vec![
3134                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3135                        1, 2, 3, 4,
3136                    ]))],
3137                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
3138                        5, 5, 6, 6, 7, 8, 9, 10,
3139                    ]))],
3140                ],
3141            ),
3142        ];
3143
3144        let indexed_test_cases = test_cases.iter().enumerate().collect_vec();
3145
3146        for (idx, testcase) in &indexed_test_cases {
3147            let output = testcase.run_test().await;
3148            assert_eq!(output, testcase.output, "case {idx} failed.");
3149        }
3150    }
3151
3152    #[tokio::test]
3153    async fn fuzzy_ish_test_window_sort_stream() {
3154        let test_cnt = 100;
3155        let part_cnt_bound = 100;
3156        let range_size_bound = 100;
3157        let range_offset_bound = 100;
3158        let in_range_datapoint_cnt_bound = 100;
3159        let fetch_bound = 100;
3160
3161        let mut rng = fastrand::Rng::new();
3162        let rng_seed = rng.u64(..);
3163        rng.seed(rng_seed);
3164        let mut bound_val = None;
3165        // construct testcases
3166        type CmpFn<T> = Box<dyn FnMut(&T, &T) -> std::cmp::Ordering>;
3167        let mut full_testcase_list = Vec::new();
3168        for _case_id in 0..test_cnt {
3169            let descending = rng.bool();
3170            fn ret_cmp_fn<T: Ord>(descending: bool) -> CmpFn<T> {
3171                if descending {
3172                    return Box::new(|a: &T, b: &T| b.cmp(a));
3173                }
3174                Box::new(|a: &T, b: &T| a.cmp(b))
3175            }
3176            let unit = match rng.u8(0..3) {
3177                0 => TimeUnit::Second,
3178                1 => TimeUnit::Millisecond,
3179                2 => TimeUnit::Microsecond,
3180                _ => TimeUnit::Nanosecond,
3181            };
3182            let fetch = if rng.bool() {
3183                Some(rng.usize(0..fetch_bound))
3184            } else {
3185                None
3186            };
3187
3188            let mut input_ranged_data = vec![];
3189            let mut output_data: Vec<i64> = vec![];
3190            // generate input data
3191            for part_id in 0..rng.usize(0..part_cnt_bound) {
3192                let (start, end) = if descending {
3193                    let end = bound_val
3194                        .map(|i| i - rng.i64(0..range_offset_bound))
3195                        .unwrap_or_else(|| rng.i64(..));
3196                    bound_val = Some(end);
3197                    let start = end - rng.i64(1..range_size_bound);
3198                    let start = Timestamp::new(start, unit.into());
3199                    let end = Timestamp::new(end, unit.into());
3200                    (start, end)
3201                } else {
3202                    let start = bound_val
3203                        .map(|i| i + rng.i64(0..range_offset_bound))
3204                        .unwrap_or_else(|| rng.i64(..));
3205                    bound_val = Some(start);
3206                    let end = start + rng.i64(1..range_size_bound);
3207                    let start = Timestamp::new(start, unit.into());
3208                    let end = Timestamp::new(end, unit.into());
3209                    (start, end)
3210                };
3211
3212                let iter = 0..rng.usize(0..in_range_datapoint_cnt_bound);
3213                let data_gen = iter
3214                    .map(|_| rng.i64(start.value()..end.value()))
3215                    .sorted_by(ret_cmp_fn(descending))
3216                    .collect_vec();
3217                output_data.extend(data_gen.clone());
3218                let arr = new_ts_array(unit, data_gen);
3219                let range = PartitionRange {
3220                    start,
3221                    end,
3222                    num_rows: arr.len(),
3223                    identifier: part_id,
3224                };
3225                input_ranged_data.push((range, vec![arr]));
3226            }
3227
3228            output_data.sort_by(ret_cmp_fn(descending));
3229            if let Some(fetch) = fetch {
3230                output_data.truncate(fetch);
3231            }
3232            let output_arr = new_ts_array(unit, output_data);
3233
3234            let test_stream = TestStream::new(
3235                Column::new("ts", 0),
3236                SortOptions {
3237                    descending,
3238                    nulls_first: true,
3239                },
3240                fetch,
3241                vec![Field::new("ts", DataType::Timestamp(unit, None), false)],
3242                input_ranged_data.clone(),
3243                vec![vec![output_arr]],
3244            );
3245            full_testcase_list.push(test_stream);
3246        }
3247
3248        for (case_id, test_stream) in full_testcase_list.into_iter().enumerate() {
3249            let res = test_stream.run_test().await;
3250            let res_concat = concat_batches(&test_stream.schema, &res).unwrap();
3251            let expected = test_stream.output;
3252            let expected_concat = concat_batches(&test_stream.schema, &expected).unwrap();
3253
3254            if res_concat != expected_concat {
3255                {
3256                    let mut f_input = std::io::stderr();
3257                    f_input.write_all(b"[").unwrap();
3258                    for (input_range, input_arr) in test_stream.input {
3259                        let range_json = json!({
3260                            "start": input_range.start.to_chrono_datetime().unwrap().to_string(),
3261                            "end": input_range.end.to_chrono_datetime().unwrap().to_string(),
3262                            "num_rows": input_range.num_rows,
3263                            "identifier": input_range.identifier,
3264                        });
3265                        let buf = Vec::new();
3266                        let mut input_writer = ArrayWriter::new(buf);
3267                        input_writer.write(&input_arr).unwrap();
3268                        input_writer.finish().unwrap();
3269                        let res_str =
3270                            String::from_utf8_lossy(&input_writer.into_inner()).to_string();
3271                        let whole_json =
3272                            format!(r#"{{"range": {}, "data": {}}},"#, range_json, res_str);
3273                        f_input.write_all(whole_json.as_bytes()).unwrap();
3274                    }
3275                    f_input.write_all(b"]").unwrap();
3276                }
3277                {
3278                    let mut f_res = std::io::stderr();
3279                    f_res.write_all(b"[").unwrap();
3280                    for batch in &res {
3281                        let mut res_writer = ArrayWriter::new(f_res);
3282                        res_writer.write(batch).unwrap();
3283                        res_writer.finish().unwrap();
3284                        f_res = res_writer.into_inner();
3285                        f_res.write_all(b",").unwrap();
3286                    }
3287                    f_res.write_all(b"]").unwrap();
3288
3289                    let f_res_concat = std::io::stderr();
3290                    let mut res_writer = ArrayWriter::new(f_res_concat);
3291                    res_writer.write(&res_concat).unwrap();
3292                    res_writer.finish().unwrap();
3293
3294                    let f_expected = std::io::stderr();
3295                    let mut expected_writer = ArrayWriter::new(f_expected);
3296                    expected_writer.write(&expected_concat).unwrap();
3297                    expected_writer.finish().unwrap();
3298                }
3299                panic!(
3300                    "case failed, case id: {0}, output and expected output to stderr",
3301                    case_id
3302                );
3303            }
3304            assert_eq!(
3305                res_concat, expected_concat,
3306                "case failed, case id: {}, rng seed: {}",
3307                case_id, rng_seed
3308            );
3309        }
3310    }
3311}