mito2/read/
range.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structs for partition ranges.
16
17use std::collections::BTreeMap;
18use std::sync::{Arc, Mutex};
19
20use common_time::Timestamp;
21use parquet::arrow::arrow_reader::RowSelection;
22use smallvec::{smallvec, SmallVec};
23use store_api::region_engine::PartitionRange;
24use store_api::storage::TimeSeriesDistribution;
25
26use crate::cache::CacheStrategy;
27use crate::error::Result;
28use crate::memtable::{MemtableRange, MemtableRanges, MemtableStats};
29use crate::read::scan_region::ScanInput;
30use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
31use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
32use crate::sst::parquet::format::parquet_row_group_time_range;
33use crate::sst::parquet::reader::ReaderMetrics;
34use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
35
/// Sentinel row group index that selects every row group of a memtable or file.
/// The range builders in this file treat any negative row group index this way.
const ALL_ROW_GROUPS: i64 = -1;
37
/// Index and metadata for a memtable or file.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct SourceIndex {
    /// Index of the memtable or file. The ranges built in this file number
    /// memtables first; file indices are offset by the number of memtables.
    pub(crate) index: usize,
    /// Total number of row groups in this source (for a memtable, the number of
    /// ranges reported by its stats). 0 if the metadata is unavailable.
    /// We use this to split files.
    pub(crate) num_row_groups: u64,
}
47
/// Index to access a row group.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct RowGroupIndex {
    /// Index to the memtable/file. It uses the same numbering as `SourceIndex::index`.
    pub(crate) index: usize,
    /// Row group index in the file (or range index in the memtable).
    /// Negative index indicates all row groups; `ALL_ROW_GROUPS` is the value
    /// this file uses for that purpose.
    pub(crate) row_group_index: i64,
}
57
/// Meta data of a partition range.
/// If the scanner is [UnorderedScan], each meta only has one row group or memtable.
/// If the scanner is [SeqScan], each meta may have multiple row groups and memtables.
#[derive(Debug, PartialEq)]
pub(crate) struct RangeMeta {
    /// The time range of the range. Both bounds are inclusive; the max timestamp is
    /// converted to an exclusive end when a `PartitionRange` is created from this meta.
    pub(crate) time_range: FileTimeRange,
    /// Indices to memtables or files.
    pub(crate) indices: SmallVec<[SourceIndex; 2]>,
    /// Indices to memtable/file row groups that this range scans.
    pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
    /// Estimated number of rows in the range. This can be 0 if the statistics are not available.
    pub(crate) num_rows: usize,
}
72
73impl RangeMeta {
74    /// Creates a [PartitionRange] with specific identifier.
75    /// It converts the inclusive max timestamp to exclusive end timestamp.
76    pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
77        PartitionRange {
78            start: self.time_range.0,
79            end: Timestamp::new(
80                // The i64::MAX timestamp may be invisible but we don't guarantee to support this
81                // value now.
82                self.time_range
83                    .1
84                    .value()
85                    .checked_add(1)
86                    .unwrap_or(self.time_range.1.value()),
87                self.time_range.1.unit(),
88            ),
89            num_rows: self.num_rows,
90            identifier,
91        }
92    }
93
94    /// Creates a list of ranges from the `input` for seq scan.
95    /// If `compaction` is true, it doesn't split the ranges.
96    pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
97        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
98        Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
99        Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
100
101        let ranges = group_ranges_for_seq_scan(ranges);
102        if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
103            // We don't split ranges in compaction or TimeSeriesDistribution::PerSeries.
104            return ranges;
105        }
106        maybe_split_ranges_for_seq_scan(ranges)
107    }
108
109    /// Creates a list of ranges from the `input` for unordered scan.
110    pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
111        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
112        Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
113        Self::push_unordered_file_ranges(
114            input.memtables.len(),
115            &input.files,
116            &input.cache_strategy,
117            &mut ranges,
118        );
119
120        ranges
121    }
122
123    /// Returns true if the time range of given `meta` overlaps with the time range of this meta.
124    fn overlaps(&self, meta: &RangeMeta) -> bool {
125        overlaps(&self.time_range, &meta.time_range)
126    }
127
128    /// Merges given `meta` to this meta.
129    /// It assumes that the time ranges overlap and they don't have the same file or memtable index.
130    fn merge(&mut self, mut other: RangeMeta) {
131        debug_assert!(self.overlaps(&other));
132        debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
133        debug_assert!(self
134            .row_group_indices
135            .iter()
136            .all(|idx| !other.row_group_indices.contains(idx)));
137
138        self.time_range = (
139            self.time_range.0.min(other.time_range.0),
140            self.time_range.1.max(other.time_range.1),
141        );
142        self.indices.append(&mut other.indices);
143        self.row_group_indices.append(&mut other.row_group_indices);
144        self.num_rows += other.num_rows;
145    }
146
147    /// Returns true if we can split the range into multiple smaller ranges and
148    /// still preserve the order for [SeqScan].
149    fn can_split_preserve_order(&self) -> bool {
150        self.indices.len() == 1 && self.indices[0].num_row_groups > 1
151    }
152
153    /// Splits the range if it can preserve the order.
154    fn maybe_split(self, output: &mut Vec<RangeMeta>) {
155        if self.can_split_preserve_order() {
156            let num_row_groups = self.indices[0].num_row_groups;
157            debug_assert_eq!(1, self.row_group_indices.len());
158            debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
159
160            output.reserve(self.row_group_indices.len());
161            let num_rows = self.num_rows / num_row_groups as usize;
162            // Splits by row group.
163            for row_group_index in 0..num_row_groups {
164                output.push(RangeMeta {
165                    time_range: self.time_range,
166                    indices: self.indices.clone(),
167                    row_group_indices: smallvec![RowGroupIndex {
168                        index: self.indices[0].index,
169                        row_group_index: row_group_index as i64,
170                    }],
171                    num_rows,
172                });
173            }
174        } else {
175            output.push(self);
176        }
177    }
178
179    fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
180        // For append mode, we can parallelize reading memtables.
181        for (memtable_index, memtable) in memtables.iter().enumerate() {
182            let stats = memtable.stats();
183            let Some(time_range) = stats.time_range() else {
184                continue;
185            };
186            for row_group_index in 0..stats.num_ranges() {
187                let num_rows = stats.num_rows() / stats.num_ranges();
188                ranges.push(RangeMeta {
189                    time_range,
190                    indices: smallvec![SourceIndex {
191                        index: memtable_index,
192                        num_row_groups: stats.num_ranges() as u64,
193                    }],
194                    row_group_indices: smallvec![RowGroupIndex {
195                        index: memtable_index,
196                        row_group_index: row_group_index as i64,
197                    }],
198                    num_rows,
199                });
200            }
201        }
202    }
203
204    fn push_unordered_file_ranges(
205        num_memtables: usize,
206        files: &[FileHandle],
207        cache: &CacheStrategy,
208        ranges: &mut Vec<RangeMeta>,
209    ) {
210        // For append mode, we can parallelize reading row groups.
211        for (i, file) in files.iter().enumerate() {
212            let file_index = num_memtables + i;
213            // Get parquet meta from the cache.
214            let parquet_meta =
215                cache.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id());
216            if let Some(parquet_meta) = parquet_meta {
217                // Scans each row group.
218                for row_group_index in 0..file.meta_ref().num_row_groups {
219                    let time_range = parquet_row_group_time_range(
220                        file.meta_ref(),
221                        &parquet_meta,
222                        row_group_index as usize,
223                    );
224                    let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
225                    ranges.push(RangeMeta {
226                        time_range: time_range.unwrap_or_else(|| file.time_range()),
227                        indices: smallvec![SourceIndex {
228                            index: file_index,
229                            num_row_groups: file.meta_ref().num_row_groups,
230                        }],
231                        row_group_indices: smallvec![RowGroupIndex {
232                            index: file_index,
233                            row_group_index: row_group_index as i64,
234                        }],
235                        num_rows: num_rows as usize,
236                    });
237                }
238            } else if file.meta_ref().num_row_groups > 0 {
239                // Scans each row group.
240                for row_group_index in 0..file.meta_ref().num_row_groups {
241                    ranges.push(RangeMeta {
242                        time_range: file.time_range(),
243                        indices: smallvec![SourceIndex {
244                            index: file_index,
245                            num_row_groups: file.meta_ref().num_row_groups,
246                        }],
247                        row_group_indices: smallvec![RowGroupIndex {
248                            index: file_index,
249                            row_group_index: row_group_index as i64,
250                        }],
251                        num_rows: DEFAULT_ROW_GROUP_SIZE,
252                    });
253                }
254            } else {
255                // If we don't known the number of row groups in advance, scan all row groups.
256                ranges.push(RangeMeta {
257                    time_range: file.time_range(),
258                    indices: smallvec![SourceIndex {
259                        index: file_index,
260                        num_row_groups: 0,
261                    }],
262                    row_group_indices: smallvec![RowGroupIndex {
263                        index: file_index,
264                        row_group_index: ALL_ROW_GROUPS,
265                    }],
266                    // This may be 0.
267                    num_rows: file.meta_ref().num_rows as usize,
268                });
269            }
270        }
271    }
272
273    fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
274        // For non append-only mode, each range only contains one memtable by default.
275        for (i, memtable) in memtables.iter().enumerate() {
276            let stats = memtable.stats();
277            let Some(time_range) = stats.time_range() else {
278                continue;
279            };
280            ranges.push(RangeMeta {
281                time_range,
282                indices: smallvec![SourceIndex {
283                    index: i,
284                    num_row_groups: stats.num_ranges() as u64,
285                }],
286                row_group_indices: smallvec![RowGroupIndex {
287                    index: i,
288                    row_group_index: ALL_ROW_GROUPS,
289                }],
290                num_rows: stats.num_rows(),
291            });
292        }
293    }
294
295    fn push_seq_file_ranges(
296        num_memtables: usize,
297        files: &[FileHandle],
298        ranges: &mut Vec<RangeMeta>,
299    ) {
300        // For non append-only mode, each range only contains one file.
301        for (i, file) in files.iter().enumerate() {
302            let file_index = num_memtables + i;
303            ranges.push(RangeMeta {
304                time_range: file.time_range(),
305                indices: smallvec![SourceIndex {
306                    index: file_index,
307                    num_row_groups: file.meta_ref().num_row_groups,
308                }],
309                row_group_indices: smallvec![RowGroupIndex {
310                    index: file_index,
311                    row_group_index: ALL_ROW_GROUPS,
312                }],
313                num_rows: file.meta_ref().num_rows as usize,
314            });
315        }
316    }
317}
318
319/// Groups ranges by time range.
320/// It assumes each input range only contains a file or a memtable.
321fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
322    if ranges.is_empty() {
323        return ranges;
324    }
325
326    // Sorts ranges by time range (start asc, end desc).
327    ranges.sort_unstable_by(|a, b| {
328        let l = a.time_range;
329        let r = b.time_range;
330        l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
331    });
332    let mut range_in_progress = None;
333    // Parts with exclusive time ranges.
334    let mut exclusive_ranges = Vec::with_capacity(ranges.len());
335    for range in ranges {
336        let Some(mut prev_range) = range_in_progress.take() else {
337            // This is the new range to process.
338            range_in_progress = Some(range);
339            continue;
340        };
341
342        if prev_range.overlaps(&range) {
343            prev_range.merge(range);
344            range_in_progress = Some(prev_range);
345        } else {
346            exclusive_ranges.push(prev_range);
347            range_in_progress = Some(range);
348        }
349    }
350    if let Some(range) = range_in_progress {
351        exclusive_ranges.push(range);
352    }
353
354    exclusive_ranges
355}
356
357/// Splits the range into multiple smaller ranges.
358/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()].
359fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
360    let mut new_ranges = Vec::with_capacity(ranges.len());
361    for range in ranges {
362        range.maybe_split(&mut new_ranges);
363    }
364
365    new_ranges
366}
367
/// Builder to create file ranges.
/// The default builder has no context and builds nothing.
#[derive(Default)]
pub(crate) struct FileRangeBuilder {
    /// Context for the file.
    /// None indicates nothing to read.
    context: Option<FileRangeContextRef>,
    /// Row selections for each row group to read, keyed by row group index.
    /// It skips the row group if it is not in the map.
    row_groups: BTreeMap<usize, Option<RowSelection>>,
}
378
379impl FileRangeBuilder {
380    /// Builds a file range builder from context and row groups.
381    pub(crate) fn new(
382        context: FileRangeContextRef,
383        row_groups: BTreeMap<usize, Option<RowSelection>>,
384    ) -> Self {
385        Self {
386            context: Some(context),
387            row_groups,
388        }
389    }
390
391    /// Builds file ranges to read.
392    /// Negative `row_group_index` indicates all row groups.
393    pub(crate) fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
394        let Some(context) = self.context.clone() else {
395            return;
396        };
397        if row_group_index >= 0 {
398            let row_group_index = row_group_index as usize;
399            // Scans one row group.
400            let Some(row_selection) = self.row_groups.get(&row_group_index) else {
401                return;
402            };
403            ranges.push(FileRange::new(
404                context,
405                row_group_index,
406                row_selection.clone(),
407            ));
408        } else {
409            // Scans all row groups.
410            ranges.extend(
411                self.row_groups
412                    .iter()
413                    .map(|(row_group_index, row_selection)| {
414                        FileRange::new(context.clone(), *row_group_index, row_selection.clone())
415                    }),
416            );
417        }
418    }
419}
420
/// Builder to create mem ranges.
pub(crate) struct MemRangeBuilder {
    /// Ranges of a memtable, together with the memtable's statistics.
    ranges: MemtableRanges,
}
426
427impl MemRangeBuilder {
428    /// Builds a mem range builder from row groups.
429    pub(crate) fn new(ranges: MemtableRanges) -> Self {
430        Self { ranges }
431    }
432
433    /// Builds mem ranges to read in the memtable.
434    /// Negative `row_group_index` indicates all row groups.
435    pub(crate) fn build_ranges(
436        &self,
437        row_group_index: i64,
438        ranges: &mut SmallVec<[MemtableRange; 2]>,
439    ) {
440        if row_group_index >= 0 {
441            let row_group_index = row_group_index as usize;
442            // Scans one row group.
443            let Some(range) = self.ranges.ranges.get(&row_group_index) else {
444                return;
445            };
446            ranges.push(range.clone());
447        } else {
448            ranges.extend(self.ranges.ranges.values().cloned());
449        }
450    }
451
452    /// Returns the statistics of the memtable.
453    pub(crate) fn stats(&self) -> &MemtableStats {
454        &self.ranges.stats
455    }
456}
457
/// List that manages the builders to create file ranges.
/// Each scan partition should have its own list. Mutex inside this list is used to allow moving
/// the list to different streams in the same partition.
pub(crate) struct RangeBuilderList {
    /// Number of memtables. Source indices of files are offset by this number.
    num_memtables: usize,
    /// Builders of files, indexed by file position. An entry is `None` until the
    /// builder of that file has been created.
    file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
}
465
466impl RangeBuilderList {
467    /// Creates a new [ReaderBuilderList] with the given number of memtables and files.
468    pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
469        let file_builders = (0..num_files).map(|_| None).collect();
470        Self {
471            num_memtables,
472            file_builders: Mutex::new(file_builders),
473        }
474    }
475
476    /// Builds file ranges to read the row group at `index`.
477    pub(crate) async fn build_file_ranges(
478        &self,
479        input: &ScanInput,
480        index: RowGroupIndex,
481        reader_metrics: &mut ReaderMetrics,
482    ) -> Result<SmallVec<[FileRange; 2]>> {
483        let mut ranges = SmallVec::new();
484        let file_index = index.index - self.num_memtables;
485        let builder_opt = self.get_file_builder(file_index);
486        match builder_opt {
487            Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
488            None => {
489                let builder = input.prune_file(file_index, reader_metrics).await?;
490                builder.build_ranges(index.row_group_index, &mut ranges);
491                self.set_file_builder(file_index, Arc::new(builder));
492            }
493        }
494        Ok(ranges)
495    }
496
497    fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
498        let file_builders = self.file_builders.lock().unwrap();
499        file_builders[index].clone()
500    }
501
502    fn set_file_builder(&self, index: usize, builder: Arc<FileRangeBuilder>) {
503        let mut file_builders = self.file_builders.lock().unwrap();
504        file_builders[index] = Some(builder);
505    }
506}
507
#[cfg(test)]
mod tests {
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;

    use super::*;

    /// Expected group: (source indices, start, end).
    type Output = (Vec<usize>, i64, i64);

    /// Builds a [RangeMeta] in seconds from compact tuples:
    /// `indices` holds `(index, num_row_groups)` and `row_groups` holds
    /// `(index, row_group_index)`.
    fn meta(
        (start, end): (i64, i64),
        indices: &[(usize, u64)],
        row_groups: &[(usize, i64)],
        num_rows: usize,
    ) -> RangeMeta {
        RangeMeta {
            time_range: (Timestamp::new_second(start), Timestamp::new_second(end)),
            indices: indices
                .iter()
                .map(|&(index, num_row_groups)| SourceIndex {
                    index,
                    num_row_groups,
                })
                .collect(),
            row_group_indices: row_groups
                .iter()
                .map(|&(index, row_group_index)| RowGroupIndex {
                    index,
                    row_group_index,
                })
                .collect(),
            num_rows,
        }
    }

    /// Groups the `(index, start, end)` inputs and compares the groups with `expect`.
    fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
        let mut ranges = Vec::with_capacity(input.len());
        for &(idx, start, end) in input {
            ranges.push(RangeMeta {
                time_range: (
                    Timestamp::new(start, TimeUnit::Second),
                    Timestamp::new(end, TimeUnit::Second),
                ),
                indices: smallvec![SourceIndex {
                    index: idx,
                    num_row_groups: 0,
                }],
                row_group_indices: smallvec![RowGroupIndex {
                    index: idx,
                    row_group_index: 0
                }],
                num_rows: 1,
            });
        }

        let mut actual: Vec<Output> = Vec::new();
        for range in group_ranges_for_seq_scan(ranges) {
            let indices: Vec<usize> = range.indices.iter().map(|source| source.index).collect();
            let group_indices: Vec<usize> = range
                .row_group_indices
                .iter()
                .map(|row_group| row_group.index)
                .collect();
            // Source indices and row group indices must stay aligned after merging.
            assert_eq!(indices, group_indices);
            let (start, end) = range.time_range;
            actual.push((indices, start.value(), end.value()));
        }
        assert_eq!(expect, actual);
    }

    #[test]
    fn test_group_ranges() {
        // Group 1 part.
        run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);

        // 1, 2, 3, 4 => [3, 1, 4], [2]
        let input = [
            (1, 1000, 2000),
            (2, 6000, 7000),
            (3, 0, 1500),
            (4, 1500, 3000),
        ];
        let expect = [(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)];
        run_group_ranges_test(&input, &expect);

        // 1, 2, 3 => [3], [1], [2],
        let input = [(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)];
        let expect = [
            (vec![3], 0, 1000),
            (vec![1], 3000, 4000),
            (vec![2], 4001, 6000),
        ];
        run_group_ranges_test(&input, &expect);

        // 1, 2, 3 => [3], [1, 2]
        let input = [(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)];
        let expect = [(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)];
        run_group_ranges_test(&input, &expect);
    }

    #[test]
    fn test_merge_range() {
        let mut left = meta((1000, 2000), &[(1, 2)], &[(1, 1), (1, 2)], 5);
        let right = meta((800, 1200), &[(2, 2)], &[(2, 1), (2, 2)], 4);
        left.merge(right);

        let expect = meta(
            (800, 2000),
            &[(1, 2), (2, 2)],
            &[(1, 1), (1, 2), (2, 1), (2, 2)],
            9,
        );
        assert_eq!(left, expect);
    }

    #[test]
    fn test_split_range() {
        let range = meta((1000, 2000), &[(1, 2)], &[(1, ALL_ROW_GROUPS)], 5);

        assert!(range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);

        // 5 rows over 2 row groups: each piece gets 2 rows.
        let expect = vec![
            meta((1000, 2000), &[(1, 2)], &[(1, 0)], 2),
            meta((1000, 2000), &[(1, 2)], &[(1, 1)], 2),
        ];
        assert_eq!(output, expect);
    }

    #[test]
    fn test_not_split_range() {
        let range = meta((1000, 2000), &[(1, 1), (2, 1)], &[(1, 1), (2, 1)], 5);

        assert!(!range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);
        assert_eq!(1, output.len());
    }

    #[test]
    fn test_maybe_split_ranges() {
        let ranges = vec![
            // Single row group: kept.
            meta((0, 500), &[(0, 1)], &[(0, 0)], 4),
            // One source with two row groups: split.
            meta((1000, 2000), &[(1, 2)], &[(1, ALL_ROW_GROUPS)], 4),
            // Two sources: kept.
            meta(
                (3000, 4000),
                &[(2, 2), (3, 0)],
                &[(2, ALL_ROW_GROUPS), (3, ALL_ROW_GROUPS)],
                5,
            ),
        ];
        let output = maybe_split_ranges_for_seq_scan(ranges);

        let expect = vec![
            meta((0, 500), &[(0, 1)], &[(0, 0)], 4),
            meta((1000, 2000), &[(1, 2)], &[(1, 0)], 2),
            meta((1000, 2000), &[(1, 2)], &[(1, 1)], 2),
            meta(
                (3000, 4000),
                &[(2, 2), (3, 0)],
                &[(2, ALL_ROW_GROUPS), (3, ALL_ROW_GROUPS)],
                5,
            ),
        ];
        assert_eq!(output, expect);
    }
}