query/
window_sort.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A physical plan for window sort(Which is sorting multiple sorted ranges according to input `PartitionRange`).
16//!
17
18use std::any::Any;
19use std::collections::{BTreeMap, BTreeSet, VecDeque};
20use std::pin::Pin;
21use std::slice::from_ref;
22use std::sync::Arc;
23use std::task::{Context, Poll};
24
25use arrow::array::{Array, ArrayRef};
26use arrow::compute::SortColumn;
27use arrow_schema::{DataType, SchemaRef, SortOptions};
28use common_error::ext::{BoxedError, PlainError};
29use common_error::status_code::StatusCode;
30use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
31use common_telemetry::error;
32use common_time::Timestamp;
33use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
34use datafusion::execution::{RecordBatchStream, TaskContext};
35use datafusion::physical_plan::memory::MemoryStream;
36use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
37use datafusion::physical_plan::sorts::streaming_merge::StreamingMergeBuilder;
38use datafusion::physical_plan::{
39    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
40};
41use datafusion_common::utils::bisect;
42use datafusion_common::{DataFusionError, internal_err};
43use datafusion_physical_expr::PhysicalSortExpr;
44use datatypes::value::Value;
45use futures::Stream;
46use itertools::Itertools;
47use snafu::ResultExt;
48use store_api::region_engine::PartitionRange;
49
50use crate::error::{QueryExecutionSnafu, Result};
51
52/// A complex stream sort execution plan which accepts a list of `PartitionRange` and
53/// merge sort them whenever possible, and emit the sorted result as soon as possible.
54/// This sorting plan only accept sort by ts and will not sort by other fields.
55///
56/// internally, it call [`StreamingMergeBuilder`] multiple times to merge multiple sorted "working ranges"
57///
58/// # Invariant Promise on Input Stream
59/// 1. The input stream must be sorted by timestamp and
60/// 2. in the order of `PartitionRange` in `ranges`
61/// 3. Each `PartitionRange` is sorted within itself(ascending or descending) but no need to be sorted across ranges
62/// 4. There can't be any RecordBatch that is cross multiple `PartitionRange` in the input stream
63///
64///  TODO(discord9): fix item 4, but since only use `PartSort` as input, this might not be a problem
65
66#[derive(Debug, Clone)]
67pub struct WindowedSortExec {
68    /// Physical sort expressions(that is, sort by timestamp)
69    expression: PhysicalSortExpr,
70    /// Optional number of rows to fetch. Stops producing rows after this fetch
71    fetch: Option<usize>,
72    /// The input ranges indicate input stream will be composed of those ranges in given order.
73    ///
74    /// Each partition has one vector of `PartitionRange`.
75    ranges: Vec<Vec<PartitionRange>>,
76    /// All available working ranges and their corresponding working set
77    ///
78    /// working ranges promise once input stream get a value out of current range, future values will never
79    /// be in this range. Each partition has one vector of ranges.
80    all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
81    input: Arc<dyn ExecutionPlan>,
82    /// Execution metrics
83    metrics: ExecutionPlanMetricsSet,
84    properties: PlanProperties,
85}
86
87fn check_partition_range_monotonicity(
88    ranges: &[Vec<PartitionRange>],
89    descending: bool,
90) -> Result<()> {
91    let is_valid = ranges.iter().all(|r| {
92        if descending {
93            r.windows(2).all(|w| w[0].end >= w[1].end)
94        } else {
95            r.windows(2).all(|w| w[0].start <= w[1].start)
96        }
97    });
98
99    if !is_valid {
100        let msg = if descending {
101            "Input `PartitionRange`s's upper bound is not monotonic non-increase"
102        } else {
103            "Input `PartitionRange`s's lower bound is not monotonic non-decrease"
104        };
105        let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
106        Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
107    } else {
108        Ok(())
109    }
110}
111
112impl WindowedSortExec {
113    pub fn try_new(
114        expression: PhysicalSortExpr,
115        fetch: Option<usize>,
116        ranges: Vec<Vec<PartitionRange>>,
117        input: Arc<dyn ExecutionPlan>,
118    ) -> Result<Self> {
119        check_partition_range_monotonicity(&ranges, expression.options.descending)?;
120
121        let mut eq_properties = input.equivalence_properties().clone();
122        eq_properties.reorder(vec![expression.clone()])?;
123
124        let properties = input.properties();
125        let properties = PlanProperties::new(
126            eq_properties,
127            input.output_partitioning().clone(),
128            properties.emission_type,
129            properties.boundedness,
130        );
131
132        let mut all_avail_working_range = Vec::with_capacity(ranges.len());
133        for r in &ranges {
134            let overlap_counts = split_overlapping_ranges(r);
135            let working_ranges =
136                compute_all_working_ranges(&overlap_counts, expression.options.descending);
137            all_avail_working_range.push(working_ranges);
138        }
139
140        Ok(Self {
141            expression,
142            fetch,
143            ranges,
144            all_avail_working_range,
145            input,
146            metrics: ExecutionPlanMetricsSet::new(),
147            properties,
148        })
149    }
150
151    /// During receiving partial-sorted RecordBatch, we need to update the working set which is the
152    /// `PartitionRange` we think those RecordBatch belongs to. And when we receive something outside
153    /// of working set, we can merge results before whenever possible.
154    pub fn to_stream(
155        &self,
156        context: Arc<TaskContext>,
157        partition: usize,
158    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
159        let input_stream: DfSendableRecordBatchStream =
160            self.input.execute(partition, context.clone())?;
161
162        let df_stream = Box::pin(WindowedSortStream::new(
163            context,
164            self,
165            input_stream,
166            partition,
167        )) as _;
168
169        Ok(df_stream)
170    }
171}
172
173impl DisplayAs for WindowedSortExec {
174    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
175        write!(
176            f,
177            "WindowedSortExec: expr={} num_ranges={}",
178            self.expression,
179            self.ranges.len()
180        )?;
181        if let Some(fetch) = self.fetch {
182            write!(f, " fetch={}", fetch)?;
183        }
184        Ok(())
185    }
186}
187
188impl ExecutionPlan for WindowedSortExec {
189    fn as_any(&self) -> &dyn Any {
190        self
191    }
192
193    fn schema(&self) -> SchemaRef {
194        self.input.schema()
195    }
196
197    fn properties(&self) -> &PlanProperties {
198        &self.properties
199    }
200
201    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
202        vec![&self.input]
203    }
204
205    fn with_new_children(
206        self: Arc<Self>,
207        children: Vec<Arc<dyn ExecutionPlan>>,
208    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
209        let new_input = if let Some(first) = children.first() {
210            first
211        } else {
212            internal_err!("No children found")?
213        };
214        let new = Self::try_new(
215            self.expression.clone(),
216            self.fetch,
217            self.ranges.clone(),
218            new_input.clone(),
219        )?;
220        Ok(Arc::new(new))
221    }
222
223    fn execute(
224        &self,
225        partition: usize,
226        context: Arc<TaskContext>,
227    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
228        self.to_stream(context, partition)
229    }
230
231    fn metrics(&self) -> Option<MetricsSet> {
232        Some(self.metrics.clone_inner())
233    }
234
235    /// # Explain
236    ///
237    /// This plan needs to be executed on each partition independently,
238    /// and is expected to run directly on storage engine's output
239    /// distribution / partition.
240    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
241        vec![false]
242    }
243
244    fn name(&self) -> &str {
245        "WindowedSortExec"
246    }
247}
248
249/// The core logic of merging sort multiple sorted ranges
250///
251/// the flow of data is:
252/// ```md
253/// input --check if sorted--> in_progress --find sorted run--> sorted_input_runs --call merge sort--> merge_stream --> output
254/// ```
255pub struct WindowedSortStream {
256    /// Memory pool for this stream
257    memory_pool: Arc<dyn MemoryPool>,
258    /// currently assembling RecordBatches, will be put to `sort_partition_rbs` when it's done
259    in_progress: Vec<DfRecordBatch>,
260    /// last `Timestamp` of the last input RecordBatch in `in_progress`, use to found partial sorted run's boundary
261    last_value: Option<Timestamp>,
262    /// Current working set of `PartitionRange` sorted RecordBatches
263    sorted_input_runs: Vec<DfSendableRecordBatchStream>,
264    /// Merge-sorted result streams, should be polled to end before start a new merge sort again
265    merge_stream: VecDeque<DfSendableRecordBatchStream>,
266    /// The number of times merge sort has been called
267    merge_count: usize,
268    /// Index into current `working_range` in `all_avail_working_range`
269    working_idx: usize,
270    /// input stream assumed reading in order of `PartitionRange`
271    input: DfSendableRecordBatchStream,
272    /// Whether this stream is terminated. For reasons like limit reached or input stream is done.
273    is_terminated: bool,
274    /// Output Schema, which is the same as input schema, since this is a sort plan
275    schema: SchemaRef,
276    /// Physical sort expressions(that is, sort by timestamp)
277    expression: PhysicalSortExpr,
278    /// Optional number of rows to fetch. Stops producing rows after this fetch
279    fetch: Option<usize>,
280    /// number of rows produced
281    produced: usize,
282    /// Resulting Stream(`merge_stream`)'s batch size, merely a suggestion
283    batch_size: usize,
284    /// All available working ranges and their corresponding working set
285    ///
286    /// working ranges promise once input stream get a value out of current range, future values will never be in this range
287    all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
288    /// The input partition ranges
289    #[allow(dead_code)] // this is used under #[debug_assertions]
290    ranges: Vec<PartitionRange>,
291    /// Execution metrics
292    metrics: BaselineMetrics,
293}
294
295impl WindowedSortStream {
296    pub fn new(
297        context: Arc<TaskContext>,
298        exec: &WindowedSortExec,
299        input: DfSendableRecordBatchStream,
300        partition: usize,
301    ) -> Self {
302        Self {
303            memory_pool: context.runtime_env().memory_pool.clone(),
304            in_progress: Vec::new(),
305            last_value: None,
306            sorted_input_runs: Vec::new(),
307            merge_stream: VecDeque::new(),
308            merge_count: 0,
309            working_idx: 0,
310            schema: input.schema(),
311            input,
312            is_terminated: false,
313            expression: exec.expression.clone(),
314            fetch: exec.fetch,
315            produced: 0,
316            batch_size: context.session_config().batch_size(),
317            all_avail_working_range: exec.all_avail_working_range[partition].clone(),
318            ranges: exec.ranges[partition].clone(),
319            metrics: BaselineMetrics::new(&exec.metrics, partition),
320        }
321    }
322}
323
324impl WindowedSortStream {
325    #[cfg(debug_assertions)]
326    fn check_subset_ranges(&self, cur_range: &TimeRange) {
327        let cur_is_subset_to = self
328            .ranges
329            .iter()
330            .filter(|r| cur_range.is_subset(&TimeRange::from(*r)))
331            .collect_vec();
332        if cur_is_subset_to.is_empty() {
333            error!("Current range is not a subset of any PartitionRange");
334            // found in all ranges that are subset of current range
335            let subset_ranges = self
336                .ranges
337                .iter()
338                .filter(|r| TimeRange::from(*r).is_subset(cur_range))
339                .collect_vec();
340            let only_overlap = self
341                .ranges
342                .iter()
343                .filter(|r| {
344                    let r = TimeRange::from(*r);
345                    r.is_overlapping(cur_range) && !r.is_subset(cur_range)
346                })
347                .collect_vec();
348            error!(
349                "Bad input, found {} ranges that are subset of current range, also found {} ranges that only overlap, subset ranges are: {:?}; overlap ranges are: {:?}",
350                subset_ranges.len(),
351                only_overlap.len(),
352                subset_ranges,
353                only_overlap
354            );
355        } else {
356            let only_overlap = self
357                .ranges
358                .iter()
359                .filter(|r| {
360                    let r = TimeRange::from(*r);
361                    r.is_overlapping(cur_range) && !cur_range.is_subset(&r)
362                })
363                .collect_vec();
364            error!(
365                "Found current range to be subset of {} ranges, also found {} ranges that only overlap, of subset ranges are:{:?}; overlap ranges are: {:?}",
366                cur_is_subset_to.len(),
367                only_overlap.len(),
368                cur_is_subset_to,
369                only_overlap
370            );
371        }
372        let all_overlap_working_range = self
373            .all_avail_working_range
374            .iter()
375            .filter(|(range, _)| range.is_overlapping(cur_range))
376            .map(|(range, _)| range)
377            .collect_vec();
378        error!(
379            "Found {} working ranges that overlap with current range: {:?}",
380            all_overlap_working_range.len(),
381            all_overlap_working_range
382        );
383    }
384
385    /// Poll the next RecordBatch from the merge-sort's output stream
386    fn poll_result_stream(
387        &mut self,
388        cx: &mut Context<'_>,
389    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
390        while let Some(merge_stream) = &mut self.merge_stream.front_mut() {
391            match merge_stream.as_mut().poll_next(cx) {
392                Poll::Ready(Some(Ok(batch))) => {
393                    let ret = if let Some(remaining) = self.remaining_fetch() {
394                        if remaining == 0 {
395                            self.is_terminated = true;
396                            None
397                        } else if remaining < batch.num_rows() {
398                            self.produced += remaining;
399                            Some(Ok(batch.slice(0, remaining)))
400                        } else {
401                            self.produced += batch.num_rows();
402                            Some(Ok(batch))
403                        }
404                    } else {
405                        self.produced += batch.num_rows();
406                        Some(Ok(batch))
407                    };
408                    return Poll::Ready(ret);
409                }
410                Poll::Ready(Some(Err(e))) => {
411                    return Poll::Ready(Some(Err(e)));
412                }
413                Poll::Ready(None) => {
414                    // current merge stream is done, we can start polling the next one
415
416                    self.merge_stream.pop_front();
417                    continue;
418                }
419                Poll::Pending => {
420                    return Poll::Pending;
421                }
422            }
423        }
424        // if no output stream is available
425        Poll::Ready(None)
426    }
427
428    /// The core logic of merging sort multiple sorted ranges
429    ///
430    /// We try to maximize the number of sorted runs we can merge in one go, while emit the result as soon as possible.
431    pub fn poll_next_inner(
432        mut self: Pin<&mut Self>,
433        cx: &mut Context<'_>,
434    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
435        // first check and send out the merge result
436        match self.poll_result_stream(cx) {
437            Poll::Ready(None) => {
438                if self.is_terminated {
439                    return Poll::Ready(None);
440                }
441            }
442            x => return x,
443        };
444
445        // consume input stream
446        while !self.is_terminated {
447            // then we get a new RecordBatch from input stream
448            let SortedRunSet {
449                runs_with_batch,
450                sort_column,
451            } = match self.input.as_mut().poll_next(cx) {
452                Poll::Ready(Some(Ok(batch))) => split_batch_to_sorted_run(batch, &self.expression)?,
453                Poll::Ready(Some(Err(e))) => {
454                    return Poll::Ready(Some(Err(e)));
455                }
456                Poll::Ready(None) => {
457                    // input stream is done, we need to merge sort the remaining working set
458                    self.is_terminated = true;
459                    self.build_sorted_stream()?;
460                    self.start_new_merge_sort()?;
461                    break;
462                }
463                Poll::Pending => return Poll::Pending,
464            };
465
466            // The core logic to eargerly merge sort the working set
467
468            // compare with last_value to find boundary, then merge runs if needed
469
470            // iterate over runs_with_batch to merge sort, might create zero or more stream to put to `sort_partition_rbs`
471            let mut last_remaining = None;
472            let mut run_iter = runs_with_batch.into_iter();
473            loop {
474                let Some((sorted_rb, run_info)) = last_remaining.take().or(run_iter.next()) else {
475                    break;
476                };
477                if sorted_rb.num_rows() == 0 {
478                    continue;
479                }
480                // determine if this batch is in current working range
481                let Some(cur_range) = run_info.get_time_range() else {
482                    internal_err!("Found NULL in time index column")?
483                };
484                let Some(working_range) = self.get_working_range() else {
485                    internal_err!("No working range found")?
486                };
487
488                // ensure the current batch is in the working range
489                if sort_column.options.unwrap_or_default().descending {
490                    if cur_range.end > working_range.end {
491                        error!("Invalid range: {:?} > {:?}", cur_range, working_range);
492                        #[cfg(debug_assertions)]
493                        self.check_subset_ranges(&cur_range);
494                        internal_err!(
495                            "Current batch have data on the right side of working range, something is very wrong"
496                        )?;
497                    }
498                } else if cur_range.start < working_range.start {
499                    error!("Invalid range: {:?} < {:?}", cur_range, working_range);
500                    #[cfg(debug_assertions)]
501                    self.check_subset_ranges(&cur_range);
502                    internal_err!(
503                        "Current batch have data on the left side of working range, something is very wrong"
504                    )?;
505                }
506
507                if cur_range.is_subset(&working_range) {
508                    // data still in range, can't merge sort yet
509                    // see if can concat entire sorted rb, merge sort need to wait
510                    self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?;
511                } else if let Some(intersection) = cur_range.intersection(&working_range) {
512                    // slice rb by intersection and concat it then merge sort
513                    let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len);
514                    let (offset, len) = find_slice_from_range(
515                        &SortColumn {
516                            values: cur_sort_column.clone(),
517                            options: sort_column.options,
518                        },
519                        &intersection,
520                    )?;
521
522                    if offset != 0 {
523                        internal_err!(
524                            "Current batch have data on the left side of working range, something is very wrong"
525                        )?;
526                    }
527
528                    let sliced_rb = sorted_rb.slice(offset, len);
529
530                    // try to concat the sliced input batch to the current `in_progress` run
531                    self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?;
532                    // since no more sorted data in this range will come in now, build stream now
533                    self.build_sorted_stream()?;
534
535                    // since we are crossing working range, we need to merge sort the working set
536                    self.start_new_merge_sort()?;
537
538                    let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len);
539                    if r_len != 0 {
540                        // we have remaining data in this batch, put it back to input queue
541                        let remaining_rb = sorted_rb.slice(r_offset, r_len);
542                        let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?;
543                        let new_run_info = SucRun {
544                            offset: run_info.offset + r_offset,
545                            len: r_len,
546                            first_val: new_first_val,
547                            last_val: run_info.last_val,
548                        };
549                        last_remaining = Some((remaining_rb, new_run_info));
550                    }
551                    // deal with remaining batch cross working range problem
552                    // i.e: this example require more slice, and we are currently at point A
553                    // |---1---|       |---3---|
554                    // |-------A--2------------|
555                    //  put the remaining batch back to iter and deal it in next loop
556                } else {
557                    // no overlap, we can merge sort the working set
558
559                    self.build_sorted_stream()?;
560                    self.start_new_merge_sort()?;
561
562                    // always put it back to input queue until some batch is in working range
563                    last_remaining = Some((sorted_rb, run_info));
564                }
565            }
566
567            // poll result stream again to see if we can emit more results
568            match self.poll_result_stream(cx) {
569                Poll::Ready(None) => {
570                    if self.is_terminated {
571                        return Poll::Ready(None);
572                    }
573                }
574                x => return x,
575            };
576        }
577        // emit the merge result after terminated(all input stream is done)
578        self.poll_result_stream(cx)
579    }
580
581    fn push_batch(&mut self, batch: DfRecordBatch) {
582        self.in_progress.push(batch);
583    }
584
585    /// Try to concat the input batch to the current `in_progress` run
586    ///
587    /// if the input batch is not sorted, build old run to stream and start a new run with new batch
588    fn try_concat_batch(
589        &mut self,
590        batch: DfRecordBatch,
591        run_info: &SucRun<Timestamp>,
592        opt: Option<SortOptions>,
593    ) -> datafusion_common::Result<()> {
594        let is_ok_to_concat =
595            cmp_with_opts(&self.last_value, &run_info.first_val, &opt) <= std::cmp::Ordering::Equal;
596
597        if is_ok_to_concat {
598            self.push_batch(batch);
599            // next time we get input batch might still be ordered, so not build stream yet
600        } else {
601            // no more sorted data, build stream now
602            self.build_sorted_stream()?;
603            self.push_batch(batch);
604        }
605        self.last_value = run_info.last_val;
606        Ok(())
607    }
608
609    /// Get the current working range
610    fn get_working_range(&self) -> Option<TimeRange> {
611        self.all_avail_working_range
612            .get(self.working_idx)
613            .map(|(range, _)| *range)
614    }
615
616    /// Set current working range to the next working range
617    fn set_next_working_range(&mut self) {
618        self.working_idx += 1;
619    }
620
621    /// make `in_progress` as a new `DfSendableRecordBatchStream` and put into `sorted_input_runs`
622    fn build_sorted_stream(&mut self) -> datafusion_common::Result<()> {
623        if self.in_progress.is_empty() {
624            return Ok(());
625        }
626        let data = std::mem::take(&mut self.in_progress);
627
628        let new_stream = MemoryStream::try_new(data, self.schema(), None)?;
629        self.sorted_input_runs.push(Box::pin(new_stream));
630        Ok(())
631    }
632
633    /// Start merging sort the current working set
634    fn start_new_merge_sort(&mut self) -> datafusion_common::Result<()> {
635        if !self.in_progress.is_empty() {
636            return internal_err!("Starting a merge sort when in_progress is not empty")?;
637        }
638
639        self.set_next_working_range();
640
641        let streams = std::mem::take(&mut self.sorted_input_runs);
642        if streams.is_empty() {
643            return Ok(());
644        } else if streams.len() == 1 {
645            self.merge_stream
646                .push_back(streams.into_iter().next().unwrap());
647            return Ok(());
648        }
649
650        let fetch = self.remaining_fetch();
651        let reservation = MemoryConsumer::new(format!("WindowedSortStream[{}]", self.merge_count))
652            .register(&self.memory_pool);
653        self.merge_count += 1;
654
655        let resulting_stream = StreamingMergeBuilder::new()
656            .with_streams(streams)
657            .with_schema(self.schema())
658            .with_expressions(&[self.expression.clone()].into())
659            .with_metrics(self.metrics.clone())
660            .with_batch_size(self.batch_size)
661            .with_fetch(fetch)
662            .with_reservation(reservation)
663            .build()?;
664        self.merge_stream.push_back(resulting_stream);
665        // this working range is done, move to next working range
666        Ok(())
667    }
668
669    /// Remaining number of rows to fetch, if no fetch limit, return None
670    /// if fetch limit is reached, return Some(0)
671    fn remaining_fetch(&self) -> Option<usize> {
672        let total_now = self.produced;
673        self.fetch.map(|p| p.saturating_sub(total_now))
674    }
675}
676
677impl Stream for WindowedSortStream {
678    type Item = datafusion_common::Result<DfRecordBatch>;
679
680    fn poll_next(
681        mut self: Pin<&mut Self>,
682        cx: &mut Context<'_>,
683    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
684        let result = self.as_mut().poll_next_inner(cx);
685        self.metrics.record_poll(result)
686    }
687}
688
689impl RecordBatchStream for WindowedSortStream {
690    fn schema(&self) -> SchemaRef {
691        self.schema.clone()
692    }
693}
694
695/// split batch to sorted runs
696fn split_batch_to_sorted_run(
697    batch: DfRecordBatch,
698    expression: &PhysicalSortExpr,
699) -> datafusion_common::Result<SortedRunSet<Timestamp>> {
700    // split input rb to sorted runs
701    let sort_column = expression.evaluate_to_sort_column(&batch)?;
702    let sorted_runs_offset = get_sorted_runs(sort_column.clone())?;
703    if let Some(run) = sorted_runs_offset.first()
704        && sorted_runs_offset.len() == 1
705    {
706        if !(run.offset == 0 && run.len == batch.num_rows()) {
707            internal_err!(
708                "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
709                run.offset,
710                run.len,
711                batch.num_rows()
712            )?;
713        }
714        // input rb is already sorted, we can emit it directly
715        Ok(SortedRunSet {
716            runs_with_batch: vec![(batch, run.clone())],
717            sort_column,
718        })
719    } else {
720        // those slice should be zero copy, so supposedly no new reservation needed
721        let mut ret = Vec::with_capacity(sorted_runs_offset.len());
722        for run in sorted_runs_offset {
723            if run.offset + run.len > batch.num_rows() {
724                internal_err!(
725                    "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
726                    run.offset,
727                    run.len,
728                    batch.num_rows()
729                )?;
730            }
731            let new_rb = batch.slice(run.offset, run.len);
732            ret.push((new_rb, run));
733        }
734        Ok(SortedRunSet {
735            runs_with_batch: ret,
736            sort_column,
737        })
738    }
739}
740
741/// Downcast a temporal array to a specific type
742///
743/// usage similar to `downcast_primitive!` in `arrow-array` crate
744#[macro_export]
745macro_rules! downcast_ts_array {
746    ($data_type:expr => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) =>
747    {
748        match $data_type {
749            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
750                $m!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $args)*)
751            }
752            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
753                $m!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $args)*)
754            }
755            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
756                $m!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $args)*)
757            }
758            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
759                $m!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $args)*)
760            }
761            $($p => $fallback,)*
762        }
763    };
764}
765
766/// Find the slice(where start <= data < end and sort by `sort_column.options`) from the given range
767///
768/// Return the offset and length of the slice
769fn find_slice_from_range(
770    sort_column: &SortColumn,
771    range: &TimeRange,
772) -> datafusion_common::Result<(usize, usize)> {
773    let ty = sort_column.values.data_type();
774    let time_unit = if let DataType::Timestamp(unit, _) = ty {
775        unit
776    } else {
777        return Err(DataFusionError::Internal(format!(
778            "Unsupported sort column type: {}",
779            sort_column.values.data_type()
780        )));
781    };
782    let array = &sort_column.values;
783    let opt = &sort_column.options.unwrap_or_default();
784    let descending = opt.descending;
785
786    let typed_sorted_range = [range.start, range.end]
787        .iter()
788        .map(|t| {
789            t.convert_to(time_unit.into())
790                .ok_or_else(|| {
791                    DataFusionError::Internal(format!(
792                        "Failed to convert timestamp from {:?} to {:?}",
793                        t.unit(),
794                        time_unit
795                    ))
796                })
797                .and_then(|typed_ts| {
798                    let value = Value::Timestamp(typed_ts);
799                    value
800                        .try_to_scalar_value(&value.data_type())
801                        .map_err(|e| DataFusionError::External(Box::new(e) as _))
802                })
803        })
804        .try_collect::<_, Vec<_>, _>()?;
805
806    let (min_val, max_val) = (typed_sorted_range[0].clone(), typed_sorted_range[1].clone());
807
808    // get slice that in which all data that `min_val<=data<max_val`
809    let (start, end) = if descending {
810        // note that `data < max_val`
811        // i,e, for max_val = 4, array = [5,3,2] should be start=1
812        // max_val = 4, array = [5, 4, 3, 2] should be start= 2
813        let start = bisect::<false>(from_ref(array), from_ref(&max_val), &[*opt])?;
814        // min_val = 1, array = [3, 2, 1, 0], end = 3
815        // min_val = 1, array = [3, 2, 0], end = 2
816        let end = bisect::<false>(from_ref(array), from_ref(&min_val), &[*opt])?;
817        (start, end)
818    } else {
819        // min_val = 1, array = [1, 2, 3], start = 0
820        // min_val = 1, array = [0, 2, 3], start = 1
821        let start = bisect::<true>(from_ref(array), from_ref(&min_val), &[*opt])?;
822        // max_val = 3, array = [1, 3, 4], end = 1
823        // max_val = 3, array = [1, 2, 4], end = 2
824        let end = bisect::<true>(from_ref(array), from_ref(&max_val), &[*opt])?;
825        (start, end)
826    };
827
828    Ok((start, end - start))
829}
830
831/// Get an iterator from a primitive array.
832///
833/// Used with `downcast_ts_array`. The returned iter is wrapped with `.enumerate()`.
834#[macro_export]
835macro_rules! array_iter_helper {
836    ($t:ty, $unit:expr, $arr:expr) => {{
837        let typed = $arr
838            .as_any()
839            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
840            .unwrap();
841        let iter = typed.iter().enumerate();
842        Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
843    }};
844}
845
846/// Compare with options, note None is considered as NULL here
847///
848/// default to null first
849fn cmp_with_opts<T: Ord>(
850    a: &Option<T>,
851    b: &Option<T>,
852    opt: &Option<SortOptions>,
853) -> std::cmp::Ordering {
854    let opt = opt.unwrap_or_default();
855
856    if let (Some(a), Some(b)) = (a, b) {
857        if opt.descending { b.cmp(a) } else { a.cmp(b) }
858    } else if opt.nulls_first {
859        // now we know at leatst one of them is None
860        // in rust None < Some(_)
861        a.cmp(b)
862    } else {
863        match (a, b) {
864            (Some(a), Some(b)) => a.cmp(b),
865            (Some(_), None) => std::cmp::Ordering::Less,
866            (None, Some(_)) => std::cmp::Ordering::Greater,
867            (None, None) => std::cmp::Ordering::Equal,
868        }
869    }
870}
871
872#[derive(Debug, Clone)]
873struct SortedRunSet<N: Ord> {
874    /// sorted runs with batch corresponding to them
875    runs_with_batch: Vec<(DfRecordBatch, SucRun<N>)>,
876    /// sorted column from eval sorting expr
877    sort_column: SortColumn,
878}
879
880/// A struct to represent a successive run in the input iterator
881#[derive(Debug, Clone, PartialEq)]
882struct SucRun<N: Ord> {
883    /// offset of the first element in the run
884    offset: usize,
885    /// length of the run
886    len: usize,
887    /// first non-null value in the run
888    first_val: Option<N>,
889    /// last non-null value in the run
890    last_val: Option<N>,
891}
892
893impl SucRun<Timestamp> {
894    /// Get the time range of the run, which is [min_val, max_val + 1)
895    fn get_time_range(&self) -> Option<TimeRange> {
896        let start = self.first_val.min(self.last_val);
897        let end = self
898            .first_val
899            .max(self.last_val)
900            .map(|i| Timestamp::new(i.value() + 1, i.unit()));
901        start.zip(end).map(|(s, e)| TimeRange::new(s, e))
902    }
903}
904
905/// find all successive runs in the input iterator
906fn find_successive_runs<T: Iterator<Item = (usize, Option<N>)>, N: Ord + Copy>(
907    iter: T,
908    sort_opts: &Option<SortOptions>,
909) -> Vec<SucRun<N>> {
910    let mut runs = Vec::new();
911    let mut last_value = None;
912    let mut iter_len = None;
913
914    let mut last_offset = 0;
915    let mut first_val: Option<N> = None;
916    let mut last_val: Option<N> = None;
917
918    for (idx, t) in iter {
919        if let Some(last_value) = &last_value
920            && cmp_with_opts(last_value, &t, sort_opts) == std::cmp::Ordering::Greater
921        {
922            // we found a boundary
923            let len = idx - last_offset;
924            let run = SucRun {
925                offset: last_offset,
926                len,
927                first_val,
928                last_val,
929            };
930            runs.push(run);
931            first_val = None;
932            last_val = None;
933
934            last_offset = idx;
935        }
936        last_value = Some(t);
937        if let Some(t) = t {
938            first_val = first_val.or(Some(t));
939            last_val = Some(t).or(last_val);
940        }
941        iter_len = Some(idx);
942    }
943    let run = SucRun {
944        offset: last_offset,
945        len: iter_len.map(|l| l - last_offset + 1).unwrap_or(0),
946        first_val,
947        last_val,
948    };
949    runs.push(run);
950
951    runs
952}
953
954/// return a list of non-overlapping (offset, length) which represent sorted runs, and
955/// can be used to call [`DfRecordBatch::slice`] to get sorted runs
956/// Returned runs will be as long as possible, and will not overlap with each other
957fn get_sorted_runs(sort_column: SortColumn) -> datafusion_common::Result<Vec<SucRun<Timestamp>>> {
958    let ty = sort_column.values.data_type();
959    if let DataType::Timestamp(unit, _) = ty {
960        let array = &sort_column.values;
961        let iter = downcast_ts_array!(
962            array.data_type() => (array_iter_helper, array),
963            _ => internal_err!("Unsupported sort column type: {ty}")?
964        );
965
966        let raw = find_successive_runs(iter, &sort_column.options);
967        let ts_runs = raw
968            .into_iter()
969            .map(|run| SucRun {
970                offset: run.offset,
971                len: run.len,
972                first_val: run.first_val.map(|v| Timestamp::new(v, unit.into())),
973                last_val: run.last_val.map(|v| Timestamp::new(v, unit.into())),
974            })
975            .collect_vec();
976        Ok(ts_runs)
977    } else {
978        Err(DataFusionError::Internal(format!(
979            "Unsupported sort column type: {ty}"
980        )))
981    }
982}
983
984/// Left(`start`) inclusive right(`end`) exclusive,
985///
986/// This is just tuple with extra methods
987#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
988struct TimeRange {
989    start: Timestamp,
990    end: Timestamp,
991}
992
993impl From<&PartitionRange> for TimeRange {
994    fn from(range: &PartitionRange) -> Self {
995        Self::new(range.start, range.end)
996    }
997}
998
999impl From<(Timestamp, Timestamp)> for TimeRange {
1000    fn from(range: (Timestamp, Timestamp)) -> Self {
1001        Self::new(range.0, range.1)
1002    }
1003}
1004
1005impl From<&(Timestamp, Timestamp)> for TimeRange {
1006    fn from(range: &(Timestamp, Timestamp)) -> Self {
1007        Self::new(range.0, range.1)
1008    }
1009}
1010
1011impl TimeRange {
1012    /// Create a new TimeRange, if start is greater than end, swap them
1013    fn new(start: Timestamp, end: Timestamp) -> Self {
1014        if start > end {
1015            Self {
1016                start: end,
1017                end: start,
1018            }
1019        } else {
1020            Self { start, end }
1021        }
1022    }
1023
1024    fn is_subset(&self, other: &Self) -> bool {
1025        self.start >= other.start && self.end <= other.end
1026    }
1027
1028    /// Check if two ranges are overlapping, exclusive(meaning if only boundary is overlapped then range is not overlapping)
1029    fn is_overlapping(&self, other: &Self) -> bool {
1030        !(self.start >= other.end || self.end <= other.start)
1031    }
1032
1033    fn intersection(&self, other: &Self) -> Option<Self> {
1034        if self.is_overlapping(other) {
1035            Some(Self::new(
1036                self.start.max(other.start),
1037                self.end.min(other.end),
1038            ))
1039        } else {
1040            None
1041        }
1042    }
1043
1044    fn difference(&self, other: &Self) -> Vec<Self> {
1045        if !self.is_overlapping(other) {
1046            vec![*self]
1047        } else {
1048            let mut ret = Vec::new();
1049            if self.start < other.start && self.end > other.end {
1050                ret.push(Self::new(self.start, other.start));
1051                ret.push(Self::new(other.end, self.end));
1052            } else if self.start < other.start {
1053                ret.push(Self::new(self.start, other.start));
1054            } else if self.end > other.end {
1055                ret.push(Self::new(other.end, self.end));
1056            }
1057            ret
1058        }
1059    }
1060}
1061
1062/// split input range by `split_by` range to one, two or three parts.
1063fn split_range_by(
1064    input_range: &TimeRange,
1065    input_parts: &[usize],
1066    split_by: &TimeRange,
1067    split_idx: usize,
1068) -> Vec<Action> {
1069    let mut ret = Vec::new();
1070    if input_range.is_overlapping(split_by) {
1071        let input_parts = input_parts.to_vec();
1072        let new_parts = {
1073            let mut new_parts = input_parts.clone();
1074            new_parts.push(split_idx);
1075            new_parts
1076        };
1077
1078        ret.push(Action::Pop(*input_range));
1079        if let Some(intersection) = input_range.intersection(split_by) {
1080            ret.push(Action::Push(intersection, new_parts.clone()));
1081        }
1082        for diff in input_range.difference(split_by) {
1083            ret.push(Action::Push(diff, input_parts.clone()));
1084        }
1085    }
1086    ret
1087}
1088
1089#[derive(Debug, Clone, PartialEq, Eq)]
1090enum Action {
1091    Pop(TimeRange),
1092    Push(TimeRange, Vec<usize>),
1093}
1094
1095/// Compute all working ranges and corresponding working sets from given `overlap_counts` computed from `split_overlapping_ranges`
1096///
1097/// working ranges promise once input stream get a value out of current range, future values will never be in this range
1098///
1099/// hence we can merge sort current working range once that happens
1100///
1101/// if `descending` is true, the working ranges will be in descending order
1102fn compute_all_working_ranges(
1103    overlap_counts: &BTreeMap<TimeRange, Vec<usize>>,
1104    descending: bool,
1105) -> Vec<(TimeRange, BTreeSet<usize>)> {
1106    let mut ret = Vec::new();
1107    let mut cur_range_set: Option<(TimeRange, BTreeSet<usize>)> = None;
1108    let overlap_iter: Box<dyn Iterator<Item = (&TimeRange, &Vec<usize>)>> = if descending {
1109        Box::new(overlap_counts.iter().rev()) as _
1110    } else {
1111        Box::new(overlap_counts.iter()) as _
1112    };
1113    for (range, set) in overlap_iter {
1114        match &mut cur_range_set {
1115            None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))),
1116            Some((working_range, working_set)) => {
1117                // if next overlap range have Partition that's is not last one in `working_set`(hence need
1118                // to be read before merge sorting), and `working_set` have >1 count
1119                // we have to expand current working range to cover it(and add it's `set` to `working_set`)
1120                // so that merge sort is possible
1121                let need_expand = {
1122                    let last_part = working_set.last();
1123                    let inter: BTreeSet<usize> = working_set
1124                        .intersection(&BTreeSet::from_iter(set.iter().cloned()))
1125                        .cloned()
1126                        .collect();
1127                    if let Some(one) = inter.first()
1128                        && inter.len() == 1
1129                        && Some(one) == last_part
1130                    {
1131                        // if only the last PartitionRange in current working set, we can just emit it so no need to expand working range
1132                        if set.iter().all(|p| Some(p) >= last_part) {
1133                            // if all PartitionRange in next overlap range is after the last one in current working set, we can just emit current working set
1134                            false
1135                        } else {
1136                            // elsewise, we need to expand working range to include next overlap range
1137                            true
1138                        }
1139                    } else if inter.is_empty() {
1140                        // if no common PartitionRange in current working set and next overlap range, we can just emit current working set
1141                        false
1142                    } else {
1143                        // have multiple intersection or intersection is not the last part, either way we need to expand working range to include next overlap range
1144                        true
1145                    }
1146                };
1147
1148                if need_expand {
1149                    if descending {
1150                        working_range.start = range.start;
1151                    } else {
1152                        working_range.end = range.end;
1153                    }
1154                    working_set.extend(set.iter().cloned());
1155                } else {
1156                    ret.push((*working_range, std::mem::take(working_set)));
1157                    cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned())));
1158                }
1159            }
1160        }
1161    }
1162
1163    if let Some(cur_range_set) = cur_range_set {
1164        ret.push(cur_range_set)
1165    }
1166
1167    ret
1168}
1169
1170/// return a map of non-overlapping ranges and their corresponding index
1171/// (not `PartitionRange.identifier` but position in array) in the input `PartitionRange`s that is in those ranges
1172fn split_overlapping_ranges(ranges: &[PartitionRange]) -> BTreeMap<TimeRange, Vec<usize>> {
1173    // invariant: the key ranges should not overlapping with each other by definition from `is_overlapping`
1174    let mut ret: BTreeMap<TimeRange, Vec<usize>> = BTreeMap::new();
1175    for (idx, range) in ranges.iter().enumerate() {
1176        let key: TimeRange = (range.start, range.end).into();
1177        let mut actions = Vec::new();
1178        let mut untouched = vec![key];
1179        // create a forward and backward iterator to find all overlapping ranges
1180        // given that tuple is sorted in lexicographical order and promise to not overlap,
1181        // since range is sorted that way, we can stop when we find a non-overlapping range
1182        let forward_iter = ret
1183            .range(key..)
1184            .take_while(|(range, _)| range.is_overlapping(&key));
1185        let backward_iter = ret
1186            .range(..key)
1187            .rev()
1188            .take_while(|(range, _)| range.is_overlapping(&key));
1189
1190        for (range, parts) in forward_iter.chain(backward_iter) {
1191            untouched = untouched.iter().flat_map(|r| r.difference(range)).collect();
1192            let act = split_range_by(range, parts, &key, idx);
1193            actions.extend(act.into_iter());
1194        }
1195
1196        for action in actions {
1197            match action {
1198                Action::Pop(range) => {
1199                    ret.remove(&range);
1200                }
1201                Action::Push(range, parts) => {
1202                    ret.insert(range, parts);
1203                }
1204            }
1205        }
1206
1207        // insert untouched ranges
1208        for range in untouched {
1209            ret.insert(range, vec![idx]);
1210        }
1211    }
1212    ret
1213}
1214
1215/// Get timestamp from array at offset
1216fn get_timestamp_from_idx(
1217    array: &ArrayRef,
1218    offset: usize,
1219) -> datafusion_common::Result<Option<Timestamp>> {
1220    let time_unit = if let DataType::Timestamp(unit, _) = array.data_type() {
1221        unit
1222    } else {
1223        return Err(DataFusionError::Internal(format!(
1224            "Unsupported sort column type: {}",
1225            array.data_type()
1226        )));
1227    };
1228    let ty = array.data_type();
1229    let array = array.slice(offset, 1);
1230    let mut iter = downcast_ts_array!(
1231        array.data_type() => (array_iter_helper, array),
1232        _ => internal_err!("Unsupported sort column type: {ty}")?
1233    );
1234    let (_idx, val) = iter.next().ok_or_else(|| {
1235        DataFusionError::Internal("Empty array in get_timestamp_from".to_string())
1236    })?;
1237    let val = if let Some(val) = val {
1238        val
1239    } else {
1240        return Ok(None);
1241    };
1242    let gt_timestamp = Timestamp::new(val, time_unit.into());
1243    Ok(Some(gt_timestamp))
1244}
1245
1246#[cfg(test)]
1247mod test {
1248    use std::io::Write;
1249    use std::sync::Arc;
1250
1251    use arrow::array::{ArrayRef, TimestampMillisecondArray};
1252    use arrow::compute::concat_batches;
1253    use arrow::json::ArrayWriter;
1254    use arrow_schema::{Field, Schema, TimeUnit};
1255    use futures::StreamExt;
1256    use pretty_assertions::assert_eq;
1257    use serde_json::json;
1258
1259    use super::*;
1260    use crate::test_util::{MockInputExec, new_ts_array};
1261
1262    #[test]
1263    fn test_overlapping() {
1264        let testcases = [
1265            (
1266                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1267                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1268                false,
1269            ),
1270            (
1271                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1272                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1273                true,
1274            ),
1275            (
1276                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1277                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1278                true,
1279            ),
1280            (
1281                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1282                (
1283                    Timestamp::new_millisecond(1000),
1284                    Timestamp::new_millisecond(1002),
1285                ),
1286                true,
1287            ),
1288            (
1289                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1290                (
1291                    Timestamp::new_millisecond(1001),
1292                    Timestamp::new_millisecond(1002),
1293                ),
1294                false,
1295            ),
1296            (
1297                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1298                (
1299                    Timestamp::new_millisecond(1002),
1300                    Timestamp::new_millisecond(1003),
1301                ),
1302                false,
1303            ),
1304        ];
1305
1306        for (range1, range2, expected) in testcases.iter() {
1307            assert_eq!(
1308                TimeRange::from(range1).is_overlapping(&range2.into()),
1309                *expected,
1310                "range1: {:?}, range2: {:?}",
1311                range1,
1312                range2
1313            );
1314        }
1315    }
1316
1317    #[test]
1318    fn test_split() {
1319        let testcases = [
1320            // no split
1321            (
1322                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1323                vec![0],
1324                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1325                1,
1326                vec![],
1327            ),
1328            // one part
1329            (
1330                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1331                vec![0],
1332                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1333                1,
1334                vec![
1335                    Action::Pop(
1336                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1337                    ),
1338                    Action::Push(
1339                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1340                        vec![0, 1],
1341                    ),
1342                ],
1343            ),
1344            (
1345                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1346                vec![0],
1347                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1348                1,
1349                vec![
1350                    Action::Pop(
1351                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1352                    ),
1353                    Action::Push(
1354                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1355                        vec![0, 1],
1356                    ),
1357                ],
1358            ),
1359            (
1360                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1361                vec![0],
1362                (
1363                    Timestamp::new_millisecond(1000),
1364                    Timestamp::new_millisecond(1002),
1365                ),
1366                1,
1367                vec![
1368                    Action::Pop(
1369                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1370                    ),
1371                    Action::Push(
1372                        (
1373                            Timestamp::new_millisecond(1000),
1374                            Timestamp::new_millisecond(1001),
1375                        )
1376                            .into(),
1377                        vec![0, 1],
1378                    ),
1379                ],
1380            ),
1381            // two part
1382            (
1383                (Timestamp::new_second(1), Timestamp::new_millisecond(1002)),
1384                vec![0],
1385                (
1386                    Timestamp::new_millisecond(1001),
1387                    Timestamp::new_millisecond(1002),
1388                ),
1389                1,
1390                vec![
1391                    Action::Pop(
1392                        (Timestamp::new_second(1), Timestamp::new_millisecond(1002)).into(),
1393                    ),
1394                    Action::Push(
1395                        (
1396                            Timestamp::new_millisecond(1001),
1397                            Timestamp::new_millisecond(1002),
1398                        )
1399                            .into(),
1400                        vec![0, 1],
1401                    ),
1402                    Action::Push(
1403                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1404                        vec![0],
1405                    ),
1406                ],
1407            ),
1408            // three parts
1409            (
1410                (Timestamp::new_second(1), Timestamp::new_millisecond(1004)),
1411                vec![0],
1412                (
1413                    Timestamp::new_millisecond(1001),
1414                    Timestamp::new_millisecond(1002),
1415                ),
1416                1,
1417                vec![
1418                    Action::Pop(
1419                        (Timestamp::new_second(1), Timestamp::new_millisecond(1004)).into(),
1420                    ),
1421                    Action::Push(
1422                        (
1423                            Timestamp::new_millisecond(1001),
1424                            Timestamp::new_millisecond(1002),
1425                        )
1426                            .into(),
1427                        vec![0, 1],
1428                    ),
1429                    Action::Push(
1430                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1431                        vec![0],
1432                    ),
1433                    Action::Push(
1434                        (
1435                            Timestamp::new_millisecond(1002),
1436                            Timestamp::new_millisecond(1004),
1437                        )
1438                            .into(),
1439                        vec![0],
1440                    ),
1441                ],
1442            ),
1443        ];
1444
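            // Each case is (range, parts, split_by, split_idx,
            // expected actions applied to the working set).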
1445        for (range, parts, split_by, split_idx, expected) in testcases.iter() {
1446            assert_eq!(
1447                split_range_by(&(*range).into(), parts, &split_by.into(), *split_idx),
1448                *expected,
1449                "range: {:?}, parts: {:?}, split_by: {:?}, split_idx: {}",
1450                range,
1451                parts,
1452                split_by,
1453                split_idx
1454            );
1455        }
1456    }
1457
1458    #[test]
1459    fn test_compute_working_ranges_rev() {
1460        let testcases = vec![
1461            (
1462                BTreeMap::from([(
1463                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1464                    vec![0],
1465                )]),
1466                vec![(
1467                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1468                    BTreeSet::from([0]),
1469                )],
1470            ),
1471            (
1472                BTreeMap::from([(
1473                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1474                    vec![0, 1],
1475                )]),
1476                vec![(
1477                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1478                    BTreeSet::from([0, 1]),
1479                )],
1480            ),
1481            (
1482                BTreeMap::from([
1483                    (
1484                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1485                        vec![0],
1486                    ),
1487                    (
1488                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1489                        vec![0, 1],
1490                    ),
1491                ]),
1492                vec![
1493                    (
1494                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1495                        BTreeSet::from([0]),
1496                    ),
1497                    (
1498                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1499                        BTreeSet::from([0, 1]),
1500                    ),
1501                ],
1502            ),
1503            (
1504                BTreeMap::from([
1505                    (
1506                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1507                        vec![0, 1],
1508                    ),
1509                    (
1510                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1511                        vec![1],
1512                    ),
1513                ]),
1514                vec![
1515                    (
1516                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1517                        BTreeSet::from([0, 1]),
1518                    ),
1519                    (
1520                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1521                        BTreeSet::from([1]),
1522                    ),
1523                ],
1524            ),
1525            (
1526                BTreeMap::from([
1527                    (
1528                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1529                        vec![0],
1530                    ),
1531                    (
1532                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1533                        vec![0, 1],
1534                    ),
1535                    (
1536                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1537                        vec![1],
1538                    ),
1539                ]),
1540                vec![
1541                    (
1542                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1543                        BTreeSet::from([0]),
1544                    ),
1545                    (
1546                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1547                        BTreeSet::from([0, 1]),
1548                    ),
1549                    (
1550                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1551                        BTreeSet::from([1]),
1552                    ),
1553                ],
1554            ),
1555            (
1556                BTreeMap::from([
1557                    (
1558                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1559                        vec![0, 2],
1560                    ),
1561                    (
1562                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1563                        vec![0, 1, 2],
1564                    ),
1565                    (
1566                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1567                        vec![1, 2],
1568                    ),
1569                ]),
1570                vec![(
1571                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1572                    BTreeSet::from([0, 1, 2]),
1573                )],
1574            ),
1575            (
1576                BTreeMap::from([
1577                    (
1578                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1579                        vec![0, 2],
1580                    ),
1581                    (
1582                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1583                        vec![1, 2],
1584                    ),
1585                ]),
1586                vec![(
1587                    (Timestamp::new_second(1), Timestamp::new_second(3)),
1588                    BTreeSet::from([0, 1, 2]),
1589                )],
1590            ),
1591            (
1592                BTreeMap::from([
1593                    (
1594                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1595                        vec![0, 1],
1596                    ),
1597                    (
1598                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1599                        vec![0, 1, 2],
1600                    ),
1601                    (
1602                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1603                        vec![1, 2],
1604                    ),
1605                ]),
1606                vec![(
1607                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1608                    BTreeSet::from([0, 1, 2]),
1609                )],
1610            ),
1611            (
1612                BTreeMap::from([
1613                    (
1614                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1615                        vec![0, 1],
1616                    ),
1617                    (
1618                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1619                        vec![1, 2],
1620                    ),
1621                ]),
1622                vec![
1623                    (
1624                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1625                        BTreeSet::from([0, 1]),
1626                    ),
1627                    (
1628                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1629                        BTreeSet::from([1, 2]),
1630                    ),
1631                ],
1632            ),
1633            // non-overlapping
1634            (
1635                BTreeMap::from([
1636                    (
1637                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1638                        vec![0],
1639                    ),
1640                    (
1641                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1642                        vec![1, 2],
1643                    ),
1644                ]),
1645                vec![
1646                    (
1647                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1648                        BTreeSet::from([0]),
1649                    ),
1650                    (
1651                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1652                        BTreeSet::from([1, 2]),
1653                    ),
1654                ],
1655            ),
1656        ];
1657
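            // Convert the (start, end) tuples into time ranges and check the
            // working ranges computed for the reversed direction (`true`).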
1658        for (input, expected) in testcases {
1659            let expected = expected
1660                .into_iter()
1661                .map(|(r, s)| (r.into(), s))
1662                .collect_vec();
1663            let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1664            assert_eq!(
1665                compute_all_working_ranges(&input, true),
1666                expected,
1667                "input: {:?}",
1668                input
1669            );
1670        }
1671    }
1672
1673    #[test]
1674    fn test_compute_working_ranges() {
1675        let testcases = vec![
1676            (
1677                BTreeMap::from([(
1678                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1679                    vec![0],
1680                )]),
1681                vec![(
1682                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1683                    BTreeSet::from([0]),
1684                )],
1685            ),
1686            (
1687                BTreeMap::from([(
1688                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1689                    vec![0, 1],
1690                )]),
1691                vec![(
1692                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1693                    BTreeSet::from([0, 1]),
1694                )],
1695            ),
1696            (
1697                BTreeMap::from([
1698                    (
1699                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1700                        vec![0, 1],
1701                    ),
1702                    (
1703                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1704                        vec![1],
1705                    ),
1706                ]),
1707                vec![
1708                    (
1709                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1710                        BTreeSet::from([0, 1]),
1711                    ),
1712                    (
1713                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1714                        BTreeSet::from([1]),
1715                    ),
1716                ],
1717            ),
1718            (
1719                BTreeMap::from([
1720                    (
1721                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1722                        vec![0],
1723                    ),
1724                    (
1725                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1726                        vec![0, 1],
1727                    ),
1728                ]),
1729                vec![
1730                    (
1731                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1732                        BTreeSet::from([0]),
1733                    ),
1734                    (
1735                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1736                        BTreeSet::from([0, 1]),
1737                    ),
1738                ],
1739            ),
1740            // test that a sub-range with only one member in its working set gets its own working range
1741            (
1742                BTreeMap::from([
1743                    (
1744                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1745                        vec![0],
1746                    ),
1747                    (
1748                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1749                        vec![0, 1],
1750                    ),
1751                    (
1752                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1753                        vec![1],
1754                    ),
1755                ]),
1756                vec![
1757                    (
1758                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1759                        BTreeSet::from([0]),
1760                    ),
1761                    (
1762                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1763                        BTreeSet::from([0, 1]),
1764                    ),
1765                    (
1766                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1767                        BTreeSet::from([1]),
1768                    ),
1769                ],
1770            ),
1771            (
1772                BTreeMap::from([
1773                    (
1774                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1775                        vec![0, 2],
1776                    ),
1777                    (
1778                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1779                        vec![0, 1, 2],
1780                    ),
1781                    (
1782                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1783                        vec![1, 2],
1784                    ),
1785                ]),
1786                vec![(
1787                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1788                    BTreeSet::from([0, 1, 2]),
1789                )],
1790            ),
1791            (
1792                BTreeMap::from([
1793                    (
1794                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1795                        vec![0, 2],
1796                    ),
1797                    (
1798                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1799                        vec![1, 2],
1800                    ),
1801                ]),
1802                vec![(
1803                    (Timestamp::new_second(1), Timestamp::new_second(3)),
1804                    BTreeSet::from([0, 1, 2]),
1805                )],
1806            ),
1807            (
1808                BTreeMap::from([
1809                    (
1810                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1811                        vec![0, 1],
1812                    ),
1813                    (
1814                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1815                        vec![0, 1, 2],
1816                    ),
1817                    (
1818                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1819                        vec![1, 2],
1820                    ),
1821                ]),
1822                vec![(
1823                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1824                    BTreeSet::from([0, 1, 2]),
1825                )],
1826            ),
1827            (
1828                BTreeMap::from([
1829                    (
1830                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1831                        vec![0, 1],
1832                    ),
1833                    (
1834                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1835                        vec![1, 2],
1836                    ),
1837                ]),
1838                vec![
1839                    (
1840                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1841                        BTreeSet::from([0, 1]),
1842                    ),
1843                    (
1844                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1845                        BTreeSet::from([1, 2]),
1846                    ),
1847                ],
1848            ),
1849            // non-overlapping
1850            (
1851                BTreeMap::from([
1852                    (
1853                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1854                        vec![0, 1],
1855                    ),
1856                    (
1857                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1858                        vec![2],
1859                    ),
1860                ]),
1861                vec![
1862                    (
1863                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1864                        BTreeSet::from([0, 1]),
1865                    ),
1866                    (
1867                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1868                        BTreeSet::from([2]),
1869                    ),
1870                ],
1871            ),
1872        ];
1873
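            // Same conversion as the reverse test above, but for the forward
            // direction (`false`).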
1874        for (input, expected) in testcases {
1875            let expected = expected
1876                .into_iter()
1877                .map(|(r, s)| (r.into(), s))
1878                .collect_vec();
1879            let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1880            assert_eq!(
1881                compute_all_working_ranges(&input, false),
1882                expected,
1883                "input: {:?}",
1884                input
1885            );
1886        }
1887    }
1888
1889    #[test]
1890    fn test_split_overlap_range() {
1891        let testcases = vec![
1892            // simple one range
1893            // simple case: a single range
1894                vec![PartitionRange {
1895                    start: Timestamp::new_second(1),
1896                    end: Timestamp::new_second(2),
1897                    num_rows: 2,
1898                    identifier: 0,
1899                }],
1900                BTreeMap::from_iter(
1901                    vec![(
1902                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1903                        vec![0],
1904                    )]
1905                    .into_iter(),
1906                ),
1907            ),
1908            // two overlapping ranges
1909            (
1910                vec![
1911                    PartitionRange {
1912                        start: Timestamp::new_second(1),
1913                        end: Timestamp::new_second(2),
1914                        num_rows: 2,
1915                        identifier: 0,
1916                    },
1917                    PartitionRange {
1918                        start: Timestamp::new_second(1),
1919                        end: Timestamp::new_second(2),
1920                        num_rows: 2,
1921                        identifier: 1,
1922                    },
1923                ],
1924                BTreeMap::from_iter(
1925                    vec![(
1926                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1927                        vec![0, 1],
1928                    )]
1929                    .into_iter(),
1930                ),
1931            ),
1932            (
1933                vec![
1934                    PartitionRange {
1935                        start: Timestamp::new_second(1),
1936                        end: Timestamp::new_second(3),
1937                        num_rows: 2,
1938                        identifier: 0,
1939                    },
1940                    PartitionRange {
1941                        start: Timestamp::new_second(2),
1942                        end: Timestamp::new_second(4),
1943                        num_rows: 2,
1944                        identifier: 1,
1945                    },
1946                ],
1947                BTreeMap::from_iter(
1948                    vec![
1949                        (
1950                            (Timestamp::new_second(1), Timestamp::new_second(2)),
1951                            vec![0],
1952                        ),
1953                        (
1954                            (Timestamp::new_second(2), Timestamp::new_second(3)),
1955                            vec![0, 1],
1956                        ),
1957                        (
1958                            (Timestamp::new_second(3), Timestamp::new_second(4)),
1959                            vec![1],
1960                        ),
1961                    ]
1962                    .into_iter(),
1963                ),
1964            ),
1965            // three or more overlapping ranges
1966            (
1967                vec![
1968                    PartitionRange {
1969                        start: Timestamp::new_second(1),
1970                        end: Timestamp::new_second(3),
1971                        num_rows: 2,
1972                        identifier: 0,
1973                    },
1974                    PartitionRange {
1975                        start: Timestamp::new_second(2),
1976                        end: Timestamp::new_second(4),
1977                        num_rows: 2,
1978                        identifier: 1,
1979                    },
1980                    PartitionRange {
1981                        start: Timestamp::new_second(1),
1982                        end: Timestamp::new_second(4),
1983                        num_rows: 2,
1984                        identifier: 2,
1985                    },
1986                ],
1987                BTreeMap::from_iter(
1988                    vec![
1989                        (
1990                            (Timestamp::new_second(1), Timestamp::new_second(2)),
1991                            vec![0, 2],
1992                        ),
1993                        (
1994                            (Timestamp::new_second(2), Timestamp::new_second(3)),
1995                            vec![0, 1, 2],
1996                        ),
1997                        (
1998                            (Timestamp::new_second(3), Timestamp::new_second(4)),
1999                            vec![1, 2],
2000                        ),
2001                    ]
2002                    .into_iter(),
2003                ),
2004            ),
2005            (
2006                vec![
2007                    PartitionRange {
2008                        start: Timestamp::new_second(1),
2009                        end: Timestamp::new_second(3),
2010                        num_rows: 2,
2011                        identifier: 0,
2012                    },
2013                    PartitionRange {
2014                        start: Timestamp::new_second(1),
2015                        end: Timestamp::new_second(4),
2016                        num_rows: 2,
2017                        identifier: 1,
2018                    },
2019                    PartitionRange {
2020                        start: Timestamp::new_second(2),
2021                        end: Timestamp::new_second(4),
2022                        num_rows: 2,
2023                        identifier: 2,
2024                    },
2025                ],
2026                BTreeMap::from_iter(
2027                    vec![
2028                        (
2029                            (Timestamp::new_second(1), Timestamp::new_second(2)),
2030                            vec![0, 1],
2031                        ),
2032                        (
2033                            (Timestamp::new_second(2), Timestamp::new_second(3)),
2034                            vec![0, 1, 2],
2035                        ),
2036                        (
2037                            (Timestamp::new_second(3), Timestamp::new_second(4)),
2038                            vec![1, 2],
2039                        ),
2040                    ]
2041                    .into_iter(),
2042                ),
2043            ),
2044        ];
2045
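            // Each case maps the input `PartitionRange`s to the expected
            // non-overlapping sub-ranges and the identifiers covering each one.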
2046        for (input, expected) in testcases {
2047            let expected = expected.into_iter().map(|(r, s)| (r.into(), s)).collect();
2048            assert_eq!(split_overlapping_ranges(&input), expected);
2049        }
2050    }
2051
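        /// Convenience conversion so test expectations below can be written as
        /// plain `(offset, len, first, last)` tuples.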
2052    impl From<(i32, i32, Option<i32>, Option<i32>)> for SucRun<i32> {
2053        fn from((offset, len, first, last): (i32, i32, Option<i32>, Option<i32>)) -> Self {
2054            Self {
2055                offset: offset as usize,
2056                len: len as usize,
2057                first_val: first,
2058                last_val: last,
2059            }
2060        }
2061    }
2062
2063    #[test]
2064    fn test_find_successive_runs() {
2065        let testcases = vec![
2066            (
2067                vec![Some(1), Some(1), Some(2), Some(1), Some(3)],
2068                Some(SortOptions {
2069                    descending: false,
2070                    nulls_first: false,
2071                }),
2072                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2073            ),
2074            (
2075                vec![Some(1), Some(2), Some(2), Some(1), Some(3)],
2076                Some(SortOptions {
2077                    descending: false,
2078                    nulls_first: false,
2079                }),
2080                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2081            ),
2082            (
2083                vec![Some(1), Some(2), None, None, Some(1), Some(3)],
2084                Some(SortOptions {
2085                    descending: false,
2086                    nulls_first: false,
2087                }),
2088                vec![(0, 4, Some(1), Some(2)), (4, 2, Some(1), Some(3))],
2089            ),
2090            (
2091                vec![Some(1), Some(2), Some(1), Some(3)],
2092                Some(SortOptions {
2093                    descending: false,
2094                    nulls_first: false,
2095                }),
2096                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(1), Some(3))],
2097            ),
2098            (
2099                vec![Some(1), Some(2), Some(1), Some(3)],
2100                Some(SortOptions {
2101                    descending: true,
2102                    nulls_first: false,
2103                }),
2104                vec![
2105                    (0, 1, Some(1), Some(1)),
2106                    (1, 2, Some(2), Some(1)),
2107                    (3, 1, Some(3), Some(3)),
2108                ],
2109            ),
2110            (
2111                vec![Some(1), Some(2), None, Some(3)],
2112                Some(SortOptions {
2113                    descending: false,
2114                    nulls_first: true,
2115                }),
2116                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(3), Some(3))],
2117            ),
2118            (
2119                vec![Some(1), Some(2), None, Some(3)],
2120                Some(SortOptions {
2121                    descending: false,
2122                    nulls_first: false,
2123                }),
2124                vec![(0, 3, Some(1), Some(2)), (3, 1, Some(3), Some(3))],
2125            ),
2126            (
2127                vec![Some(2), Some(1), None, Some(3)],
2128                Some(SortOptions {
2129                    descending: true,
2130                    nulls_first: true,
2131                }),
2132                vec![(0, 2, Some(2), Some(1)), (2, 2, Some(3), Some(3))],
2133            ),
2134            (
2135                vec![],
2136                Some(SortOptions {
2137                    descending: false,
2138                    nulls_first: true,
2139                }),
2140                vec![(0, 0, None, None)],
2141            ),
2142            (
2143                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2144                Some(SortOptions {
2145                    descending: true,
2146                    nulls_first: true,
2147                }),
2148                vec![(0, 5, Some(2), Some(1)), (5, 2, Some(5), Some(4))],
2149            ),
2150            (
2151                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2152                Some(SortOptions {
2153                    descending: true,
2154                    nulls_first: false,
2155                }),
2156                vec![
2157                    (0, 2, None, None),
2158                    (2, 3, Some(2), Some(1)),
2159                    (5, 2, Some(5), Some(4)),
2160                ],
2161            ),
2162        ];
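            // Each case is (input values, sort options, expected runs given as
            // (offset, len, first, last) tuples).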
2163        for (input, sort_opts, expected) in testcases {
2164            let ret = find_successive_runs(input.clone().into_iter().enumerate(), &sort_opts);
2165            let expected = expected.into_iter().map(SucRun::<i32>::from).collect_vec();
2166            assert_eq!(
2167                ret, expected,
2168                "input: {:?}, opt: {:?}, expected: {:?}",
2169                input, sort_opts, expected
2170            );
2171        }
2172    }
2173
2174    #[test]
2175    fn test_cmp_with_opts() {
2176        let testcases = vec![
2177            (
2178                Some(1),
2179                Some(2),
2180                Some(SortOptions {
2181                    descending: false,
2182                    nulls_first: false,
2183                }),
2184                std::cmp::Ordering::Less,
2185            ),
2186            (
2187                Some(1),
2188                Some(2),
2189                Some(SortOptions {
2190                    descending: true,
2191                    nulls_first: false,
2192                }),
2193                std::cmp::Ordering::Greater,
2194            ),
2195            (
2196                Some(1),
2197                None,
2198                Some(SortOptions {
2199                    descending: false,
2200                    nulls_first: true,
2201                }),
2202                std::cmp::Ordering::Greater,
2203            ),
2204            (
2205                Some(1),
2206                None,
2207                Some(SortOptions {
2208                    descending: true,
2209                    nulls_first: true,
2210                }),
2211                std::cmp::Ordering::Greater,
2212            ),
2213            (
2214                Some(1),
2215                None,
2216                Some(SortOptions {
2217                    descending: true,
2218                    nulls_first: false,
2219                }),
2220                std::cmp::Ordering::Less,
2221            ),
2222            (
2223                Some(1),
2224                None,
2225                Some(SortOptions {
2226                    descending: false,
2227                    nulls_first: false,
2228                }),
2229                std::cmp::Ordering::Less,
2230            ),
2231            (
2232                None,
2233                None,
2234                Some(SortOptions {
2235                    descending: true,
2236                    nulls_first: true,
2237                }),
2238                std::cmp::Ordering::Equal,
2239            ),
2240            (
2241                None,
2242                None,
2243                Some(SortOptions {
2244                    descending: false,
2245                    nulls_first: true,
2246                }),
2247                std::cmp::Ordering::Equal,
2248            ),
2249            (
2250                None,
2251                None,
2252                Some(SortOptions {
2253                    descending: true,
2254                    nulls_first: false,
2255                }),
2256                std::cmp::Ordering::Equal,
2257            ),
2258            (
2259                None,
2260                None,
2261                Some(SortOptions {
2262                    descending: false,
2263                    nulls_first: false,
2264                }),
2265                std::cmp::Ordering::Equal,
2266            ),
2267        ];
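            // Each case is (a, b, sort options, expected ordering of `a` relative to `b`).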
2268        for (a, b, opts, expected) in testcases {
2269            assert_eq!(
2270                cmp_with_opts(&a, &b, &opts),
2271                expected,
2272                "a: {:?}, b: {:?}, opts: {:?}",
2273                a,
2274                b,
2275                opts
2276            );
2277        }
2278    }
2279
2280    #[test]
2281    fn test_find_slice_from_range() {
2282        let test_cases = vec![
2283            // test for off-by-one cases
2284            (
2285                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2286                false,
2287                TimeRange {
2288                    start: Timestamp::new_millisecond(2),
2289                    end: Timestamp::new_millisecond(4),
2290                },
2291                Ok((1, 2)),
2292            ),
2293            (
2294                Arc::new(TimestampMillisecondArray::from_iter_values([
2295                    -2, -1, 0, 1, 2, 3, 4, 5,
2296                ])) as ArrayRef,
2297                false,
2298                TimeRange {
2299                    start: Timestamp::new_millisecond(-1),
2300                    end: Timestamp::new_millisecond(4),
2301                },
2302                Ok((1, 5)),
2303            ),
2304            (
2305                Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 6])) as ArrayRef,
2306                false,
2307                TimeRange {
2308                    start: Timestamp::new_millisecond(2),
2309                    end: Timestamp::new_millisecond(5),
2310                },
2311                Ok((1, 2)),
2312            ),
2313            (
2314                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 6])) as ArrayRef,
2315                false,
2316                TimeRange {
2317                    start: Timestamp::new_millisecond(2),
2318                    end: Timestamp::new_millisecond(5),
2319                },
2320                Ok((1, 3)),
2321            ),
2322            (
2323                Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 5, 6])) as ArrayRef,
2324                false,
2325                TimeRange {
2326                    start: Timestamp::new_millisecond(2),
2327                    end: Timestamp::new_millisecond(5),
2328                },
2329                Ok((1, 2)),
2330            ),
2331            (
2332                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2333                false,
2334                TimeRange {
2335                    start: Timestamp::new_millisecond(6),
2336                    end: Timestamp::new_millisecond(7),
2337                },
2338                Ok((5, 0)),
2339            ),
2340            // descending off-by-one cases
2341            (
2342                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 1])) as ArrayRef,
2343                true,
2344                TimeRange {
2345                    end: Timestamp::new_millisecond(4),
2346                    start: Timestamp::new_millisecond(1),
2347                },
2348                Ok((1, 3)),
2349            ),
2350            (
2351                Arc::new(TimestampMillisecondArray::from_iter_values([
2352                    5, 4, 3, 2, 1, 0,
2353                ])) as ArrayRef,
2354                true,
2355                TimeRange {
2356                    end: Timestamp::new_millisecond(4),
2357                    start: Timestamp::new_millisecond(1),
2358                },
2359                Ok((2, 3)),
2360            ),
2361            (
2362                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 0])) as ArrayRef,
2363                true,
2364                TimeRange {
2365                    end: Timestamp::new_millisecond(4),
2366                    start: Timestamp::new_millisecond(1),
2367                },
2368                Ok((1, 2)),
2369            ),
2370            (
2371                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 0])) as ArrayRef,
2372                true,
2373                TimeRange {
2374                    end: Timestamp::new_millisecond(4),
2375                    start: Timestamp::new_millisecond(1),
2376                },
2377                Ok((2, 2)),
2378            ),
2379            (
2380                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 1])) as ArrayRef,
2381                true,
2382                TimeRange {
2383                    end: Timestamp::new_millisecond(5),
2384                    start: Timestamp::new_millisecond(2),
2385                },
2386                Ok((1, 3)),
2387            ),
2388            (
2389                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 1])) as ArrayRef,
2390                true,
2391                TimeRange {
2392                    end: Timestamp::new_millisecond(5),
2393                    start: Timestamp::new_millisecond(2),
2394                },
2395                Ok((1, 2)),
2396            ),
2397            (
2398                Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 2, 1])) as ArrayRef,
2399                true,
2400                TimeRange {
2401                    end: Timestamp::new_millisecond(5),
2402                    start: Timestamp::new_millisecond(2),
2403                },
2404                Ok((1, 3)),
2405            ),
2406            (
2407                Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 1])) as ArrayRef,
2408                true,
2409                TimeRange {
2410                    end: Timestamp::new_millisecond(5),
2411                    start: Timestamp::new_millisecond(2),
2412                },
2413                Ok((1, 2)),
2414            ),
2415            (
2416                Arc::new(TimestampMillisecondArray::from_iter_values([
2417                    10, 9, 8, 7, 6,
2418                ])) as ArrayRef,
2419                true,
2420                TimeRange {
2421                    end: Timestamp::new_millisecond(5),
2422                    start: Timestamp::new_millisecond(2),
2423                },
2424                Ok((5, 0)),
2425            ),
2426            // more off-by-one cases
2427            (
2428                Arc::new(TimestampMillisecondArray::from_iter_values([3, 2, 1, 0])) as ArrayRef,
2429                true,
2430                TimeRange {
2431                    end: Timestamp::new_millisecond(4),
2432                    start: Timestamp::new_millisecond(3),
2433                },
2434                Ok((0, 1)),
2435            ),
2436            (
2437                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2])) as ArrayRef,
2438                true,
2439                TimeRange {
2440                    end: Timestamp::new_millisecond(4),
2441                    start: Timestamp::new_millisecond(3),
2442                },
2443                Ok((1, 1)),
2444            ),
2445            (
2446                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2])) as ArrayRef,
2447                true,
2448                TimeRange {
2449                    end: Timestamp::new_millisecond(4),
2450                    start: Timestamp::new_millisecond(3),
2451                },
2452                Ok((2, 1)),
2453            ),
2454        ];
2455
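            // Build a `SortColumn` from each case and check the returned
            // (offset, length) slice, or match the expected error message.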
2456        for (sort_vals, descending, range, expected) in test_cases {
2457            let sort_column = SortColumn {
2458                values: sort_vals,
2459                options: Some(SortOptions {
2460                    descending,
2461                    ..Default::default()
2462                }),
2463            };
2464            let ret = find_slice_from_range(&sort_column, &range);
2465            match (ret, expected) {
2466                (Ok(ret), Ok(expected)) => {
2467                    assert_eq!(
2468                        ret, expected,
2469                        "sort_column: {:?}, range: {:?}",
2470                        sort_column, range
2471                    )
2472                }
2473                (Err(err), Err(expected)) => {
2474                    let expected: &str = expected;
2475                    assert!(
2476                        err.to_string().contains(expected),
2477                        "err: {:?}, expected: {:?}",
2478                        err,
2479                        expected
2480                    );
2481                }
2482                (r, e) => panic!("unexpected result: {:?}, expected: {:?}", r, e),
2483            }
2484        }
2485    }
2486
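        /// Test harness that runs `input` batches (each tagged with a
        /// `PartitionRange`) through a `WindowedSortExec` and keeps the expected
        /// `output` batches for comparison.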
2487    #[derive(Debug)]
2488    struct TestStream {
2489        expression: PhysicalSortExpr,
2490        fetch: Option<usize>,
2491        input: Vec<(PartitionRange, DfRecordBatch)>,
2492        output: Vec<DfRecordBatch>,
2493        schema: SchemaRef,
2494    }
2495    use datafusion::physical_plan::expressions::Column;
2496    impl TestStream {
2497        fn new(
2498            ts_col: Column,
2499            opt: SortOptions,
2500            fetch: Option<usize>,
2501            schema: impl Into<arrow_schema::Fields>,
2502            input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2503            expected: Vec<Vec<ArrayRef>>,
2504        ) -> Self {
2505            let expression = PhysicalSortExpr {
2506                expr: Arc::new(ts_col),
2507                options: opt,
2508            };
2509            let schema = Schema::new(schema.into());
2510            let schema = Arc::new(schema);
2511            let input = input
2512                .into_iter()
2513                .map(|(k, v)| (k, DfRecordBatch::try_new(schema.clone(), v).unwrap()))
2514                .collect_vec();
2515            let output_batches = expected
2516                .into_iter()
2517                .map(|v| DfRecordBatch::try_new(schema.clone(), v).unwrap())
2518                .collect_vec();
2519            Self {
2520                expression,
2521                fetch,
2522                input,
2523                output: output_batches,
2524                schema,
2525            }
2526        }
2527
2528        async fn run_test(&self) -> Vec<DfRecordBatch> {
2529            let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip();
2530
2531            let mock_input = MockInputExec::new(batches, self.schema.clone());
2532
2533            let exec = WindowedSortExec::try_new(
2534                self.expression.clone(),
2535                self.fetch,
2536                vec![ranges],
2537                Arc::new(mock_input),
2538            )
2539            .unwrap();
2540
2541            let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
2542
2543            let real_output = exec_stream.collect::<Vec<_>>().await;
2544            let real_output: Vec<_> = real_output.into_iter().try_collect().unwrap();
2545            real_output
2546        }
2547    }
2548
2549    #[tokio::test]
2550    async fn test_window_sort_stream() {
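            // Each `TestStream` is built from: the timestamp sort column, sort
            // options, an optional fetch limit, the schema fields, the
            // (PartitionRange, columns) inputs, and the expected output batches.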
2551        let test_cases = [
2552            TestStream::new(
2553                Column::new("ts", 0),
2554                SortOptions {
2555                    descending: false,
2556                    nulls_first: true,
2557                },
2558                None,
2559                vec![Field::new(
2560                    "ts",
2561                    DataType::Timestamp(TimeUnit::Millisecond, None),
2562                    false,
2563                )],
2564                vec![],
2565                vec![],
2566            ),
2567            TestStream::new(
2568                Column::new("ts", 0),
2569                SortOptions {
2570                    descending: false,
2571                    nulls_first: true,
2572                },
2573                None,
2574                vec![Field::new(
2575                    "ts",
2576                    DataType::Timestamp(TimeUnit::Millisecond, None),
2577                    false,
2578                )],
2579                vec![
2580                    // test with one empty input batch
2581                    (
2582                        PartitionRange {
2583                            start: Timestamp::new_millisecond(1),
2584                            end: Timestamp::new_millisecond(2),
2585                            num_rows: 1,
2586                            identifier: 0,
2587                        },
2588                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2589                    ),
2590                    (
2591                        PartitionRange {
2592                            start: Timestamp::new_millisecond(1),
2593                            end: Timestamp::new_millisecond(3),
2594                            num_rows: 1,
2595                            identifier: 0,
2596                        },
2597                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2598                    ),
2599                ],
2600                vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2601                    [2],
2602                ))]],
2603            ),
2604            TestStream::new(
2605                Column::new("ts", 0),
2606                SortOptions {
2607                    descending: false,
2608                    nulls_first: true,
2609                },
2610                None,
2611                vec![Field::new(
2612                    "ts",
2613                    DataType::Timestamp(TimeUnit::Millisecond, None),
2614                    false,
2615                )],
2616                vec![
2617                    // test with both input batches empty
2618                    (
2619                        PartitionRange {
2620                            start: Timestamp::new_millisecond(1),
2621                            end: Timestamp::new_millisecond(2),
2622                            num_rows: 1,
2623                            identifier: 0,
2624                        },
2625                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2626                    ),
2627                    (
2628                        PartitionRange {
2629                            start: Timestamp::new_millisecond(1),
2630                            end: Timestamp::new_millisecond(3),
2631                            num_rows: 1,
2632                            identifier: 0,
2633                        },
2634                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2635                    ),
2636                ],
2637                vec![],
2638            ),
2639            TestStream::new(
2640                Column::new("ts", 0),
2641                SortOptions {
2642                    descending: false,
2643                    nulls_first: true,
2644                },
2645                None,
2646                vec![Field::new(
2647                    "ts",
2648                    DataType::Timestamp(TimeUnit::Millisecond, None),
2649                    false,
2650                )],
2651                vec![
2652                    // test indistinguishable case
2653                    // we can't know which range `2` belongs to
2654                    (
2655                        PartitionRange {
2656                            start: Timestamp::new_millisecond(1),
2657                            end: Timestamp::new_millisecond(2),
2658                            num_rows: 1,
2659                            identifier: 0,
2660                        },
2661                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2662                    ),
2663                    (
2664                        PartitionRange {
2665                            start: Timestamp::new_millisecond(1),
2666                            end: Timestamp::new_millisecond(3),
2667                            num_rows: 1,
2668                            identifier: 0,
2669                        },
2670                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2671                    ),
2672                ],
2673                vec![
2674                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2675                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2676                ],
2677            ),
2678            TestStream::new(
2679                Column::new("ts", 0),
2680                SortOptions {
2681                    descending: false,
2682                    nulls_first: true,
2683                },
2684                None,
2685                vec![Field::new(
2686                    "ts",
2687                    DataType::Timestamp(TimeUnit::Millisecond, None),
2688                    false,
2689                )],
2690                vec![
2691                    // test direct emit
2692                    (
2693                        PartitionRange {
2694                            start: Timestamp::new_millisecond(1),
2695                            end: Timestamp::new_millisecond(3),
2696                            num_rows: 1,
2697                            identifier: 0,
2698                        },
2699                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2700                            1, 2,
2701                        ]))],
2702                    ),
2703                    (
2704                        PartitionRange {
2705                            start: Timestamp::new_millisecond(1),
2706                            end: Timestamp::new_millisecond(4),
2707                            num_rows: 1,
2708                            identifier: 0,
2709                        },
2710                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2711                            2, 3,
2712                        ]))],
2713                    ),
2714                ],
2715                vec![
2716                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2717                        1, 2,
2718                    ]))],
2719                    // no merge sort/concat is triggered here, so the batches are emitted as-is
2720                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2721                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2722                ],
2723            ),
2724            TestStream::new(
2725                Column::new("ts", 0),
2726                SortOptions {
2727                    descending: false,
2728                    nulls_first: true,
2729                },
2730                None,
2731                vec![Field::new(
2732                    "ts",
2733                    DataType::Timestamp(TimeUnit::Millisecond, None),
2734                    false,
2735                )],
2736                vec![
2737                    // test batches that intersect across working ranges
2738                    (
2739                        PartitionRange {
2740                            start: Timestamp::new_millisecond(1),
2741                            end: Timestamp::new_millisecond(3),
2742                            num_rows: 1,
2743                            identifier: 0,
2744                        },
2745                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2746                            1, 2,
2747                        ]))],
2748                    ),
2749                    (
2750                        PartitionRange {
2751                            start: Timestamp::new_millisecond(1),
2752                            end: Timestamp::new_millisecond(4),
2753                            num_rows: 1,
2754                            identifier: 1,
2755                        },
2756                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2757                            1, 2, 3,
2758                        ]))],
2759                    ),
2760                ],
2761                vec![
2762                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2763                        1, 1, 2, 2,
2764                    ]))],
2765                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2766                ],
2767            ),
2768            TestStream::new(
2769                Column::new("ts", 0),
2770                SortOptions {
2771                    descending: false,
2772                    nulls_first: true,
2773                },
2774                None,
2775                vec![Field::new(
2776                    "ts",
2777                    DataType::Timestamp(TimeUnit::Millisecond, None),
2778                    false,
2779                )],
2780                vec![
2781                    // no overlap, empty intersection batch case
2782                    (
2783                        PartitionRange {
2784                            start: Timestamp::new_millisecond(1),
2785                            end: Timestamp::new_millisecond(3),
2786                            num_rows: 1,
2787                            identifier: 0,
2788                        },
2789                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2790                            1, 2,
2791                        ]))],
2792                    ),
2793                    (
2794                        PartitionRange {
2795                            start: Timestamp::new_millisecond(1),
2796                            end: Timestamp::new_millisecond(4),
2797                            num_rows: 1,
2798                            identifier: 1,
2799                        },
2800                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2801                            1, 2, 3,
2802                        ]))],
2803                    ),
2804                    (
2805                        PartitionRange {
2806                            start: Timestamp::new_millisecond(4),
2807                            end: Timestamp::new_millisecond(6),
2808                            num_rows: 1,
2809                            identifier: 1,
2810                        },
2811                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2812                            4, 5,
2813                        ]))],
2814                    ),
2815                ],
2816                vec![
2817                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2818                        1, 1, 2, 2,
2819                    ]))],
2820                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2821                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2822                        4, 5,
2823                    ]))],
2824                ],
2825            ),
2826            // test fetch
2827            TestStream::new(
2828                Column::new("ts", 0),
2829                SortOptions {
2830                    descending: false,
2831                    nulls_first: true,
2832                },
2833                Some(6),
2834                vec![Field::new(
2835                    "ts",
2836                    DataType::Timestamp(TimeUnit::Millisecond, None),
2837                    false,
2838                )],
2839                vec![
2840                    // no overlap, empty intersection batch case
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(3),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(4),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2, 3,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(3),
                            end: Timestamp::new_millisecond(6),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            4, 5,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        1, 1, 2, 2,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([4]))],
                ],
            ),
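            // same ranges as the previous case, but fetch = Some(3) keeps only the first three rows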
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: false,
                    nulls_first: true,
                },
                Some(3),
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // overlapping ranges; fetch = Some(3) cuts the output short
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(3),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(4),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2, 3,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(3),
                            end: Timestamp::new_millisecond(6),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            4, 5,
                        ]))],
                    ),
                ],
                vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
                    [1, 1, 2],
                ))]],
            ),
            // rev case
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: true,
                    nulls_first: true,
                },
                None,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // reverse order
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(3),
                            end: Timestamp::new_millisecond(6),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            5, 4,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(4),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            3, 2, 1,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(3),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            2, 1,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        5, 4,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        2, 2, 1, 1,
                    ]))],
                ],
            ),
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: false,
                    nulls_first: true,
                },
                None,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // a long run contains a shorter run as a subset
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(10),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 5, 9,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(3),
                            end: Timestamp::new_millisecond(7),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            3, 4, 5, 6,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        3, 4, 5, 5, 6, 9,
                    ]))],
                ],
            ),
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: false,
                    nulls_first: true,
                },
                None,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // complex overlap
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(3),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(10),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 3, 4, 5, 6, 8,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(7),
                            end: Timestamp::new_millisecond(10),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            7, 8, 9,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        1, 1, 2,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        3, 4, 5, 6,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        7, 8, 8, 9,
                    ]))],
                ],
            ),
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: false,
                    nulls_first: true,
                },
                None,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // complex subset case with duplicated data points
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(11),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(5),
                            end: Timestamp::new_millisecond(7),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            5, 6,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        1, 2, 3, 4,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        5, 5, 6, 6, 7, 8, 9, 10,
                    ]))],
                ],
            ),
        ];

        let indexed_test_cases = test_cases.iter().enumerate().collect_vec();

        for (idx, testcase) in &indexed_test_cases {
            let output = testcase.run_test().await;
            assert_eq!(output, testcase.output, "case {idx} failed.");
        }
    }

    #[tokio::test]
    async fn fuzzy_ish_test_window_sort_stream() {
        let test_cnt = 100;
        let part_cnt_bound = 100;
        let range_size_bound = 100;
        let range_offset_bound = 100;
        let in_range_datapoint_cnt_bound = 100;
        let fetch_bound = 100;

        let mut rng = fastrand::Rng::new();
        let rng_seed = rng.u64(..);
        rng.seed(rng_seed);
        let mut bound_val = None;
        // construct testcases
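        // Each case draws a random sort direction, time unit, and optional fetch limit, then
        // generates ranges whose bounds advance monotonically in the sort direction (tracked by
        // `bound_val`), so the concatenated input satisfies the ordering invariant of the plan.
        // The expected output is simply the globally sorted data, truncated by `fetch` if set.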
        type CmpFn<T> = Box<dyn FnMut(&T, &T) -> std::cmp::Ordering>;
        let mut full_testcase_list = Vec::new();
        for _case_id in 0..test_cnt {
            let descending = rng.bool();
            fn ret_cmp_fn<T: Ord>(descending: bool) -> CmpFn<T> {
                if descending {
                    return Box::new(|a: &T, b: &T| b.cmp(a));
                }
                Box::new(|a: &T, b: &T| a.cmp(b))
            }
            let unit = match rng.u8(0..4) {
                0 => TimeUnit::Second,
                1 => TimeUnit::Millisecond,
                2 => TimeUnit::Microsecond,
                _ => TimeUnit::Nanosecond,
            };
            let fetch = if rng.bool() {
                Some(rng.usize(0..fetch_bound))
            } else {
                None
            };

            let mut input_ranged_data = vec![];
            let mut output_data: Vec<i64> = vec![];
            // generate input data
            for part_id in 0..rng.usize(0..part_cnt_bound) {
                let (start, end) = if descending {
                    let end = bound_val
                        .map(|i| i - rng.i64(0..range_offset_bound))
                        .unwrap_or_else(|| rng.i64(..));
                    bound_val = Some(end);
                    let start = end - rng.i64(1..range_size_bound);
                    let start = Timestamp::new(start, unit.into());
                    let end = Timestamp::new(end, unit.into());
                    (start, end)
                } else {
                    let start = bound_val
                        .map(|i| i + rng.i64(0..range_offset_bound))
                        .unwrap_or_else(|| rng.i64(..));
                    bound_val = Some(start);
                    let end = start + rng.i64(1..range_size_bound);
                    let start = Timestamp::new(start, unit.into());
                    let end = Timestamp::new(end, unit.into());
                    (start, end)
                };

                let iter = 0..rng.usize(0..in_range_datapoint_cnt_bound);
                let data_gen = iter
                    .map(|_| rng.i64(start.value()..end.value()))
                    .sorted_by(ret_cmp_fn(descending))
                    .collect_vec();
                output_data.extend(data_gen.clone());
                let arr = new_ts_array(unit, data_gen);
                let range = PartitionRange {
                    start,
                    end,
                    num_rows: arr.len(),
                    identifier: part_id,
                };
                input_ranged_data.push((range, vec![arr]));
            }

            output_data.sort_by(ret_cmp_fn(descending));
            if let Some(fetch) = fetch {
                output_data.truncate(fetch);
            }
            let output_arr = new_ts_array(unit, output_data);

            let test_stream = TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending,
                    nulls_first: true,
                },
                fetch,
                vec![Field::new("ts", DataType::Timestamp(unit, None), false)],
                input_ranged_data.clone(),
                vec![vec![output_arr]],
            );
            full_testcase_list.push(test_stream);
        }

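        // run every generated case and compare its concatenated output against the expectation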
        for (case_id, test_stream) in full_testcase_list.into_iter().enumerate() {
            let res = test_stream.run_test().await;
            let res_concat = concat_batches(&test_stream.schema, &res).unwrap();
            let expected = test_stream.output;
            let expected_concat = concat_batches(&test_stream.schema, &expected).unwrap();

            if res_concat != expected_concat {
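                // dump the generated input ranges and batches as JSON to stderr so the failing
                // case can be inspected and reproduced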
                {
                    let mut f_input = std::io::stderr();
                    f_input.write_all(b"[").unwrap();
                    for (input_range, input_arr) in test_stream.input {
                        let range_json = json!({
                            "start": input_range.start.to_chrono_datetime().unwrap().to_string(),
                            "end": input_range.end.to_chrono_datetime().unwrap().to_string(),
                            "num_rows": input_range.num_rows,
                            "identifier": input_range.identifier,
                        });
                        let buf = Vec::new();
                        let mut input_writer = ArrayWriter::new(buf);
                        input_writer.write(&input_arr).unwrap();
                        input_writer.finish().unwrap();
                        let res_str =
                            String::from_utf8_lossy(&input_writer.into_inner()).to_string();
                        let whole_json =
                            format!(r#"{{"range": {}, "data": {}}},"#, range_json, res_str);
                        f_input.write_all(whole_json.as_bytes()).unwrap();
                    }
                    f_input.write_all(b"]").unwrap();
                }
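                // dump the actual batches, their concatenation, and the expected concatenation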
                {
                    let mut f_res = std::io::stderr();
                    f_res.write_all(b"[").unwrap();
                    for batch in &res {
                        let mut res_writer = ArrayWriter::new(f_res);
                        res_writer.write(batch).unwrap();
                        res_writer.finish().unwrap();
                        f_res = res_writer.into_inner();
                        f_res.write_all(b",").unwrap();
                    }
                    f_res.write_all(b"]").unwrap();

                    let f_res_concat = std::io::stderr();
                    let mut res_writer = ArrayWriter::new(f_res_concat);
                    res_writer.write(&res_concat).unwrap();
                    res_writer.finish().unwrap();

                    let f_expected = std::io::stderr();
                    let mut expected_writer = ArrayWriter::new(f_expected);
                    expected_writer.write(&expected_concat).unwrap();
                    expected_writer.finish().unwrap();
                }
                panic!(
                    "case failed, case id: {}, rng seed: {}; input, output and expected output dumped to stderr",
                    case_id, rng_seed
                );
            }
            assert_eq!(
                res_concat, expected_concat,
                "case failed, case id: {}, rng seed: {}",
                case_id, rng_seed
            );
        }
    }
}