query/
window_sort.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A physical plan for window sort(Which is sorting multiple sorted ranges according to input `PartitionRange`).
16//!
17
18use std::any::Any;
19use std::collections::{BTreeMap, BTreeSet, VecDeque};
20use std::pin::Pin;
21use std::sync::Arc;
22use std::task::{Context, Poll};
23
24use arrow::array::{Array, ArrayRef};
25use arrow::compute::SortColumn;
26use arrow_schema::{DataType, SchemaRef, SortOptions};
27use common_error::ext::{BoxedError, PlainError};
28use common_error::status_code::StatusCode;
29use common_recordbatch::{DfRecordBatch, DfSendableRecordBatchStream};
30use common_telemetry::error;
31use common_time::Timestamp;
32use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool};
33use datafusion::execution::{RecordBatchStream, TaskContext};
34use datafusion::physical_plan::memory::MemoryStream;
35use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
36use datafusion::physical_plan::sorts::streaming_merge::StreamingMergeBuilder;
37use datafusion::physical_plan::{
38    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
39};
40use datafusion_common::utils::bisect;
41use datafusion_common::{internal_err, DataFusionError};
42use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
43use datatypes::value::Value;
44use futures::Stream;
45use itertools::Itertools;
46use snafu::ResultExt;
47use store_api::region_engine::PartitionRange;
48
49use crate::error::{QueryExecutionSnafu, Result};
50
51/// A complex stream sort execution plan which accepts a list of `PartitionRange` and
52/// merge sort them whenever possible, and emit the sorted result as soon as possible.
53/// This sorting plan only accept sort by ts and will not sort by other fields.
54///
55/// internally, it call [`StreamingMergeBuilder`] multiple times to merge multiple sorted "working ranges"
56///
57/// # Invariant Promise on Input Stream
58/// 1. The input stream must be sorted by timestamp and
59/// 2. in the order of `PartitionRange` in `ranges`
60/// 3. Each `PartitionRange` is sorted within itself(ascending or descending) but no need to be sorted across ranges
61/// 4. There can't be any RecordBatch that is cross multiple `PartitionRange` in the input stream
62///
63///  TODO(discord9): fix item 4, but since only use `PartSort` as input, this might not be a problem
64
65#[derive(Debug, Clone)]
66pub struct WindowedSortExec {
67    /// Physical sort expressions(that is, sort by timestamp)
68    expression: PhysicalSortExpr,
69    /// Optional number of rows to fetch. Stops producing rows after this fetch
70    fetch: Option<usize>,
71    /// The input ranges indicate input stream will be composed of those ranges in given order.
72    ///
73    /// Each partition has one vector of `PartitionRange`.
74    ranges: Vec<Vec<PartitionRange>>,
75    /// All available working ranges and their corresponding working set
76    ///
77    /// working ranges promise once input stream get a value out of current range, future values will never
78    /// be in this range. Each partition has one vector of ranges.
79    all_avail_working_range: Vec<Vec<(TimeRange, BTreeSet<usize>)>>,
80    input: Arc<dyn ExecutionPlan>,
81    /// Execution metrics
82    metrics: ExecutionPlanMetricsSet,
83    properties: PlanProperties,
84}
85
86fn check_partition_range_monotonicity(
87    ranges: &[Vec<PartitionRange>],
88    descending: bool,
89) -> Result<()> {
90    let is_valid = ranges.iter().all(|r| {
91        if descending {
92            r.windows(2).all(|w| w[0].end >= w[1].end)
93        } else {
94            r.windows(2).all(|w| w[0].start <= w[1].start)
95        }
96    });
97
98    if !is_valid {
99        let msg = if descending {
100            "Input `PartitionRange`s's upper bound is not monotonic non-increase"
101        } else {
102            "Input `PartitionRange`s's lower bound is not monotonic non-decrease"
103        };
104        let plain_error = PlainError::new(msg.to_string(), StatusCode::Unexpected);
105        Err(BoxedError::new(plain_error)).context(QueryExecutionSnafu {})
106    } else {
107        Ok(())
108    }
109}
110
111impl WindowedSortExec {
112    pub fn try_new(
113        expression: PhysicalSortExpr,
114        fetch: Option<usize>,
115        ranges: Vec<Vec<PartitionRange>>,
116        input: Arc<dyn ExecutionPlan>,
117    ) -> Result<Self> {
118        check_partition_range_monotonicity(&ranges, expression.options.descending)?;
119
120        let properties = input.properties();
121        let properties = PlanProperties::new(
122            input
123                .equivalence_properties()
124                .clone()
125                .with_reorder(LexOrdering::new(vec![expression.clone()])),
126            input.output_partitioning().clone(),
127            properties.emission_type,
128            properties.boundedness,
129        );
130
131        let mut all_avail_working_range = Vec::with_capacity(ranges.len());
132        for r in &ranges {
133            let overlap_counts = split_overlapping_ranges(r);
134            let working_ranges =
135                compute_all_working_ranges(&overlap_counts, expression.options.descending);
136            all_avail_working_range.push(working_ranges);
137        }
138
139        Ok(Self {
140            expression,
141            fetch,
142            ranges,
143            all_avail_working_range,
144            input,
145            metrics: ExecutionPlanMetricsSet::new(),
146            properties,
147        })
148    }
149
150    /// During receiving partial-sorted RecordBatch, we need to update the working set which is the
151    /// `PartitionRange` we think those RecordBatch belongs to. And when we receive something outside
152    /// of working set, we can merge results before whenever possible.
153    pub fn to_stream(
154        &self,
155        context: Arc<TaskContext>,
156        partition: usize,
157    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
158        let input_stream: DfSendableRecordBatchStream =
159            self.input.execute(partition, context.clone())?;
160
161        let df_stream = Box::pin(WindowedSortStream::new(
162            context,
163            self,
164            input_stream,
165            partition,
166        )) as _;
167
168        Ok(df_stream)
169    }
170}
171
172impl DisplayAs for WindowedSortExec {
173    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174        write!(
175            f,
176            "WindowedSortExec: expr={} num_ranges={}",
177            self.expression,
178            self.ranges.len()
179        )?;
180        if let Some(fetch) = self.fetch {
181            write!(f, " fetch={}", fetch)?;
182        }
183        Ok(())
184    }
185}
186
187impl ExecutionPlan for WindowedSortExec {
188    fn as_any(&self) -> &dyn Any {
189        self
190    }
191
192    fn schema(&self) -> SchemaRef {
193        self.input.schema()
194    }
195
196    fn properties(&self) -> &PlanProperties {
197        &self.properties
198    }
199
200    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
201        vec![&self.input]
202    }
203
204    fn with_new_children(
205        self: Arc<Self>,
206        children: Vec<Arc<dyn ExecutionPlan>>,
207    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
208        let new_input = if let Some(first) = children.first() {
209            first
210        } else {
211            internal_err!("No children found")?
212        };
213        let new = Self::try_new(
214            self.expression.clone(),
215            self.fetch,
216            self.ranges.clone(),
217            new_input.clone(),
218        )?;
219        Ok(Arc::new(new))
220    }
221
222    fn execute(
223        &self,
224        partition: usize,
225        context: Arc<TaskContext>,
226    ) -> datafusion_common::Result<DfSendableRecordBatchStream> {
227        self.to_stream(context, partition)
228    }
229
230    fn metrics(&self) -> Option<MetricsSet> {
231        Some(self.metrics.clone_inner())
232    }
233
234    /// # Explain
235    ///
236    /// This plan needs to be executed on each partition independently,
237    /// and is expected to run directly on storage engine's output
238    /// distribution / partition.
239    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
240        vec![false; self.ranges.len()]
241    }
242
243    fn name(&self) -> &str {
244        "WindowedSortExec"
245    }
246}
247
248/// The core logic of merging sort multiple sorted ranges
249///
250/// the flow of data is:
251/// ```md
252/// input --check if sorted--> in_progress --find sorted run--> sorted_input_runs --call merge sort--> merge_stream --> output
253/// ```
254pub struct WindowedSortStream {
255    /// Memory pool for this stream
256    memory_pool: Arc<dyn MemoryPool>,
257    /// currently assembling RecordBatches, will be put to `sort_partition_rbs` when it's done
258    in_progress: Vec<DfRecordBatch>,
259    /// last `Timestamp` of the last input RecordBatch in `in_progress`, use to found partial sorted run's boundary
260    last_value: Option<Timestamp>,
261    /// Current working set of `PartitionRange` sorted RecordBatches
262    sorted_input_runs: Vec<DfSendableRecordBatchStream>,
263    /// Merge-sorted result streams, should be polled to end before start a new merge sort again
264    merge_stream: VecDeque<DfSendableRecordBatchStream>,
265    /// The number of times merge sort has been called
266    merge_count: usize,
267    /// Index into current `working_range` in `all_avail_working_range`
268    working_idx: usize,
269    /// input stream assumed reading in order of `PartitionRange`
270    input: DfSendableRecordBatchStream,
271    /// Whether this stream is terminated. For reasons like limit reached or input stream is done.
272    is_terminated: bool,
273    /// Output Schema, which is the same as input schema, since this is a sort plan
274    schema: SchemaRef,
275    /// Physical sort expressions(that is, sort by timestamp)
276    expression: PhysicalSortExpr,
277    /// Optional number of rows to fetch. Stops producing rows after this fetch
278    fetch: Option<usize>,
279    /// number of rows produced
280    produced: usize,
281    /// Resulting Stream(`merge_stream`)'s batch size, merely a suggestion
282    batch_size: usize,
283    /// All available working ranges and their corresponding working set
284    ///
285    /// working ranges promise once input stream get a value out of current range, future values will never be in this range
286    all_avail_working_range: Vec<(TimeRange, BTreeSet<usize>)>,
287    /// The input partition ranges
288    #[allow(dead_code)] // this is used under #[debug_assertions]
289    ranges: Vec<PartitionRange>,
290    /// Execution metrics
291    metrics: BaselineMetrics,
292}
293
294impl WindowedSortStream {
295    pub fn new(
296        context: Arc<TaskContext>,
297        exec: &WindowedSortExec,
298        input: DfSendableRecordBatchStream,
299        partition: usize,
300    ) -> Self {
301        Self {
302            memory_pool: context.runtime_env().memory_pool.clone(),
303            in_progress: Vec::new(),
304            last_value: None,
305            sorted_input_runs: Vec::new(),
306            merge_stream: VecDeque::new(),
307            merge_count: 0,
308            working_idx: 0,
309            schema: input.schema(),
310            input,
311            is_terminated: false,
312            expression: exec.expression.clone(),
313            fetch: exec.fetch,
314            produced: 0,
315            batch_size: context.session_config().batch_size(),
316            all_avail_working_range: exec.all_avail_working_range[partition].clone(),
317            ranges: exec.ranges[partition].clone(),
318            metrics: BaselineMetrics::new(&exec.metrics, partition),
319        }
320    }
321}
322
323impl WindowedSortStream {
324    #[cfg(debug_assertions)]
325    fn check_subset_ranges(&self, cur_range: &TimeRange) {
326        let cur_is_subset_to = self
327            .ranges
328            .iter()
329            .filter(|r| cur_range.is_subset(&TimeRange::from(*r)))
330            .collect_vec();
331        if cur_is_subset_to.is_empty() {
332            error!("Current range is not a subset of any PartitionRange");
333            // found in all ranges that are subset of current range
334            let subset_ranges = self
335                .ranges
336                .iter()
337                .filter(|r| TimeRange::from(*r).is_subset(cur_range))
338                .collect_vec();
339            let only_overlap = self
340                .ranges
341                .iter()
342                .filter(|r| {
343                    let r = TimeRange::from(*r);
344                    r.is_overlapping(cur_range) && !r.is_subset(cur_range)
345                })
346                .collect_vec();
347            error!(
348                "Bad input, found {} ranges that are subset of current range, also found {} ranges that only overlap, subset ranges are: {:?}; overlap ranges are: {:?}",
349                subset_ranges.len(),
350                only_overlap.len(),
351                subset_ranges,
352                only_overlap
353            );
354        } else {
355            let only_overlap = self
356                .ranges
357                .iter()
358                .filter(|r| {
359                    let r = TimeRange::from(*r);
360                    r.is_overlapping(cur_range) && !cur_range.is_subset(&r)
361                })
362                .collect_vec();
363            error!(
364                "Found current range to be subset of {} ranges, also found {} ranges that only overlap, of subset ranges are:{:?}; overlap ranges are: {:?}",
365                cur_is_subset_to.len(),
366                only_overlap.len(),
367                cur_is_subset_to,
368                only_overlap
369            );
370        }
371        let all_overlap_working_range = self
372            .all_avail_working_range
373            .iter()
374            .filter(|(range, _)| range.is_overlapping(cur_range))
375            .map(|(range, _)| range)
376            .collect_vec();
377        error!(
378            "Found {} working ranges that overlap with current range: {:?}",
379            all_overlap_working_range.len(),
380            all_overlap_working_range
381        );
382    }
383
384    /// Poll the next RecordBatch from the merge-sort's output stream
385    fn poll_result_stream(
386        &mut self,
387        cx: &mut Context<'_>,
388    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
389        while let Some(merge_stream) = &mut self.merge_stream.front_mut() {
390            match merge_stream.as_mut().poll_next(cx) {
391                Poll::Ready(Some(Ok(batch))) => {
392                    let ret = if let Some(remaining) = self.remaining_fetch() {
393                        if remaining == 0 {
394                            self.is_terminated = true;
395                            None
396                        } else if remaining < batch.num_rows() {
397                            self.produced += remaining;
398                            Some(Ok(batch.slice(0, remaining)))
399                        } else {
400                            self.produced += batch.num_rows();
401                            Some(Ok(batch))
402                        }
403                    } else {
404                        self.produced += batch.num_rows();
405                        Some(Ok(batch))
406                    };
407                    return Poll::Ready(ret);
408                }
409                Poll::Ready(Some(Err(e))) => {
410                    return Poll::Ready(Some(Err(e)));
411                }
412                Poll::Ready(None) => {
413                    // current merge stream is done, we can start polling the next one
414
415                    self.merge_stream.pop_front();
416                    continue;
417                }
418                Poll::Pending => {
419                    return Poll::Pending;
420                }
421            }
422        }
423        // if no output stream is available
424        Poll::Ready(None)
425    }
426
427    /// The core logic of merging sort multiple sorted ranges
428    ///
429    /// We try to maximize the number of sorted runs we can merge in one go, while emit the result as soon as possible.
430    pub fn poll_next_inner(
431        mut self: Pin<&mut Self>,
432        cx: &mut Context<'_>,
433    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
434        // first check and send out the merge result
435        match self.poll_result_stream(cx) {
436            Poll::Ready(None) => {
437                if self.is_terminated {
438                    return Poll::Ready(None);
439                }
440            }
441            x => return x,
442        };
443
444        // consume input stream
445        while !self.is_terminated {
446            // then we get a new RecordBatch from input stream
447            let new_input_rbs = match self.input.as_mut().poll_next(cx) {
448                Poll::Ready(Some(Ok(batch))) => {
449                    Some(split_batch_to_sorted_run(batch, &self.expression)?)
450                }
451                Poll::Ready(Some(Err(e))) => {
452                    return Poll::Ready(Some(Err(e)));
453                }
454                Poll::Ready(None) => {
455                    // input stream is done, we need to merge sort the remaining working set
456                    self.is_terminated = true;
457                    None
458                }
459                Poll::Pending => return Poll::Pending,
460            };
461
462            let Some(SortedRunSet {
463                runs_with_batch,
464                sort_column,
465            }) = new_input_rbs
466            else {
467                // input stream is done, we need to merge sort the remaining working set
468                self.build_sorted_stream()?;
469                self.start_new_merge_sort()?;
470                continue;
471            };
472            // The core logic to eargerly merge sort the working set
473
474            // compare with last_value to find boundary, then merge runs if needed
475
476            // iterate over runs_with_batch to merge sort, might create zero or more stream to put to `sort_partition_rbs`
477            let mut last_remaining = None;
478            let mut run_iter = runs_with_batch.into_iter();
479            loop {
480                let Some((sorted_rb, run_info)) = last_remaining.take().or(run_iter.next()) else {
481                    break;
482                };
483                if sorted_rb.num_rows() == 0 {
484                    continue;
485                }
486                // determine if this batch is in current working range
487                let Some(cur_range) = run_info.get_time_range() else {
488                    internal_err!("Found NULL in time index column")?
489                };
490                let Some(working_range) = self.get_working_range() else {
491                    internal_err!("No working range found")?
492                };
493
494                // ensure the current batch is in the working range
495                if sort_column.options.unwrap_or_default().descending {
496                    if cur_range.end > working_range.end {
497                        error!("Invalid range: {:?} > {:?}", cur_range, working_range);
498                        #[cfg(debug_assertions)]
499                        self.check_subset_ranges(&cur_range);
500                        internal_err!("Current batch have data on the right side of working range, something is very wrong")?;
501                    }
502                } else if cur_range.start < working_range.start {
503                    error!("Invalid range: {:?} < {:?}", cur_range, working_range);
504                    #[cfg(debug_assertions)]
505                    self.check_subset_ranges(&cur_range);
506                    internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
507                }
508
509                if cur_range.is_subset(&working_range) {
510                    // data still in range, can't merge sort yet
511                    // see if can concat entire sorted rb, merge sort need to wait
512                    self.try_concat_batch(sorted_rb.clone(), &run_info, sort_column.options)?;
513                } else if let Some(intersection) = cur_range.intersection(&working_range) {
514                    // slice rb by intersection and concat it then merge sort
515                    let cur_sort_column = sort_column.values.slice(run_info.offset, run_info.len);
516                    let (offset, len) = find_slice_from_range(
517                        &SortColumn {
518                            values: cur_sort_column.clone(),
519                            options: sort_column.options,
520                        },
521                        &intersection,
522                    )?;
523
524                    if offset != 0 {
525                        internal_err!("Current batch have data on the left side of working range, something is very wrong")?;
526                    }
527
528                    let sliced_rb = sorted_rb.slice(offset, len);
529
530                    // try to concat the sliced input batch to the current `in_progress` run
531                    self.try_concat_batch(sliced_rb, &run_info, sort_column.options)?;
532                    // since no more sorted data in this range will come in now, build stream now
533                    self.build_sorted_stream()?;
534
535                    // since we are crossing working range, we need to merge sort the working set
536                    self.start_new_merge_sort()?;
537
538                    let (r_offset, r_len) = (offset + len, sorted_rb.num_rows() - offset - len);
539                    if r_len != 0 {
540                        // we have remaining data in this batch, put it back to input queue
541                        let remaining_rb = sorted_rb.slice(r_offset, r_len);
542                        let new_first_val = get_timestamp_from_idx(&cur_sort_column, r_offset)?;
543                        let new_run_info = SucRun {
544                            offset: run_info.offset + r_offset,
545                            len: r_len,
546                            first_val: new_first_val,
547                            last_val: run_info.last_val,
548                        };
549                        last_remaining = Some((remaining_rb, new_run_info));
550                    }
551                    // deal with remaining batch cross working range problem
552                    // i.e: this example require more slice, and we are currently at point A
553                    // |---1---|       |---3---|
554                    // |-------A--2------------|
555                    //  put the remaining batch back to iter and deal it in next loop
556                } else {
557                    // no overlap, we can merge sort the working set
558
559                    self.build_sorted_stream()?;
560                    self.start_new_merge_sort()?;
561
562                    // always put it back to input queue until some batch is in working range
563                    last_remaining = Some((sorted_rb, run_info));
564                }
565            }
566        }
567        // emit the merge result
568        self.poll_result_stream(cx)
569    }
570
571    fn push_batch(&mut self, batch: DfRecordBatch) {
572        self.in_progress.push(batch);
573    }
574
575    /// Try to concat the input batch to the current `in_progress` run
576    ///
577    /// if the input batch is not sorted, build old run to stream and start a new run with new batch
578    fn try_concat_batch(
579        &mut self,
580        batch: DfRecordBatch,
581        run_info: &SucRun<Timestamp>,
582        opt: Option<SortOptions>,
583    ) -> datafusion_common::Result<()> {
584        let is_ok_to_concat =
585            cmp_with_opts(&self.last_value, &run_info.first_val, &opt) <= std::cmp::Ordering::Equal;
586
587        if is_ok_to_concat {
588            self.push_batch(batch);
589            // next time we get input batch might still be ordered, so not build stream yet
590        } else {
591            // no more sorted data, build stream now
592            self.build_sorted_stream()?;
593            self.push_batch(batch);
594        }
595        self.last_value = run_info.last_val;
596        Ok(())
597    }
598
599    /// Get the current working range
600    fn get_working_range(&self) -> Option<TimeRange> {
601        self.all_avail_working_range
602            .get(self.working_idx)
603            .map(|(range, _)| *range)
604    }
605
606    /// Set current working range to the next working range
607    fn set_next_working_range(&mut self) {
608        self.working_idx += 1;
609    }
610
611    /// make `in_progress` as a new `DfSendableRecordBatchStream` and put into `sorted_input_runs`
612    fn build_sorted_stream(&mut self) -> datafusion_common::Result<()> {
613        if self.in_progress.is_empty() {
614            return Ok(());
615        }
616        let data = std::mem::take(&mut self.in_progress);
617
618        let new_stream = MemoryStream::try_new(data, self.schema(), None)?;
619        self.sorted_input_runs.push(Box::pin(new_stream));
620        Ok(())
621    }
622
623    /// Start merging sort the current working set
624    fn start_new_merge_sort(&mut self) -> datafusion_common::Result<()> {
625        if !self.in_progress.is_empty() {
626            return internal_err!("Starting a merge sort when in_progress is not empty")?;
627        }
628
629        self.set_next_working_range();
630
631        let streams = std::mem::take(&mut self.sorted_input_runs);
632        if streams.is_empty() {
633            return Ok(());
634        } else if streams.len() == 1 {
635            self.merge_stream
636                .push_back(streams.into_iter().next().unwrap());
637            return Ok(());
638        }
639
640        let fetch = self.remaining_fetch();
641        let reservation = MemoryConsumer::new(format!("WindowedSortStream[{}]", self.merge_count))
642            .register(&self.memory_pool);
643        self.merge_count += 1;
644        let lex_ordering = LexOrdering::new(vec![self.expression.clone()]);
645
646        let resulting_stream = StreamingMergeBuilder::new()
647            .with_streams(streams)
648            .with_schema(self.schema())
649            .with_expressions(&lex_ordering)
650            .with_metrics(self.metrics.clone())
651            .with_batch_size(self.batch_size)
652            .with_fetch(fetch)
653            .with_reservation(reservation)
654            .build()?;
655        self.merge_stream.push_back(resulting_stream);
656        // this working range is done, move to next working range
657        Ok(())
658    }
659
660    /// Remaining number of rows to fetch, if no fetch limit, return None
661    /// if fetch limit is reached, return Some(0)
662    fn remaining_fetch(&self) -> Option<usize> {
663        let total_now = self.produced;
664        self.fetch.map(|p| p.saturating_sub(total_now))
665    }
666}
667
668impl Stream for WindowedSortStream {
669    type Item = datafusion_common::Result<DfRecordBatch>;
670
671    fn poll_next(
672        mut self: Pin<&mut Self>,
673        cx: &mut Context<'_>,
674    ) -> Poll<Option<datafusion_common::Result<DfRecordBatch>>> {
675        let result = self.as_mut().poll_next_inner(cx);
676        self.metrics.record_poll(result)
677    }
678}
679
680impl RecordBatchStream for WindowedSortStream {
681    fn schema(&self) -> SchemaRef {
682        self.schema.clone()
683    }
684}
685
686/// split batch to sorted runs
687fn split_batch_to_sorted_run(
688    batch: DfRecordBatch,
689    expression: &PhysicalSortExpr,
690) -> datafusion_common::Result<SortedRunSet<Timestamp>> {
691    // split input rb to sorted runs
692    let sort_column = expression.evaluate_to_sort_column(&batch)?;
693    let sorted_runs_offset = get_sorted_runs(sort_column.clone())?;
694    if let Some(run) = sorted_runs_offset.first()
695        && sorted_runs_offset.len() == 1
696    {
697        if !(run.offset == 0 && run.len == batch.num_rows()) {
698            internal_err!(
699                "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
700                run.offset,
701                run.len,
702                batch.num_rows()
703            )?;
704        }
705        // input rb is already sorted, we can emit it directly
706        Ok(SortedRunSet {
707            runs_with_batch: vec![(batch, run.clone())],
708            sort_column,
709        })
710    } else {
711        // those slice should be zero copy, so supposedly no new reservation needed
712        let mut ret = Vec::with_capacity(sorted_runs_offset.len());
713        for run in sorted_runs_offset {
714            if run.offset + run.len > batch.num_rows() {
715                internal_err!(
716                    "Invalid run offset and length: offset = {:?}, len = {:?}, num_rows = {:?}",
717                    run.offset,
718                    run.len,
719                    batch.num_rows()
720                )?;
721            }
722            let new_rb = batch.slice(run.offset, run.len);
723            ret.push((new_rb, run));
724        }
725        Ok(SortedRunSet {
726            runs_with_batch: ret,
727            sort_column,
728        })
729    }
730}
731
732/// Downcast a temporal array to a specific type
733///
734/// usage similar to `downcast_primitive!` in `arrow-array` crate
735#[macro_export]
736macro_rules! downcast_ts_array {
737    ($data_type:expr => ($m:path $(, $args:tt)*), $($p:pat => $fallback:expr $(,)*)*) =>
738    {
739        match $data_type {
740            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
741                $m!(arrow::datatypes::TimestampSecondType, arrow_schema::TimeUnit::Second $(, $args)*)
742            }
743            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
744                $m!(arrow::datatypes::TimestampMillisecondType, arrow_schema::TimeUnit::Millisecond $(, $args)*)
745            }
746            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
747                $m!(arrow::datatypes::TimestampMicrosecondType, arrow_schema::TimeUnit::Microsecond $(, $args)*)
748            }
749            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
750                $m!(arrow::datatypes::TimestampNanosecondType, arrow_schema::TimeUnit::Nanosecond $(, $args)*)
751            }
752            $($p => $fallback,)*
753        }
754    };
755}
756
757/// Find the slice(where start <= data < end and sort by `sort_column.options`) from the given range
758///
759/// Return the offset and length of the slice
760fn find_slice_from_range(
761    sort_column: &SortColumn,
762    range: &TimeRange,
763) -> datafusion_common::Result<(usize, usize)> {
764    let ty = sort_column.values.data_type();
765    let time_unit = if let DataType::Timestamp(unit, _) = ty {
766        unit
767    } else {
768        return Err(DataFusionError::Internal(format!(
769            "Unsupported sort column type: {}",
770            sort_column.values.data_type()
771        )));
772    };
773    let array = &sort_column.values;
774    let opt = &sort_column.options.unwrap_or_default();
775    let descending = opt.descending;
776
777    let typed_sorted_range = [range.start, range.end]
778        .iter()
779        .map(|t| {
780            t.convert_to(time_unit.into())
781                .ok_or_else(|| {
782                    DataFusionError::Internal(format!(
783                        "Failed to convert timestamp from {:?} to {:?}",
784                        t.unit(),
785                        time_unit
786                    ))
787                })
788                .and_then(|typed_ts| {
789                    let value = Value::Timestamp(typed_ts);
790                    value
791                        .try_to_scalar_value(&value.data_type())
792                        .map_err(|e| DataFusionError::External(Box::new(e) as _))
793                })
794        })
795        .try_collect::<_, Vec<_>, _>()?;
796
797    let (min_val, max_val) = (typed_sorted_range[0].clone(), typed_sorted_range[1].clone());
798
799    // get slice that in which all data that `min_val<=data<max_val`
800    let (start, end) = if descending {
801        // note that `data < max_val`
802        // i,e, for max_val = 4, array = [5,3,2] should be start=1
803        // max_val = 4, array = [5, 4, 3, 2] should be start= 2
804        let start = bisect::<false>(&[array.clone()], &[max_val.clone()], &[*opt])?;
805        // min_val = 1, array = [3, 2, 1, 0], end = 3
806        // min_val = 1, array = [3, 2, 0], end = 2
807        let end = bisect::<false>(&[array.clone()], &[min_val.clone()], &[*opt])?;
808        (start, end)
809    } else {
810        // min_val = 1, array = [1, 2, 3], start = 0
811        // min_val = 1, array = [0, 2, 3], start = 1
812        let start = bisect::<true>(&[array.clone()], &[min_val.clone()], &[*opt])?;
813        // max_val = 3, array = [1, 3, 4], end = 1
814        // max_val = 3, array = [1, 2, 4], end = 2
815        let end = bisect::<true>(&[array.clone()], &[max_val.clone()], &[*opt])?;
816        (start, end)
817    };
818
819    Ok((start, end - start))
820}
821
822/// Get an iterator from a primitive array.
823///
824/// Used with `downcast_ts_array`. The returned iter is wrapped with `.enumerate()`.
825#[macro_export]
826macro_rules! array_iter_helper {
827    ($t:ty, $unit:expr, $arr:expr) => {{
828        let typed = $arr
829            .as_any()
830            .downcast_ref::<arrow::array::PrimitiveArray<$t>>()
831            .unwrap();
832        let iter = typed.iter().enumerate();
833        Box::new(iter) as Box<dyn Iterator<Item = (usize, Option<i64>)>>
834    }};
835}
836
837/// Compare with options, note None is considered as NULL here
838///
839/// default to null first
840fn cmp_with_opts<T: Ord>(
841    a: &Option<T>,
842    b: &Option<T>,
843    opt: &Option<SortOptions>,
844) -> std::cmp::Ordering {
845    let opt = opt.unwrap_or_default();
846
847    if let (Some(a), Some(b)) = (a, b) {
848        if opt.descending {
849            b.cmp(a)
850        } else {
851            a.cmp(b)
852        }
853    } else if opt.nulls_first {
854        // now we know at leatst one of them is None
855        // in rust None < Some(_)
856        a.cmp(b)
857    } else {
858        match (a, b) {
859            (Some(a), Some(b)) => a.cmp(b),
860            (Some(_), None) => std::cmp::Ordering::Less,
861            (None, Some(_)) => std::cmp::Ordering::Greater,
862            (None, None) => std::cmp::Ordering::Equal,
863        }
864    }
865}
866
867#[derive(Debug, Clone)]
868struct SortedRunSet<N: Ord> {
869    /// sorted runs with batch corresponding to them
870    runs_with_batch: Vec<(DfRecordBatch, SucRun<N>)>,
871    /// sorted column from eval sorting expr
872    sort_column: SortColumn,
873}
874
875/// A struct to represent a successive run in the input iterator
876#[derive(Debug, Clone, PartialEq)]
877struct SucRun<N: Ord> {
878    /// offset of the first element in the run
879    offset: usize,
880    /// length of the run
881    len: usize,
882    /// first non-null value in the run
883    first_val: Option<N>,
884    /// last non-null value in the run
885    last_val: Option<N>,
886}
887
888impl SucRun<Timestamp> {
889    /// Get the time range of the run, which is [min_val, max_val + 1)
890    fn get_time_range(&self) -> Option<TimeRange> {
891        let start = self.first_val.min(self.last_val);
892        let end = self
893            .first_val
894            .max(self.last_val)
895            .map(|i| Timestamp::new(i.value() + 1, i.unit()));
896        start.zip(end).map(|(s, e)| TimeRange::new(s, e))
897    }
898}
899
900/// find all successive runs in the input iterator
901fn find_successive_runs<T: Iterator<Item = (usize, Option<N>)>, N: Ord + Copy>(
902    iter: T,
903    sort_opts: &Option<SortOptions>,
904) -> Vec<SucRun<N>> {
905    let mut runs = Vec::new();
906    let mut last_value = None;
907    let mut iter_len = None;
908
909    let mut last_offset = 0;
910    let mut first_val: Option<N> = None;
911    let mut last_val: Option<N> = None;
912
913    for (idx, t) in iter {
914        if let Some(last_value) = &last_value {
915            if cmp_with_opts(last_value, &t, sort_opts) == std::cmp::Ordering::Greater {
916                // we found a boundary
917                let len = idx - last_offset;
918                let run = SucRun {
919                    offset: last_offset,
920                    len,
921                    first_val,
922                    last_val,
923                };
924                runs.push(run);
925                first_val = None;
926                last_val = None;
927
928                last_offset = idx;
929            }
930        }
931        last_value = Some(t);
932        if let Some(t) = t {
933            first_val = first_val.or(Some(t));
934            last_val = Some(t).or(last_val);
935        }
936        iter_len = Some(idx);
937    }
938    let run = SucRun {
939        offset: last_offset,
940        len: iter_len.map(|l| l - last_offset + 1).unwrap_or(0),
941        first_val,
942        last_val,
943    };
944    runs.push(run);
945
946    runs
947}
948
949/// return a list of non-overlapping (offset, length) which represent sorted runs, and
950/// can be used to call [`DfRecordBatch::slice`] to get sorted runs
951/// Returned runs will be as long as possible, and will not overlap with each other
952fn get_sorted_runs(sort_column: SortColumn) -> datafusion_common::Result<Vec<SucRun<Timestamp>>> {
953    let ty = sort_column.values.data_type();
954    if let DataType::Timestamp(unit, _) = ty {
955        let array = &sort_column.values;
956        let iter = downcast_ts_array!(
957            array.data_type() => (array_iter_helper, array),
958            _ => internal_err!("Unsupported sort column type: {ty}")?
959        );
960
961        let raw = find_successive_runs(iter, &sort_column.options);
962        let ts_runs = raw
963            .into_iter()
964            .map(|run| SucRun {
965                offset: run.offset,
966                len: run.len,
967                first_val: run.first_val.map(|v| Timestamp::new(v, unit.into())),
968                last_val: run.last_val.map(|v| Timestamp::new(v, unit.into())),
969            })
970            .collect_vec();
971        Ok(ts_runs)
972    } else {
973        Err(DataFusionError::Internal(format!(
974            "Unsupported sort column type: {ty}"
975        )))
976    }
977}
978
979/// Left(`start`) inclusive right(`end`) exclusive,
980///
981/// This is just tuple with extra methods
982#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, PartialOrd, Ord)]
983struct TimeRange {
984    start: Timestamp,
985    end: Timestamp,
986}
987
988impl From<&PartitionRange> for TimeRange {
989    fn from(range: &PartitionRange) -> Self {
990        Self::new(range.start, range.end)
991    }
992}
993
994impl From<(Timestamp, Timestamp)> for TimeRange {
995    fn from(range: (Timestamp, Timestamp)) -> Self {
996        Self::new(range.0, range.1)
997    }
998}
999
1000impl From<&(Timestamp, Timestamp)> for TimeRange {
1001    fn from(range: &(Timestamp, Timestamp)) -> Self {
1002        Self::new(range.0, range.1)
1003    }
1004}
1005
1006impl TimeRange {
1007    /// Create a new TimeRange, if start is greater than end, swap them
1008    fn new(start: Timestamp, end: Timestamp) -> Self {
1009        if start > end {
1010            Self {
1011                start: end,
1012                end: start,
1013            }
1014        } else {
1015            Self { start, end }
1016        }
1017    }
1018
1019    fn is_subset(&self, other: &Self) -> bool {
1020        self.start >= other.start && self.end <= other.end
1021    }
1022
1023    /// Check if two ranges are overlapping, exclusive(meaning if only boundary is overlapped then range is not overlapping)
1024    fn is_overlapping(&self, other: &Self) -> bool {
1025        !(self.start >= other.end || self.end <= other.start)
1026    }
1027
1028    fn intersection(&self, other: &Self) -> Option<Self> {
1029        if self.is_overlapping(other) {
1030            Some(Self::new(
1031                self.start.max(other.start),
1032                self.end.min(other.end),
1033            ))
1034        } else {
1035            None
1036        }
1037    }
1038
1039    fn difference(&self, other: &Self) -> Vec<Self> {
1040        if !self.is_overlapping(other) {
1041            vec![*self]
1042        } else {
1043            let mut ret = Vec::new();
1044            if self.start < other.start && self.end > other.end {
1045                ret.push(Self::new(self.start, other.start));
1046                ret.push(Self::new(other.end, self.end));
1047            } else if self.start < other.start {
1048                ret.push(Self::new(self.start, other.start));
1049            } else if self.end > other.end {
1050                ret.push(Self::new(other.end, self.end));
1051            }
1052            ret
1053        }
1054    }
1055}
1056
1057/// split input range by `split_by` range to one, two or three parts.
1058fn split_range_by(
1059    input_range: &TimeRange,
1060    input_parts: &[usize],
1061    split_by: &TimeRange,
1062    split_idx: usize,
1063) -> Vec<Action> {
1064    let mut ret = Vec::new();
1065    if input_range.is_overlapping(split_by) {
1066        let input_parts = input_parts.to_vec();
1067        let new_parts = {
1068            let mut new_parts = input_parts.clone();
1069            new_parts.push(split_idx);
1070            new_parts
1071        };
1072
1073        ret.push(Action::Pop(*input_range));
1074        if let Some(intersection) = input_range.intersection(split_by) {
1075            ret.push(Action::Push(intersection, new_parts.clone()));
1076        }
1077        for diff in input_range.difference(split_by) {
1078            ret.push(Action::Push(diff, input_parts.clone()));
1079        }
1080    }
1081    ret
1082}
1083
1084#[derive(Debug, Clone, PartialEq, Eq)]
1085enum Action {
1086    Pop(TimeRange),
1087    Push(TimeRange, Vec<usize>),
1088}
1089
1090/// Compute all working ranges and corresponding working sets from given `overlap_counts` computed from `split_overlapping_ranges`
1091///
1092/// working ranges promise once input stream get a value out of current range, future values will never be in this range
1093///
1094/// hence we can merge sort current working range once that happens
1095///
1096/// if `descending` is true, the working ranges will be in descending order
1097fn compute_all_working_ranges(
1098    overlap_counts: &BTreeMap<TimeRange, Vec<usize>>,
1099    descending: bool,
1100) -> Vec<(TimeRange, BTreeSet<usize>)> {
1101    let mut ret = Vec::new();
1102    let mut cur_range_set: Option<(TimeRange, BTreeSet<usize>)> = None;
1103    let overlap_iter: Box<dyn Iterator<Item = (&TimeRange, &Vec<usize>)>> = if descending {
1104        Box::new(overlap_counts.iter().rev()) as _
1105    } else {
1106        Box::new(overlap_counts.iter()) as _
1107    };
1108    for (range, set) in overlap_iter {
1109        match &mut cur_range_set {
1110            None => cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned()))),
1111            Some((working_range, working_set)) => {
1112                // if next overlap range have Partition that's is not last one in `working_set`(hence need
1113                // to be read before merge sorting), and `working_set` have >1 count
1114                // we have to expand current working range to cover it(and add it's `set` to `working_set`)
1115                // so that merge sort is possible
1116                let need_expand = {
1117                    let last_part = working_set.last();
1118                    let inter: BTreeSet<usize> = working_set
1119                        .intersection(&BTreeSet::from_iter(set.iter().cloned()))
1120                        .cloned()
1121                        .collect();
1122                    if let Some(one) = inter.first()
1123                        && inter.len() == 1
1124                        && Some(one) == last_part
1125                    {
1126                        // if only the last PartitionRange in current working set, we can just emit it so no need to expand working range
1127                        if set.iter().all(|p| Some(p) >= last_part) {
1128                            // if all PartitionRange in next overlap range is after the last one in current working set, we can just emit current working set
1129                            false
1130                        } else {
1131                            // elsewise, we need to expand working range to include next overlap range
1132                            true
1133                        }
1134                    } else if inter.is_empty() {
1135                        // if no common PartitionRange in current working set and next overlap range, we can just emit current working set
1136                        false
1137                    } else {
1138                        // have multiple intersection or intersection is not the last part, either way we need to expand working range to include next overlap range
1139                        true
1140                    }
1141                };
1142
1143                if need_expand {
1144                    if descending {
1145                        working_range.start = range.start;
1146                    } else {
1147                        working_range.end = range.end;
1148                    }
1149                    working_set.extend(set.iter().cloned());
1150                } else {
1151                    ret.push((*working_range, std::mem::take(working_set)));
1152                    cur_range_set = Some((*range, BTreeSet::from_iter(set.iter().cloned())));
1153                }
1154            }
1155        }
1156    }
1157
1158    if let Some(cur_range_set) = cur_range_set {
1159        ret.push(cur_range_set)
1160    }
1161
1162    ret
1163}
1164
1165/// return a map of non-overlapping ranges and their corresponding index
1166/// (not `PartitionRange.identifier` but position in array) in the input `PartitionRange`s that is in those ranges
1167fn split_overlapping_ranges(ranges: &[PartitionRange]) -> BTreeMap<TimeRange, Vec<usize>> {
1168    // invariant: the key ranges should not overlapping with each other by definition from `is_overlapping`
1169    let mut ret: BTreeMap<TimeRange, Vec<usize>> = BTreeMap::new();
1170    for (idx, range) in ranges.iter().enumerate() {
1171        let key: TimeRange = (range.start, range.end).into();
1172        let mut actions = Vec::new();
1173        let mut untouched = vec![key];
1174        // create a forward and backward iterator to find all overlapping ranges
1175        // given that tuple is sorted in lexicographical order and promise to not overlap,
1176        // since range is sorted that way, we can stop when we find a non-overlapping range
1177        let forward_iter = ret
1178            .range(key..)
1179            .take_while(|(range, _)| range.is_overlapping(&key));
1180        let backward_iter = ret
1181            .range(..key)
1182            .rev()
1183            .take_while(|(range, _)| range.is_overlapping(&key));
1184
1185        for (range, parts) in forward_iter.chain(backward_iter) {
1186            untouched = untouched.iter().flat_map(|r| r.difference(range)).collect();
1187            let act = split_range_by(range, parts, &key, idx);
1188            actions.extend(act.into_iter());
1189        }
1190
1191        for action in actions {
1192            match action {
1193                Action::Pop(range) => {
1194                    ret.remove(&range);
1195                }
1196                Action::Push(range, parts) => {
1197                    ret.insert(range, parts);
1198                }
1199            }
1200        }
1201
1202        // insert untouched ranges
1203        for range in untouched {
1204            ret.insert(range, vec![idx]);
1205        }
1206    }
1207    ret
1208}
1209
1210/// Get timestamp from array at offset
1211fn get_timestamp_from_idx(
1212    array: &ArrayRef,
1213    offset: usize,
1214) -> datafusion_common::Result<Option<Timestamp>> {
1215    let time_unit = if let DataType::Timestamp(unit, _) = array.data_type() {
1216        unit
1217    } else {
1218        return Err(DataFusionError::Internal(format!(
1219            "Unsupported sort column type: {}",
1220            array.data_type()
1221        )));
1222    };
1223    let ty = array.data_type();
1224    let array = array.slice(offset, 1);
1225    let mut iter = downcast_ts_array!(
1226        array.data_type() => (array_iter_helper, array),
1227        _ => internal_err!("Unsupported sort column type: {ty}")?
1228    );
1229    let (_idx, val) = iter.next().ok_or_else(|| {
1230        DataFusionError::Internal("Empty array in get_timestamp_from".to_string())
1231    })?;
1232    let val = if let Some(val) = val {
1233        val
1234    } else {
1235        return Ok(None);
1236    };
1237    let gt_timestamp = Timestamp::new(val, time_unit.into());
1238    Ok(Some(gt_timestamp))
1239}
1240
1241#[cfg(test)]
1242mod test {
1243    use std::io::Write;
1244    use std::sync::Arc;
1245
1246    use arrow::array::{ArrayRef, TimestampMillisecondArray};
1247    use arrow::compute::concat_batches;
1248    use arrow::json::ArrayWriter;
1249    use arrow_schema::{Field, Schema, TimeUnit};
1250    use futures::StreamExt;
1251    use pretty_assertions::assert_eq;
1252    use serde_json::json;
1253
1254    use super::*;
1255    use crate::test_util::{new_ts_array, MockInputExec};
1256
1257    #[test]
1258    fn test_overlapping() {
1259        let testcases = [
1260            (
1261                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1262                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1263                false,
1264            ),
1265            (
1266                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1267                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1268                true,
1269            ),
1270            (
1271                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1272                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1273                true,
1274            ),
1275            (
1276                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1277                (
1278                    Timestamp::new_millisecond(1000),
1279                    Timestamp::new_millisecond(1002),
1280                ),
1281                true,
1282            ),
1283            (
1284                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1285                (
1286                    Timestamp::new_millisecond(1001),
1287                    Timestamp::new_millisecond(1002),
1288                ),
1289                false,
1290            ),
1291            (
1292                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1293                (
1294                    Timestamp::new_millisecond(1002),
1295                    Timestamp::new_millisecond(1003),
1296                ),
1297                false,
1298            ),
1299        ];
1300
1301        for (range1, range2, expected) in testcases.iter() {
1302            assert_eq!(
1303                TimeRange::from(range1).is_overlapping(&range2.into()),
1304                *expected,
1305                "range1: {:?}, range2: {:?}",
1306                range1,
1307                range2
1308            );
1309        }
1310    }
1311
1312    #[test]
1313    fn test_split() {
1314        let testcases = [
1315            // no split
1316            (
1317                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1318                vec![0],
1319                (Timestamp::new_second(0), Timestamp::new_millisecond(1)),
1320                1,
1321                vec![],
1322            ),
1323            // one part
1324            (
1325                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1326                vec![0],
1327                (Timestamp::new_second(0), Timestamp::new_millisecond(1001)),
1328                1,
1329                vec![
1330                    Action::Pop(
1331                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1332                    ),
1333                    Action::Push(
1334                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1335                        vec![0, 1],
1336                    ),
1337                ],
1338            ),
1339            (
1340                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1341                vec![0],
1342                (Timestamp::new_second(0), Timestamp::new_millisecond(1002)),
1343                1,
1344                vec![
1345                    Action::Pop(
1346                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1347                    ),
1348                    Action::Push(
1349                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1350                        vec![0, 1],
1351                    ),
1352                ],
1353            ),
1354            (
1355                (Timestamp::new_second(1), Timestamp::new_millisecond(1001)),
1356                vec![0],
1357                (
1358                    Timestamp::new_millisecond(1000),
1359                    Timestamp::new_millisecond(1002),
1360                ),
1361                1,
1362                vec![
1363                    Action::Pop(
1364                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1365                    ),
1366                    Action::Push(
1367                        (
1368                            Timestamp::new_millisecond(1000),
1369                            Timestamp::new_millisecond(1001),
1370                        )
1371                            .into(),
1372                        vec![0, 1],
1373                    ),
1374                ],
1375            ),
1376            // two part
1377            (
1378                (Timestamp::new_second(1), Timestamp::new_millisecond(1002)),
1379                vec![0],
1380                (
1381                    Timestamp::new_millisecond(1001),
1382                    Timestamp::new_millisecond(1002),
1383                ),
1384                1,
1385                vec![
1386                    Action::Pop(
1387                        (Timestamp::new_second(1), Timestamp::new_millisecond(1002)).into(),
1388                    ),
1389                    Action::Push(
1390                        (
1391                            Timestamp::new_millisecond(1001),
1392                            Timestamp::new_millisecond(1002),
1393                        )
1394                            .into(),
1395                        vec![0, 1],
1396                    ),
1397                    Action::Push(
1398                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1399                        vec![0],
1400                    ),
1401                ],
1402            ),
1403            // three part
1404            (
1405                (Timestamp::new_second(1), Timestamp::new_millisecond(1004)),
1406                vec![0],
1407                (
1408                    Timestamp::new_millisecond(1001),
1409                    Timestamp::new_millisecond(1002),
1410                ),
1411                1,
1412                vec![
1413                    Action::Pop(
1414                        (Timestamp::new_second(1), Timestamp::new_millisecond(1004)).into(),
1415                    ),
1416                    Action::Push(
1417                        (
1418                            Timestamp::new_millisecond(1001),
1419                            Timestamp::new_millisecond(1002),
1420                        )
1421                            .into(),
1422                        vec![0, 1],
1423                    ),
1424                    Action::Push(
1425                        (Timestamp::new_second(1), Timestamp::new_millisecond(1001)).into(),
1426                        vec![0],
1427                    ),
1428                    Action::Push(
1429                        (
1430                            Timestamp::new_millisecond(1002),
1431                            Timestamp::new_millisecond(1004),
1432                        )
1433                            .into(),
1434                        vec![0],
1435                    ),
1436                ],
1437            ),
1438        ];
1439
1440        for (range, parts, split_by, split_idx, expected) in testcases.iter() {
1441            assert_eq!(
1442                split_range_by(&(*range).into(), parts, &split_by.into(), *split_idx),
1443                *expected,
1444                "range: {:?}, parts: {:?}, split_by: {:?}, split_idx: {}",
1445                range,
1446                parts,
1447                split_by,
1448                split_idx
1449            );
1450        }
1451    }
1452
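    // Cases for `compute_all_working_ranges` with its second argument set to
    // `true` (the reversed/descending path, per the test name). Each case maps
    // sub-ranges keyed by `(start, end)` with their part lists to the expected
    // working ranges; sub-ranges whose part lists chain together are expected
    // to merge into a single working range.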
1453    #[test]
1454    fn test_compute_working_ranges_rev() {
1455        let testcases = vec![
1456            (
1457                BTreeMap::from([(
1458                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1459                    vec![0],
1460                )]),
1461                vec![(
1462                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1463                    BTreeSet::from([0]),
1464                )],
1465            ),
1466            (
1467                BTreeMap::from([(
1468                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1469                    vec![0, 1],
1470                )]),
1471                vec![(
1472                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1473                    BTreeSet::from([0, 1]),
1474                )],
1475            ),
1476            (
1477                BTreeMap::from([
1478                    (
1479                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1480                        vec![0],
1481                    ),
1482                    (
1483                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1484                        vec![0, 1],
1485                    ),
1486                ]),
1487                vec![
1488                    (
1489                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1490                        BTreeSet::from([0]),
1491                    ),
1492                    (
1493                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1494                        BTreeSet::from([0, 1]),
1495                    ),
1496                ],
1497            ),
1498            (
1499                BTreeMap::from([
1500                    (
1501                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1502                        vec![0, 1],
1503                    ),
1504                    (
1505                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1506                        vec![1],
1507                    ),
1508                ]),
1509                vec![
1510                    (
1511                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1512                        BTreeSet::from([0, 1]),
1513                    ),
1514                    (
1515                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1516                        BTreeSet::from([1]),
1517                    ),
1518                ],
1519            ),
1520            (
1521                BTreeMap::from([
1522                    (
1523                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1524                        vec![0],
1525                    ),
1526                    (
1527                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1528                        vec![0, 1],
1529                    ),
1530                    (
1531                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1532                        vec![1],
1533                    ),
1534                ]),
1535                vec![
1536                    (
1537                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1538                        BTreeSet::from([0]),
1539                    ),
1540                    (
1541                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1542                        BTreeSet::from([0, 1]),
1543                    ),
1544                    (
1545                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1546                        BTreeSet::from([1]),
1547                    ),
1548                ],
1549            ),
1550            (
1551                BTreeMap::from([
1552                    (
1553                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1554                        vec![0, 2],
1555                    ),
1556                    (
1557                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1558                        vec![0, 1, 2],
1559                    ),
1560                    (
1561                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1562                        vec![1, 2],
1563                    ),
1564                ]),
1565                vec![(
1566                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1567                    BTreeSet::from([0, 1, 2]),
1568                )],
1569            ),
1570            (
1571                BTreeMap::from([
1572                    (
1573                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1574                        vec![0, 2],
1575                    ),
1576                    (
1577                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1578                        vec![1, 2],
1579                    ),
1580                ]),
1581                vec![(
1582                    (Timestamp::new_second(1), Timestamp::new_second(3)),
1583                    BTreeSet::from([0, 1, 2]),
1584                )],
1585            ),
1586            (
1587                BTreeMap::from([
1588                    (
1589                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1590                        vec![0, 1],
1591                    ),
1592                    (
1593                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1594                        vec![0, 1, 2],
1595                    ),
1596                    (
1597                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1598                        vec![1, 2],
1599                    ),
1600                ]),
1601                vec![(
1602                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1603                    BTreeSet::from([0, 1, 2]),
1604                )],
1605            ),
1606            (
1607                BTreeMap::from([
1608                    (
1609                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1610                        vec![0, 1],
1611                    ),
1612                    (
1613                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1614                        vec![1, 2],
1615                    ),
1616                ]),
1617                vec![
1618                    (
1619                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1620                        BTreeSet::from([0, 1]),
1621                    ),
1622                    (
1623                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1624                        BTreeSet::from([1, 2]),
1625                    ),
1626                ],
1627            ),
1628            // non-overlapping
1629            (
1630                BTreeMap::from([
1631                    (
1632                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1633                        vec![0],
1634                    ),
1635                    (
1636                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1637                        vec![1, 2],
1638                    ),
1639                ]),
1640                vec![
1641                    (
1642                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1643                        BTreeSet::from([0]),
1644                    ),
1645                    (
1646                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1647                        BTreeSet::from([1, 2]),
1648                    ),
1649                ],
1650            ),
1651        ];
1652
1653        for (input, expected) in testcases {
1654            let expected = expected
1655                .into_iter()
1656                .map(|(r, s)| (r.into(), s))
1657                .collect_vec();
1658            let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1659            assert_eq!(
1660                compute_all_working_ranges(&input, true),
1661                expected,
1662                "input: {:?}",
1663                input
1664            );
1665        }
1666    }
1667
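    // Same shape as the reversed test above, but with `compute_all_working_ranges`
    // called with `false` (ascending order of sub-ranges).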
1668    #[test]
1669    fn test_compute_working_ranges() {
1670        let testcases = vec![
1671            (
1672                BTreeMap::from([(
1673                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1674                    vec![0],
1675                )]),
1676                vec![(
1677                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1678                    BTreeSet::from([0]),
1679                )],
1680            ),
1681            (
1682                BTreeMap::from([(
1683                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1684                    vec![0, 1],
1685                )]),
1686                vec![(
1687                    (Timestamp::new_second(1), Timestamp::new_second(2)),
1688                    BTreeSet::from([0, 1]),
1689                )],
1690            ),
1691            (
1692                BTreeMap::from([
1693                    (
1694                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1695                        vec![0, 1],
1696                    ),
1697                    (
1698                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1699                        vec![1],
1700                    ),
1701                ]),
1702                vec![
1703                    (
1704                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1705                        BTreeSet::from([0, 1]),
1706                    ),
1707                    (
1708                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1709                        BTreeSet::from([1]),
1710                    ),
1711                ],
1712            ),
1713            (
1714                BTreeMap::from([
1715                    (
1716                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1717                        vec![0],
1718                    ),
1719                    (
1720                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1721                        vec![0, 1],
1722                    ),
1723                ]),
1724                vec![
1725                    (
1726                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1727                        BTreeSet::from([0]),
1728                    ),
1729                    (
1730                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1731                        BTreeSet::from([0, 1]),
1732                    ),
1733                ],
1734            ),
1735            // test that a working set with only one count gets its own working range
1736            (
1737                BTreeMap::from([
1738                    (
1739                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1740                        vec![0],
1741                    ),
1742                    (
1743                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1744                        vec![0, 1],
1745                    ),
1746                    (
1747                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1748                        vec![1],
1749                    ),
1750                ]),
1751                vec![
1752                    (
1753                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1754                        BTreeSet::from([0]),
1755                    ),
1756                    (
1757                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1758                        BTreeSet::from([0, 1]),
1759                    ),
1760                    (
1761                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1762                        BTreeSet::from([1]),
1763                    ),
1764                ],
1765            ),
1766            (
1767                BTreeMap::from([
1768                    (
1769                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1770                        vec![0, 2],
1771                    ),
1772                    (
1773                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1774                        vec![0, 1, 2],
1775                    ),
1776                    (
1777                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1778                        vec![1, 2],
1779                    ),
1780                ]),
1781                vec![(
1782                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1783                    BTreeSet::from([0, 1, 2]),
1784                )],
1785            ),
1786            (
1787                BTreeMap::from([
1788                    (
1789                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1790                        vec![0, 2],
1791                    ),
1792                    (
1793                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1794                        vec![1, 2],
1795                    ),
1796                ]),
1797                vec![(
1798                    (Timestamp::new_second(1), Timestamp::new_second(3)),
1799                    BTreeSet::from([0, 1, 2]),
1800                )],
1801            ),
1802            (
1803                BTreeMap::from([
1804                    (
1805                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1806                        vec![0, 1],
1807                    ),
1808                    (
1809                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1810                        vec![0, 1, 2],
1811                    ),
1812                    (
1813                        (Timestamp::new_second(3), Timestamp::new_second(4)),
1814                        vec![1, 2],
1815                    ),
1816                ]),
1817                vec![(
1818                    (Timestamp::new_second(1), Timestamp::new_second(4)),
1819                    BTreeSet::from([0, 1, 2]),
1820                )],
1821            ),
1822            (
1823                BTreeMap::from([
1824                    (
1825                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1826                        vec![0, 1],
1827                    ),
1828                    (
1829                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1830                        vec![1, 2],
1831                    ),
1832                ]),
1833                vec![
1834                    (
1835                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1836                        BTreeSet::from([0, 1]),
1837                    ),
1838                    (
1839                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1840                        BTreeSet::from([1, 2]),
1841                    ),
1842                ],
1843            ),
1844            // non-overlapping
1845            (
1846                BTreeMap::from([
1847                    (
1848                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1849                        vec![0, 1],
1850                    ),
1851                    (
1852                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1853                        vec![2],
1854                    ),
1855                ]),
1856                vec![
1857                    (
1858                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1859                        BTreeSet::from([0, 1]),
1860                    ),
1861                    (
1862                        (Timestamp::new_second(2), Timestamp::new_second(3)),
1863                        BTreeSet::from([2]),
1864                    ),
1865                ],
1866            ),
1867        ];
1868
1869        for (input, expected) in testcases {
1870            let expected = expected
1871                .into_iter()
1872                .map(|(r, s)| (r.into(), s))
1873                .collect_vec();
1874            let input = input.into_iter().map(|(r, s)| (r.into(), s)).collect();
1875            assert_eq!(
1876                compute_all_working_ranges(&input, false),
1877                expected,
1878                "input: {:?}",
1879                input
1880            );
1881        }
1882    }
1883
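    // Cases for `split_overlapping_ranges`: a list of `PartitionRange`s is split
    // into non-overlapping sub-ranges keyed by `(start, end)`, each carrying the
    // identifiers of the input ranges that cover it.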
1884    #[test]
1885    fn test_split_overlap_range() {
1886        let testcases = vec![
1887            // simple single-range case
1888            (
1889                vec![PartitionRange {
1890                    start: Timestamp::new_second(1),
1891                    end: Timestamp::new_second(2),
1892                    num_rows: 2,
1893                    identifier: 0,
1894                }],
1895                BTreeMap::from_iter(
1896                    vec![(
1897                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1898                        vec![0],
1899                    )]
1900                    .into_iter(),
1901                ),
1902            ),
1903            // two overlapping ranges
1904            (
1905                vec![
1906                    PartitionRange {
1907                        start: Timestamp::new_second(1),
1908                        end: Timestamp::new_second(2),
1909                        num_rows: 2,
1910                        identifier: 0,
1911                    },
1912                    PartitionRange {
1913                        start: Timestamp::new_second(1),
1914                        end: Timestamp::new_second(2),
1915                        num_rows: 2,
1916                        identifier: 1,
1917                    },
1918                ],
1919                BTreeMap::from_iter(
1920                    vec![(
1921                        (Timestamp::new_second(1), Timestamp::new_second(2)),
1922                        vec![0, 1],
1923                    )]
1924                    .into_iter(),
1925                ),
1926            ),
1927            (
1928                vec![
1929                    PartitionRange {
1930                        start: Timestamp::new_second(1),
1931                        end: Timestamp::new_second(3),
1932                        num_rows: 2,
1933                        identifier: 0,
1934                    },
1935                    PartitionRange {
1936                        start: Timestamp::new_second(2),
1937                        end: Timestamp::new_second(4),
1938                        num_rows: 2,
1939                        identifier: 1,
1940                    },
1941                ],
1942                BTreeMap::from_iter(
1943                    vec![
1944                        (
1945                            (Timestamp::new_second(1), Timestamp::new_second(2)),
1946                            vec![0],
1947                        ),
1948                        (
1949                            (Timestamp::new_second(2), Timestamp::new_second(3)),
1950                            vec![0, 1],
1951                        ),
1952                        (
1953                            (Timestamp::new_second(3), Timestamp::new_second(4)),
1954                            vec![1],
1955                        ),
1956                    ]
1957                    .into_iter(),
1958                ),
1959            ),
1960            // three or more overlapping ranges
1961            (
1962                vec![
1963                    PartitionRange {
1964                        start: Timestamp::new_second(1),
1965                        end: Timestamp::new_second(3),
1966                        num_rows: 2,
1967                        identifier: 0,
1968                    },
1969                    PartitionRange {
1970                        start: Timestamp::new_second(2),
1971                        end: Timestamp::new_second(4),
1972                        num_rows: 2,
1973                        identifier: 1,
1974                    },
1975                    PartitionRange {
1976                        start: Timestamp::new_second(1),
1977                        end: Timestamp::new_second(4),
1978                        num_rows: 2,
1979                        identifier: 2,
1980                    },
1981                ],
1982                BTreeMap::from_iter(
1983                    vec![
1984                        (
1985                            (Timestamp::new_second(1), Timestamp::new_second(2)),
1986                            vec![0, 2],
1987                        ),
1988                        (
1989                            (Timestamp::new_second(2), Timestamp::new_second(3)),
1990                            vec![0, 1, 2],
1991                        ),
1992                        (
1993                            (Timestamp::new_second(3), Timestamp::new_second(4)),
1994                            vec![1, 2],
1995                        ),
1996                    ]
1997                    .into_iter(),
1998                ),
1999            ),
2000            (
2001                vec![
2002                    PartitionRange {
2003                        start: Timestamp::new_second(1),
2004                        end: Timestamp::new_second(3),
2005                        num_rows: 2,
2006                        identifier: 0,
2007                    },
2008                    PartitionRange {
2009                        start: Timestamp::new_second(1),
2010                        end: Timestamp::new_second(4),
2011                        num_rows: 2,
2012                        identifier: 1,
2013                    },
2014                    PartitionRange {
2015                        start: Timestamp::new_second(2),
2016                        end: Timestamp::new_second(4),
2017                        num_rows: 2,
2018                        identifier: 2,
2019                    },
2020                ],
2021                BTreeMap::from_iter(
2022                    vec![
2023                        (
2024                            (Timestamp::new_second(1), Timestamp::new_second(2)),
2025                            vec![0, 1],
2026                        ),
2027                        (
2028                            (Timestamp::new_second(2), Timestamp::new_second(3)),
2029                            vec![0, 1, 2],
2030                        ),
2031                        (
2032                            (Timestamp::new_second(3), Timestamp::new_second(4)),
2033                            vec![1, 2],
2034                        ),
2035                    ]
2036                    .into_iter(),
2037                ),
2038            ),
2039        ];
2040
2041        for (input, expected) in testcases {
2042            let expected = expected.into_iter().map(|(r, s)| (r.into(), s)).collect();
2043            assert_eq!(split_overlapping_ranges(&input), expected);
2044        }
2045    }
2046
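    // Lets the cases below write a `SucRun` as a compact
    // `(offset, len, first_val, last_val)` tuple.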
2047    impl From<(i32, i32, Option<i32>, Option<i32>)> for SucRun<i32> {
2048        fn from((offset, len, min_val, max_val): (i32, i32, Option<i32>, Option<i32>)) -> Self {
2049            Self {
2050                offset: offset as usize,
2051                len: len as usize,
2052                first_val: min_val,
2053                last_val: max_val,
2054            }
2055        }
2056    }
2057
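    // Cases for `find_successive_runs`: (values, sort options, expected runs).
    // Each expected run is given as `(offset, len, first_val, last_val)`; a run
    // appears to be a maximal slice that stays sorted under the given `SortOptions`.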
2058    #[test]
2059    fn test_find_successive_runs() {
2060        let testcases = vec![
2061            (
2062                vec![Some(1), Some(1), Some(2), Some(1), Some(3)],
2063                Some(SortOptions {
2064                    descending: false,
2065                    nulls_first: false,
2066                }),
2067                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2068            ),
2069            (
2070                vec![Some(1), Some(2), Some(2), Some(1), Some(3)],
2071                Some(SortOptions {
2072                    descending: false,
2073                    nulls_first: false,
2074                }),
2075                vec![(0, 3, Some(1), Some(2)), (3, 2, Some(1), Some(3))],
2076            ),
2077            (
2078                vec![Some(1), Some(2), None, None, Some(1), Some(3)],
2079                Some(SortOptions {
2080                    descending: false,
2081                    nulls_first: false,
2082                }),
2083                vec![(0, 4, Some(1), Some(2)), (4, 2, Some(1), Some(3))],
2084            ),
2085            (
2086                vec![Some(1), Some(2), Some(1), Some(3)],
2087                Some(SortOptions {
2088                    descending: false,
2089                    nulls_first: false,
2090                }),
2091                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(1), Some(3))],
2092            ),
2093            (
2094                vec![Some(1), Some(2), Some(1), Some(3)],
2095                Some(SortOptions {
2096                    descending: true,
2097                    nulls_first: false,
2098                }),
2099                vec![
2100                    (0, 1, Some(1), Some(1)),
2101                    (1, 2, Some(2), Some(1)),
2102                    (3, 1, Some(3), Some(3)),
2103                ],
2104            ),
2105            (
2106                vec![Some(1), Some(2), None, Some(3)],
2107                Some(SortOptions {
2108                    descending: false,
2109                    nulls_first: true,
2110                }),
2111                vec![(0, 2, Some(1), Some(2)), (2, 2, Some(3), Some(3))],
2112            ),
2113            (
2114                vec![Some(1), Some(2), None, Some(3)],
2115                Some(SortOptions {
2116                    descending: false,
2117                    nulls_first: false,
2118                }),
2119                vec![(0, 3, Some(1), Some(2)), (3, 1, Some(3), Some(3))],
2120            ),
2121            (
2122                vec![Some(2), Some(1), None, Some(3)],
2123                Some(SortOptions {
2124                    descending: true,
2125                    nulls_first: true,
2126                }),
2127                vec![(0, 2, Some(2), Some(1)), (2, 2, Some(3), Some(3))],
2128            ),
2129            (
2130                vec![],
2131                Some(SortOptions {
2132                    descending: false,
2133                    nulls_first: true,
2134                }),
2135                vec![(0, 0, None, None)],
2136            ),
2137            (
2138                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2139                Some(SortOptions {
2140                    descending: true,
2141                    nulls_first: true,
2142                }),
2143                vec![(0, 5, Some(2), Some(1)), (5, 2, Some(5), Some(4))],
2144            ),
2145            (
2146                vec![None, None, Some(2), Some(2), Some(1), Some(5), Some(4)],
2147                Some(SortOptions {
2148                    descending: true,
2149                    nulls_first: false,
2150                }),
2151                vec![
2152                    (0, 2, None, None),
2153                    (2, 3, Some(2), Some(1)),
2154                    (5, 2, Some(5), Some(4)),
2155                ],
2156            ),
2157        ];
2158        for (input, sort_opts, expected) in testcases {
2159            let ret = find_successive_runs(input.clone().into_iter().enumerate(), &sort_opts);
2160            let expected = expected.into_iter().map(SucRun::<i32>::from).collect_vec();
2161            assert_eq!(
2162                ret, expected,
2163                "input: {:?}, opt: {:?}, expected: {:?}",
2164                input, sort_opts, expected
2165            );
2166        }
2167    }
2168
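    // Cases for `cmp_with_opts`: the expected `Ordering` of two `Option` values
    // under various combinations of `descending` and `nulls_first`.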
2169    #[test]
2170    fn test_cmp_with_opts() {
2171        let testcases = vec![
2172            (
2173                Some(1),
2174                Some(2),
2175                Some(SortOptions {
2176                    descending: false,
2177                    nulls_first: false,
2178                }),
2179                std::cmp::Ordering::Less,
2180            ),
2181            (
2182                Some(1),
2183                Some(2),
2184                Some(SortOptions {
2185                    descending: true,
2186                    nulls_first: false,
2187                }),
2188                std::cmp::Ordering::Greater,
2189            ),
2190            (
2191                Some(1),
2192                None,
2193                Some(SortOptions {
2194                    descending: false,
2195                    nulls_first: true,
2196                }),
2197                std::cmp::Ordering::Greater,
2198            ),
2199            (
2200                Some(1),
2201                None,
2202                Some(SortOptions {
2203                    descending: true,
2204                    nulls_first: true,
2205                }),
2206                std::cmp::Ordering::Greater,
2207            ),
2208            (
2209                Some(1),
2210                None,
2211                Some(SortOptions {
2212                    descending: true,
2213                    nulls_first: false,
2214                }),
2215                std::cmp::Ordering::Less,
2216            ),
2217            (
2218                Some(1),
2219                None,
2220                Some(SortOptions {
2221                    descending: false,
2222                    nulls_first: false,
2223                }),
2224                std::cmp::Ordering::Less,
2225            ),
2226            (
2227                None,
2228                None,
2229                Some(SortOptions {
2230                    descending: true,
2231                    nulls_first: true,
2232                }),
2233                std::cmp::Ordering::Equal,
2234            ),
2235            (
2236                None,
2237                None,
2238                Some(SortOptions {
2239                    descending: false,
2240                    nulls_first: true,
2241                }),
2242                std::cmp::Ordering::Equal,
2243            ),
2244            (
2245                None,
2246                None,
2247                Some(SortOptions {
2248                    descending: true,
2249                    nulls_first: false,
2250                }),
2251                std::cmp::Ordering::Equal,
2252            ),
2253            (
2254                None,
2255                None,
2256                Some(SortOptions {
2257                    descending: false,
2258                    nulls_first: false,
2259                }),
2260                std::cmp::Ordering::Equal,
2261            ),
2262        ];
2263        for (a, b, opts, expected) in testcases {
2264            assert_eq!(
2265                cmp_with_opts(&a, &b, &opts),
2266                expected,
2267                "a: {:?}, b: {:?}, opts: {:?}",
2268                a,
2269                b,
2270                opts
2271            );
2272        }
2273    }
2274
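    // Cases for `find_slice_from_range`: (sorted timestamp values, descending
    // flag, query `TimeRange`, expected result). A successful result is the
    // `(offset, len)` of the slice falling inside the range; expected errors are
    // matched by substring.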
2275    #[test]
2276    fn test_find_slice_from_range() {
2277        let test_cases = vec![
2278            // ascending off-by-one cases
2279            (
2280                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2281                false,
2282                TimeRange {
2283                    start: Timestamp::new_millisecond(2),
2284                    end: Timestamp::new_millisecond(4),
2285                },
2286                Ok((1, 2)),
2287            ),
2288            (
2289                Arc::new(TimestampMillisecondArray::from_iter_values([
2290                    -2, -1, 0, 1, 2, 3, 4, 5,
2291                ])) as ArrayRef,
2292                false,
2293                TimeRange {
2294                    start: Timestamp::new_millisecond(-1),
2295                    end: Timestamp::new_millisecond(4),
2296                },
2297                Ok((1, 5)),
2298            ),
2299            (
2300                Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 6])) as ArrayRef,
2301                false,
2302                TimeRange {
2303                    start: Timestamp::new_millisecond(2),
2304                    end: Timestamp::new_millisecond(5),
2305                },
2306                Ok((1, 2)),
2307            ),
2308            (
2309                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 6])) as ArrayRef,
2310                false,
2311                TimeRange {
2312                    start: Timestamp::new_millisecond(2),
2313                    end: Timestamp::new_millisecond(5),
2314                },
2315                Ok((1, 3)),
2316            ),
2317            (
2318                Arc::new(TimestampMillisecondArray::from_iter_values([1, 3, 4, 5, 6])) as ArrayRef,
2319                false,
2320                TimeRange {
2321                    start: Timestamp::new_millisecond(2),
2322                    end: Timestamp::new_millisecond(5),
2323                },
2324                Ok((1, 2)),
2325            ),
2326            (
2327                Arc::new(TimestampMillisecondArray::from_iter_values([1, 2, 3, 4, 5])) as ArrayRef,
2328                false,
2329                TimeRange {
2330                    start: Timestamp::new_millisecond(6),
2331                    end: Timestamp::new_millisecond(7),
2332                },
2333                Ok((5, 0)),
2334            ),
2335            // descending off-by-one cases
2336            (
2337                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 1])) as ArrayRef,
2338                true,
2339                TimeRange {
2340                    end: Timestamp::new_millisecond(4),
2341                    start: Timestamp::new_millisecond(1),
2342                },
2343                Ok((1, 3)),
2344            ),
2345            (
2346                Arc::new(TimestampMillisecondArray::from_iter_values([
2347                    5, 4, 3, 2, 1, 0,
2348                ])) as ArrayRef,
2349                true,
2350                TimeRange {
2351                    end: Timestamp::new_millisecond(4),
2352                    start: Timestamp::new_millisecond(1),
2353                },
2354                Ok((2, 3)),
2355            ),
2356            (
2357                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2, 0])) as ArrayRef,
2358                true,
2359                TimeRange {
2360                    end: Timestamp::new_millisecond(4),
2361                    start: Timestamp::new_millisecond(1),
2362                },
2363                Ok((1, 2)),
2364            ),
2365            (
2366                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 0])) as ArrayRef,
2367                true,
2368                TimeRange {
2369                    end: Timestamp::new_millisecond(4),
2370                    start: Timestamp::new_millisecond(1),
2371                },
2372                Ok((2, 2)),
2373            ),
2374            (
2375                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2, 1])) as ArrayRef,
2376                true,
2377                TimeRange {
2378                    end: Timestamp::new_millisecond(5),
2379                    start: Timestamp::new_millisecond(2),
2380                },
2381                Ok((1, 3)),
2382            ),
2383            (
2384                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 1])) as ArrayRef,
2385                true,
2386                TimeRange {
2387                    end: Timestamp::new_millisecond(5),
2388                    start: Timestamp::new_millisecond(2),
2389                },
2390                Ok((1, 2)),
2391            ),
2392            (
2393                Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 2, 1])) as ArrayRef,
2394                true,
2395                TimeRange {
2396                    end: Timestamp::new_millisecond(5),
2397                    start: Timestamp::new_millisecond(2),
2398                },
2399                Ok((1, 3)),
2400            ),
2401            (
2402                Arc::new(TimestampMillisecondArray::from_iter_values([6, 4, 3, 1])) as ArrayRef,
2403                true,
2404                TimeRange {
2405                    end: Timestamp::new_millisecond(5),
2406                    start: Timestamp::new_millisecond(2),
2407                },
2408                Ok((1, 2)),
2409            ),
2410            (
2411                Arc::new(TimestampMillisecondArray::from_iter_values([
2412                    10, 9, 8, 7, 6,
2413                ])) as ArrayRef,
2414                true,
2415                TimeRange {
2416                    end: Timestamp::new_millisecond(5),
2417                    start: Timestamp::new_millisecond(2),
2418                },
2419                Ok((5, 0)),
2420            ),
2421            // more descending off-by-one cases
2422            (
2423                Arc::new(TimestampMillisecondArray::from_iter_values([3, 2, 1, 0])) as ArrayRef,
2424                true,
2425                TimeRange {
2426                    end: Timestamp::new_millisecond(4),
2427                    start: Timestamp::new_millisecond(3),
2428                },
2429                Ok((0, 1)),
2430            ),
2431            (
2432                Arc::new(TimestampMillisecondArray::from_iter_values([5, 3, 2])) as ArrayRef,
2433                true,
2434                TimeRange {
2435                    end: Timestamp::new_millisecond(4),
2436                    start: Timestamp::new_millisecond(3),
2437                },
2438                Ok((1, 1)),
2439            ),
2440            (
2441                Arc::new(TimestampMillisecondArray::from_iter_values([5, 4, 3, 2])) as ArrayRef,
2442                true,
2443                TimeRange {
2444                    end: Timestamp::new_millisecond(4),
2445                    start: Timestamp::new_millisecond(3),
2446                },
2447                Ok((2, 1)),
2448            ),
2449        ];
2450
2451        for (sort_vals, descending, range, expected) in test_cases {
2452            let sort_column = SortColumn {
2453                values: sort_vals,
2454                options: Some(SortOptions {
2455                    descending,
2456                    ..Default::default()
2457                }),
2458            };
2459            let ret = find_slice_from_range(&sort_column, &range);
2460            match (ret, expected) {
2461                (Ok(ret), Ok(expected)) => {
2462                    assert_eq!(
2463                        ret, expected,
2464                        "sort_column: {:?}, range: {:?}",
2465                        sort_column, range
2466                    )
2467                }
2468                (Err(err), Err(expected)) => {
2469                    let expected: &str = expected;
2470                    assert!(
2471                        err.to_string().contains(expected),
2472                        "err: {:?}, expected: {:?}",
2473                        err,
2474                        expected
2475                    );
2476                }
2477                (r, e) => panic!("unexpected result: {:?}, expected: {:?}", r, e),
2478            }
2479        }
2480    }
2481
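    // One windowed-sort scenario: the sort expression, an optional fetch limit,
    // the per-`PartitionRange` input batches, and the output batches the
    // `WindowedSortExec` stream is expected to emit.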
2482    #[derive(Debug)]
2483    struct TestStream {
2484        expression: PhysicalSortExpr,
2485        fetch: Option<usize>,
2486        input: Vec<(PartitionRange, DfRecordBatch)>,
2487        output: Vec<DfRecordBatch>,
2488        schema: SchemaRef,
2489    }
2490    use datafusion::physical_plan::expressions::Column;
2491    impl TestStream {
2492        fn new(
2493            ts_col: Column,
2494            opt: SortOptions,
2495            fetch: Option<usize>,
2496            schema: impl Into<arrow_schema::Fields>,
2497            input: Vec<(PartitionRange, Vec<ArrayRef>)>,
2498            expected: Vec<Vec<ArrayRef>>,
2499        ) -> Self {
2500            let expression = PhysicalSortExpr {
2501                expr: Arc::new(ts_col),
2502                options: opt,
2503            };
2504            let schema = Schema::new(schema.into());
2505            let schema = Arc::new(schema);
2506            let input = input
2507                .into_iter()
2508                .map(|(k, v)| (k, DfRecordBatch::try_new(schema.clone(), v).unwrap()))
2509                .collect_vec();
2510            let output_batches = expected
2511                .into_iter()
2512                .map(|v| DfRecordBatch::try_new(schema.clone(), v).unwrap())
2513                .collect_vec();
2514            Self {
2515                expression,
2516                fetch,
2517                input,
2518                output: output_batches,
2519                schema,
2520            }
2521        }
2522
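        // Feed the input batches through a `MockInputExec`, wrap it in a
        // `WindowedSortExec` built from this scenario, execute partition 0,
        // and collect the emitted record batches.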
2523        async fn run_test(&self) -> Vec<DfRecordBatch> {
2524            let (ranges, batches): (Vec<_>, Vec<_>) = self.input.clone().into_iter().unzip();
2525
2526            let mock_input = MockInputExec::new(batches, self.schema.clone());
2527
2528            let exec = WindowedSortExec::try_new(
2529                self.expression.clone(),
2530                self.fetch,
2531                vec![ranges],
2532                Arc::new(mock_input),
2533            )
2534            .unwrap();
2535
2536            let exec_stream = exec.execute(0, Arc::new(TaskContext::default())).unwrap();
2537
2538            let real_output = exec_stream.collect::<Vec<_>>().await;
2539            let real_output: Vec<_> = real_output.into_iter().try_collect().unwrap();
2540            real_output
2541        }
2542    }
2543
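    // End-to-end cases for the `WindowedSortExec` stream: empty inputs,
    // overlapping and non-overlapping `PartitionRange`s, and fetch limits.
    // Each case lists the batches expected from the stream, in order.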
2544    #[tokio::test]
2545    async fn test_window_sort_stream() {
2546        let test_cases = [
2547            TestStream::new(
2548                Column::new("ts", 0),
2549                SortOptions {
2550                    descending: false,
2551                    nulls_first: true,
2552                },
2553                None,
2554                vec![Field::new(
2555                    "ts",
2556                    DataType::Timestamp(TimeUnit::Millisecond, None),
2557                    false,
2558                )],
2559                vec![],
2560                vec![],
2561            ),
2562            TestStream::new(
2563                Column::new("ts", 0),
2564                SortOptions {
2565                    descending: false,
2566                    nulls_first: true,
2567                },
2568                None,
2569                vec![Field::new(
2570                    "ts",
2571                    DataType::Timestamp(TimeUnit::Millisecond, None),
2572                    false,
2573                )],
2574                vec![
2575                    // test one empty
2576                    (
2577                        PartitionRange {
2578                            start: Timestamp::new_millisecond(1),
2579                            end: Timestamp::new_millisecond(2),
2580                            num_rows: 1,
2581                            identifier: 0,
2582                        },
2583                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2584                    ),
2585                    (
2586                        PartitionRange {
2587                            start: Timestamp::new_millisecond(1),
2588                            end: Timestamp::new_millisecond(3),
2589                            num_rows: 1,
2590                            identifier: 0,
2591                        },
2592                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2593                    ),
2594                ],
2595                vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
2596                    [2],
2597                ))]],
2598            ),
2599            TestStream::new(
2600                Column::new("ts", 0),
2601                SortOptions {
2602                    descending: false,
2603                    nulls_first: true,
2604                },
2605                None,
2606                vec![Field::new(
2607                    "ts",
2608                    DataType::Timestamp(TimeUnit::Millisecond, None),
2609                    false,
2610                )],
2611                vec![
2612                    // test both inputs empty
2613                    (
2614                        PartitionRange {
2615                            start: Timestamp::new_millisecond(1),
2616                            end: Timestamp::new_millisecond(2),
2617                            num_rows: 1,
2618                            identifier: 0,
2619                        },
2620                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2621                    ),
2622                    (
2623                        PartitionRange {
2624                            start: Timestamp::new_millisecond(1),
2625                            end: Timestamp::new_millisecond(3),
2626                            num_rows: 1,
2627                            identifier: 0,
2628                        },
2629                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([]))],
2630                    ),
2631                ],
2632                vec![],
2633            ),
2634            TestStream::new(
2635                Column::new("ts", 0),
2636                SortOptions {
2637                    descending: false,
2638                    nulls_first: true,
2639                },
2640                None,
2641                vec![Field::new(
2642                    "ts",
2643                    DataType::Timestamp(TimeUnit::Millisecond, None),
2644                    false,
2645                )],
2646                vec![
2647                    // test indistinguishable case
2648                    // we can't tell which range `2` belongs to
2649                    (
2650                        PartitionRange {
2651                            start: Timestamp::new_millisecond(1),
2652                            end: Timestamp::new_millisecond(2),
2653                            num_rows: 1,
2654                            identifier: 0,
2655                        },
2656                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2657                    ),
2658                    (
2659                        PartitionRange {
2660                            start: Timestamp::new_millisecond(1),
2661                            end: Timestamp::new_millisecond(3),
2662                            num_rows: 1,
2663                            identifier: 0,
2664                        },
2665                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2666                    ),
2667                ],
2668                vec![
2669                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
2670                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2671                ],
2672            ),
2673            TestStream::new(
2674                Column::new("ts", 0),
2675                SortOptions {
2676                    descending: false,
2677                    nulls_first: true,
2678                },
2679                None,
2680                vec![Field::new(
2681                    "ts",
2682                    DataType::Timestamp(TimeUnit::Millisecond, None),
2683                    false,
2684                )],
2685                vec![
2686                    // test direct emit
2687                    (
2688                        PartitionRange {
2689                            start: Timestamp::new_millisecond(1),
2690                            end: Timestamp::new_millisecond(3),
2691                            num_rows: 1,
2692                            identifier: 0,
2693                        },
2694                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2695                            1, 2,
2696                        ]))],
2697                    ),
2698                    (
2699                        PartitionRange {
2700                            start: Timestamp::new_millisecond(1),
2701                            end: Timestamp::new_millisecond(4),
2702                            num_rows: 1,
2703                            identifier: 0,
2704                        },
2705                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2706                            2, 3,
2707                        ]))],
2708                    ),
2709                ],
2710                vec![
2711                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2712                        1, 2,
2713                    ]))],
2714                    // no merge sort/concat is triggered here, so the batches are emitted as-is
2715                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([2]))],
2716                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2717                ],
2718            ),
2719            TestStream::new(
2720                Column::new("ts", 0),
2721                SortOptions {
2722                    descending: false,
2723                    nulls_first: true,
2724                },
2725                None,
2726                vec![Field::new(
2727                    "ts",
2728                    DataType::Timestamp(TimeUnit::Millisecond, None),
2729                    false,
2730                )],
2731                vec![
2732                    // test batches that intersect across working ranges
2733                    (
2734                        PartitionRange {
2735                            start: Timestamp::new_millisecond(1),
2736                            end: Timestamp::new_millisecond(3),
2737                            num_rows: 1,
2738                            identifier: 0,
2739                        },
2740                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2741                            1, 2,
2742                        ]))],
2743                    ),
2744                    (
2745                        PartitionRange {
2746                            start: Timestamp::new_millisecond(1),
2747                            end: Timestamp::new_millisecond(4),
2748                            num_rows: 1,
2749                            identifier: 1,
2750                        },
2751                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2752                            1, 2, 3,
2753                        ]))],
2754                    ),
2755                ],
2756                vec![
2757                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2758                        1, 1, 2, 2,
2759                    ]))],
2760                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2761                ],
2762            ),
2763            TestStream::new(
2764                Column::new("ts", 0),
2765                SortOptions {
2766                    descending: false,
2767                    nulls_first: true,
2768                },
2769                None,
2770                vec![Field::new(
2771                    "ts",
2772                    DataType::Timestamp(TimeUnit::Millisecond, None),
2773                    false,
2774                )],
2775                vec![
2776                    // no overlap, empty intersection batch case
2777                    (
2778                        PartitionRange {
2779                            start: Timestamp::new_millisecond(1),
2780                            end: Timestamp::new_millisecond(3),
2781                            num_rows: 1,
2782                            identifier: 0,
2783                        },
2784                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2785                            1, 2,
2786                        ]))],
2787                    ),
2788                    (
2789                        PartitionRange {
2790                            start: Timestamp::new_millisecond(1),
2791                            end: Timestamp::new_millisecond(4),
2792                            num_rows: 1,
2793                            identifier: 1,
2794                        },
2795                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2796                            1, 2, 3,
2797                        ]))],
2798                    ),
2799                    (
2800                        PartitionRange {
2801                            start: Timestamp::new_millisecond(4),
2802                            end: Timestamp::new_millisecond(6),
2803                            num_rows: 1,
2804                            identifier: 1,
2805                        },
2806                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2807                            4, 5,
2808                        ]))],
2809                    ),
2810                ],
2811                vec![
2812                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2813                        1, 1, 2, 2,
2814                    ]))],
2815                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
2816                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
2817                        4, 5,
2818                    ]))],
2819                ],
2820            ),
2821            // test fetch
2822            TestStream::new(
2823                Column::new("ts", 0),
2824                SortOptions {
2825                    descending: false,
2826                    nulls_first: true,
2827                },
2828                Some(6),
2829                vec![Field::new(
2830                    "ts",
2831                    DataType::Timestamp(TimeUnit::Millisecond, None),
2832                    false,
2833                )],
2834                vec![
2835                    // overlapping ranges; output limited by fetch
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(3),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(4),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2, 3,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(3),
                            end: Timestamp::new_millisecond(6),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            4, 5,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        1, 1, 2, 2,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([4]))],
                ],
            ),
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: false,
                    nulls_first: true,
                },
                Some(3),
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // overlapping ranges; fetch = 3 stops the output within the first merged batch
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(3),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(4),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2, 3,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(3),
                            end: Timestamp::new_millisecond(6),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            4, 5,
                        ]))],
                    ),
                ],
                vec![vec![Arc::new(TimestampMillisecondArray::from_iter_values(
                    [1, 1, 2],
                ))]],
            ),
            // reverse order (descending) case
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: true,
                    nulls_first: true,
                },
                None,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // reverse order
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(3),
                            end: Timestamp::new_millisecond(6),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            5, 4,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(4),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            3, 2, 1,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(3),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            2, 1,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        5, 4,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([3]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        2, 2, 1, 1,
                    ]))],
                ],
            ),
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: false,
                    nulls_first: true,
                },
                None,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // a long range that fully contains a shorter overlapping range
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(10),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 5, 9,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(3),
                            end: Timestamp::new_millisecond(7),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            3, 4, 5, 6,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([1]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        3, 4, 5, 5, 6, 9,
                    ]))],
                ],
            ),
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: false,
                    nulls_first: true,
                },
                None,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // complex overlap
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(3),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(10),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 3, 4, 5, 6, 8,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(7),
                            end: Timestamp::new_millisecond(10),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            7, 8, 9,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        1, 1, 2,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        3, 4, 5, 6,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        7, 8, 8, 9,
                    ]))],
                ],
            ),
            TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending: false,
                    nulls_first: true,
                },
                None,
                vec![Field::new(
                    "ts",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                )],
                vec![
                    // subset range whose data points duplicate those of the enclosing range
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(1),
                            end: Timestamp::new_millisecond(11),
                            num_rows: 1,
                            identifier: 0,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
                        ]))],
                    ),
                    (
                        PartitionRange {
                            start: Timestamp::new_millisecond(5),
                            end: Timestamp::new_millisecond(7),
                            num_rows: 1,
                            identifier: 1,
                        },
                        vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                            5, 6,
                        ]))],
                    ),
                ],
                vec![
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        1, 2, 3, 4,
                    ]))],
                    vec![Arc::new(TimestampMillisecondArray::from_iter_values([
                        5, 5, 6, 6, 7, 8, 9, 10,
                    ]))],
                ],
            ),
        ];

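        // Run every handcrafted case above and compare its output batches against the expected ones.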
        let indexed_test_cases = test_cases.iter().enumerate().collect_vec();

        for (idx, testcase) in &indexed_test_cases {
            let output = testcase.run_test().await;
            assert_eq!(output, testcase.output, "case {idx} failed.");
        }
    }

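    /// Randomized test: build `PartitionRange`s whose bounds advance monotonically
    /// (forward when ascending, backward when descending), fill each with sorted
    /// random timestamps, run the windowed sort stream over them, and compare the
    /// result with a plain global sort (truncated by `fetch` when set).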
    #[tokio::test]
    async fn fuzzy_ish_test_window_sort_stream() {
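        // Upper bounds for the random generation: number of cases, ranges per case,
        // range width and offset, data points per range, and the optional fetch limit.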
        let test_cnt = 100;
        let part_cnt_bound = 100;
        let range_size_bound = 100;
        let range_offset_bound = 100;
        let in_range_datapoint_cnt_bound = 100;
        let fetch_bound = 100;

        let mut rng = fastrand::Rng::new();
        let rng_seed = rng.u64(..);
        rng.seed(rng_seed);
        let mut bound_val = None;
        // construct testcases
        type CmpFn<T> = Box<dyn FnMut(&T, &T) -> std::cmp::Ordering>;
        let mut full_testcase_list = Vec::new();
        for _case_id in 0..test_cnt {
            let descending = rng.bool();
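            // Comparator for the chosen sort direction: reverses the ordering when descending.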
            fn ret_cmp_fn<T: Ord>(descending: bool) -> CmpFn<T> {
                if descending {
                    return Box::new(|a: &T, b: &T| b.cmp(a));
                }
                Box::new(|a: &T, b: &T| a.cmp(b))
            }
            let unit = match rng.u8(0..4) {
                0 => TimeUnit::Second,
                1 => TimeUnit::Millisecond,
                2 => TimeUnit::Microsecond,
                _ => TimeUnit::Nanosecond,
            };
            let fetch = if rng.bool() {
                Some(rng.usize(0..fetch_bound))
            } else {
                None
            };

            let mut input_ranged_data = vec![];
            let mut output_data: Vec<i64> = vec![];
            // generate input data
            for part_id in 0..rng.usize(0..part_cnt_bound) {
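                // `bound_val` carries the previous range's anchor so that consecutive ranges
                // advance monotonically (forward when ascending, backward when descending),
                // matching the ordering invariant promised for the input stream.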
                let (start, end) = if descending {
                    let end = bound_val
                        .map(|i| i - rng.i64(0..range_offset_bound))
                        .unwrap_or_else(|| rng.i64(..));
                    bound_val = Some(end);
                    let start = end - rng.i64(1..range_size_bound);
                    let start = Timestamp::new(start, unit.into());
                    let end = Timestamp::new(end, unit.into());
                    (start, end)
                } else {
                    let start = bound_val
                        .map(|i| i + rng.i64(0..range_offset_bound))
                        .unwrap_or_else(|| rng.i64(..));
                    bound_val = Some(start);
                    let end = start + rng.i64(1..range_size_bound);
                    let start = Timestamp::new(start, unit.into());
                    let end = Timestamp::new(end, unit.into());
                    (start, end)
                };

                let iter = 0..rng.usize(0..in_range_datapoint_cnt_bound);
                let data_gen = iter
                    .map(|_| rng.i64(start.value()..end.value()))
                    .sorted_by(ret_cmp_fn(descending))
                    .collect_vec();
                output_data.extend(data_gen.clone());
                let arr = new_ts_array(unit, data_gen);
                let range = PartitionRange {
                    start,
                    end,
                    num_rows: arr.len(),
                    identifier: part_id,
                };
                input_ranged_data.push((range, vec![arr]));
            }

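            // The expected output is simply the globally sorted (and, if `fetch` is set,
            // truncated) concatenation of all data generated above.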
            output_data.sort_by(ret_cmp_fn(descending));
            if let Some(fetch) = fetch {
                output_data.truncate(fetch);
            }
            let output_arr = new_ts_array(unit, output_data);

            let test_stream = TestStream::new(
                Column::new("ts", 0),
                SortOptions {
                    descending,
                    nulls_first: true,
                },
                fetch,
                vec![Field::new("ts", DataType::Timestamp(unit, None), false)],
                input_ranged_data.clone(),
                vec![vec![output_arr]],
            );
            full_testcase_list.push(test_stream);
        }

        for (case_id, test_stream) in full_testcase_list.into_iter().enumerate() {
            let res = test_stream.run_test().await;
            let res_concat = concat_batches(&test_stream.schema, &res).unwrap();
            let expected = test_stream.output;
            let expected_concat = concat_batches(&test_stream.schema, &expected).unwrap();

            if res_concat != expected_concat {
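                // On mismatch, dump the input ranges/data, the actual batches, and the
                // expected batches to stderr as JSON so the failing case can be inspected
                // and replayed with the printed rng seed.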
                {
                    let mut f_input = std::io::stderr();
                    f_input.write_all(b"[").unwrap();
                    for (input_range, input_arr) in test_stream.input {
                        let range_json = json!({
                            "start": input_range.start.to_chrono_datetime().unwrap().to_string(),
                            "end": input_range.end.to_chrono_datetime().unwrap().to_string(),
                            "num_rows": input_range.num_rows,
                            "identifier": input_range.identifier,
                        });
                        let buf = Vec::new();
                        let mut input_writer = ArrayWriter::new(buf);
                        input_writer.write(&input_arr).unwrap();
                        input_writer.finish().unwrap();
                        let res_str =
                            String::from_utf8_lossy(&input_writer.into_inner()).to_string();
                        let whole_json =
                            format!(r#"{{"range": {}, "data": {}}},"#, range_json, res_str);
                        f_input.write_all(whole_json.as_bytes()).unwrap();
                    }
                    f_input.write_all(b"]").unwrap();
                }
                {
                    let mut f_res = std::io::stderr();
                    f_res.write_all(b"[").unwrap();
                    for batch in &res {
                        let mut res_writer = ArrayWriter::new(f_res);
                        res_writer.write(batch).unwrap();
                        res_writer.finish().unwrap();
                        f_res = res_writer.into_inner();
                        f_res.write_all(b",").unwrap();
                    }
                    f_res.write_all(b"]").unwrap();

                    let f_res_concat = std::io::stderr();
                    let mut res_writer = ArrayWriter::new(f_res_concat);
                    res_writer.write(&res_concat).unwrap();
                    res_writer.finish().unwrap();

                    let f_expected = std::io::stderr();
                    let mut expected_writer = ArrayWriter::new(f_expected);
                    expected_writer.write(&expected_concat).unwrap();
                    expected_writer.finish().unwrap();
                }
                panic!(
                    "case failed, case id: {}, rng seed: {}; input, output and expected output were written to stderr",
                    case_id, rng_seed
                );
            }
            assert_eq!(
                res_concat, expected_concat,
                "case failed, case id: {}, rng seed: {}",
                case_id, rng_seed
            );
        }
    }
}