mito2/read/
range.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structs for partition ranges.
16
17use std::sync::{Arc, Mutex};
18
19use common_time::Timestamp;
20use smallvec::{smallvec, SmallVec};
21use store_api::region_engine::PartitionRange;
22use store_api::storage::TimeSeriesDistribution;
23
24use crate::cache::CacheStrategy;
25use crate::error::Result;
26use crate::memtable::{MemtableRange, MemtableStats};
27use crate::read::scan_region::ScanInput;
28use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
29use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
30use crate::sst::parquet::format::parquet_row_group_time_range;
31use crate::sst::parquet::reader::ReaderMetrics;
32use crate::sst::parquet::row_selection::RowGroupSelection;
33use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
34
/// Sentinel value of [RowGroupIndex::row_group_index] that selects all row groups
/// of a memtable or file instead of a single one.
const ALL_ROW_GROUPS: i64 = -1;
36
/// Index and metadata for a memtable or file.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct SourceIndex {
    /// Index of the memtable or file. The range builders in this module number
    /// memtables first and offset file indices by the number of memtables.
    pub(crate) index: usize,
    /// Total number of row groups in this source. 0 if the metadata
    /// is unavailable. We use this to split files.
    pub(crate) num_row_groups: u64,
}
46
/// Index to access a row group.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RowGroupIndex {
    /// Index to the memtable/file. It uses the same numbering as [SourceIndex::index].
    pub(crate) index: usize,
    /// Row group index in the file.
    /// Negative index indicates all row groups.
    pub row_group_index: i64,
}
56
/// Meta data of a partition range.
/// If the scanner is [UnorderedScan], each meta only has one row group or memtable.
/// If the scanner is [SeqScan], each meta may have multiple row groups and memtables.
#[derive(Debug, PartialEq)]
pub(crate) struct RangeMeta {
    /// The time range of the range. Both bounds are inclusive; the end is turned
    /// into an exclusive bound when the meta is converted to a [PartitionRange].
    pub(crate) time_range: FileTimeRange,
    /// Indices to memtables or files.
    pub(crate) indices: SmallVec<[SourceIndex; 2]>,
    /// Indices to memtable/file row groups that this range scans.
    pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
    /// Estimated number of rows in the range. This can be 0 if the statistics are not available.
    pub(crate) num_rows: usize,
}
71
72impl RangeMeta {
73    /// Creates a [PartitionRange] with specific identifier.
74    /// It converts the inclusive max timestamp to exclusive end timestamp.
75    pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
76        PartitionRange {
77            start: self.time_range.0,
78            end: Timestamp::new(
79                // The i64::MAX timestamp may be invisible but we don't guarantee to support this
80                // value now.
81                self.time_range
82                    .1
83                    .value()
84                    .checked_add(1)
85                    .unwrap_or(self.time_range.1.value()),
86                self.time_range.1.unit(),
87            ),
88            num_rows: self.num_rows,
89            identifier,
90        }
91    }
92
93    /// Creates a list of ranges from the `input` for seq scan.
94    /// If `compaction` is true, it doesn't split the ranges.
95    pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
96        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
97        Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
98        Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
99
100        #[cfg(feature = "enterprise")]
101        Self::push_extension_ranges(input.extension_ranges(), &mut ranges);
102
103        let ranges = group_ranges_for_seq_scan(ranges);
104        if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
105            // We don't split ranges in compaction or TimeSeriesDistribution::PerSeries.
106            return ranges;
107        }
108        maybe_split_ranges_for_seq_scan(ranges)
109    }
110
111    /// Creates a list of ranges from the `input` for unordered scan.
112    pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
113        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
114        Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
115        Self::push_unordered_file_ranges(
116            input.memtables.len(),
117            &input.files,
118            &input.cache_strategy,
119            &mut ranges,
120        );
121
122        #[cfg(feature = "enterprise")]
123        Self::push_extension_ranges(input.extension_ranges(), &mut ranges);
124
125        ranges
126    }
127
128    /// Returns true if the time range of given `meta` overlaps with the time range of this meta.
129    fn overlaps(&self, meta: &RangeMeta) -> bool {
130        overlaps(&self.time_range, &meta.time_range)
131    }
132
133    /// Merges given `meta` to this meta.
134    /// It assumes that the time ranges overlap and they don't have the same file or memtable index.
135    fn merge(&mut self, mut other: RangeMeta) {
136        debug_assert!(self.overlaps(&other));
137        debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
138        debug_assert!(self
139            .row_group_indices
140            .iter()
141            .all(|idx| !other.row_group_indices.contains(idx)));
142
143        self.time_range = (
144            self.time_range.0.min(other.time_range.0),
145            self.time_range.1.max(other.time_range.1),
146        );
147        self.indices.append(&mut other.indices);
148        self.row_group_indices.append(&mut other.row_group_indices);
149        self.num_rows += other.num_rows;
150    }
151
152    /// Returns true if we can split the range into multiple smaller ranges and
153    /// still preserve the order for [SeqScan].
154    fn can_split_preserve_order(&self) -> bool {
155        self.indices.len() == 1 && self.indices[0].num_row_groups > 1
156    }
157
158    /// Splits the range if it can preserve the order.
159    fn maybe_split(self, output: &mut Vec<RangeMeta>) {
160        if self.can_split_preserve_order() {
161            let num_row_groups = self.indices[0].num_row_groups;
162            debug_assert_eq!(1, self.row_group_indices.len());
163            debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
164
165            output.reserve(self.row_group_indices.len());
166            let num_rows = self.num_rows / num_row_groups as usize;
167            // Splits by row group.
168            for row_group_index in 0..num_row_groups {
169                output.push(RangeMeta {
170                    time_range: self.time_range,
171                    indices: self.indices.clone(),
172                    row_group_indices: smallvec![RowGroupIndex {
173                        index: self.indices[0].index,
174                        row_group_index: row_group_index as i64,
175                    }],
176                    num_rows,
177                });
178            }
179        } else {
180            output.push(self);
181        }
182    }
183
184    fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
185        // For append mode, we can parallelize reading memtables.
186        for (memtable_index, memtable) in memtables.iter().enumerate() {
187            let stats = memtable.stats();
188            let Some(time_range) = stats.time_range() else {
189                continue;
190            };
191            for row_group_index in 0..stats.num_ranges() {
192                let num_rows = stats.num_rows() / stats.num_ranges();
193                ranges.push(RangeMeta {
194                    time_range,
195                    indices: smallvec![SourceIndex {
196                        index: memtable_index,
197                        num_row_groups: stats.num_ranges() as u64,
198                    }],
199                    row_group_indices: smallvec![RowGroupIndex {
200                        index: memtable_index,
201                        row_group_index: row_group_index as i64,
202                    }],
203                    num_rows,
204                });
205            }
206        }
207    }
208
209    fn push_unordered_file_ranges(
210        num_memtables: usize,
211        files: &[FileHandle],
212        cache: &CacheStrategy,
213        ranges: &mut Vec<RangeMeta>,
214    ) {
215        // For append mode, we can parallelize reading row groups.
216        for (i, file) in files.iter().enumerate() {
217            let file_index = num_memtables + i;
218            // Get parquet meta from the cache.
219            let parquet_meta = cache.get_parquet_meta_data_from_mem_cache(file.file_id());
220            if let Some(parquet_meta) = parquet_meta {
221                // Scans each row group.
222                for row_group_index in 0..file.meta_ref().num_row_groups {
223                    let time_range = parquet_row_group_time_range(
224                        file.meta_ref(),
225                        &parquet_meta,
226                        row_group_index as usize,
227                    );
228                    let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
229                    ranges.push(RangeMeta {
230                        time_range: time_range.unwrap_or_else(|| file.time_range()),
231                        indices: smallvec![SourceIndex {
232                            index: file_index,
233                            num_row_groups: file.meta_ref().num_row_groups,
234                        }],
235                        row_group_indices: smallvec![RowGroupIndex {
236                            index: file_index,
237                            row_group_index: row_group_index as i64,
238                        }],
239                        num_rows: num_rows as usize,
240                    });
241                }
242            } else if file.meta_ref().num_row_groups > 0 {
243                // Scans each row group.
244                for row_group_index in 0..file.meta_ref().num_row_groups {
245                    ranges.push(RangeMeta {
246                        time_range: file.time_range(),
247                        indices: smallvec![SourceIndex {
248                            index: file_index,
249                            num_row_groups: file.meta_ref().num_row_groups,
250                        }],
251                        row_group_indices: smallvec![RowGroupIndex {
252                            index: file_index,
253                            row_group_index: row_group_index as i64,
254                        }],
255                        num_rows: DEFAULT_ROW_GROUP_SIZE,
256                    });
257                }
258            } else {
259                // If we don't known the number of row groups in advance, scan all row groups.
260                ranges.push(RangeMeta {
261                    time_range: file.time_range(),
262                    indices: smallvec![SourceIndex {
263                        index: file_index,
264                        num_row_groups: 0,
265                    }],
266                    row_group_indices: smallvec![RowGroupIndex {
267                        index: file_index,
268                        row_group_index: ALL_ROW_GROUPS,
269                    }],
270                    // This may be 0.
271                    num_rows: file.meta_ref().num_rows as usize,
272                });
273            }
274        }
275    }
276
277    fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
278        // For non append-only mode, each range only contains one memtable by default.
279        for (i, memtable) in memtables.iter().enumerate() {
280            let stats = memtable.stats();
281            let Some(time_range) = stats.time_range() else {
282                continue;
283            };
284            ranges.push(RangeMeta {
285                time_range,
286                indices: smallvec![SourceIndex {
287                    index: i,
288                    num_row_groups: stats.num_ranges() as u64,
289                }],
290                row_group_indices: smallvec![RowGroupIndex {
291                    index: i,
292                    row_group_index: ALL_ROW_GROUPS,
293                }],
294                num_rows: stats.num_rows(),
295            });
296        }
297    }
298
299    fn push_seq_file_ranges(
300        num_memtables: usize,
301        files: &[FileHandle],
302        ranges: &mut Vec<RangeMeta>,
303    ) {
304        // For non append-only mode, each range only contains one file.
305        for (i, file) in files.iter().enumerate() {
306            let file_index = num_memtables + i;
307            ranges.push(RangeMeta {
308                time_range: file.time_range(),
309                indices: smallvec![SourceIndex {
310                    index: file_index,
311                    num_row_groups: file.meta_ref().num_row_groups,
312                }],
313                row_group_indices: smallvec![RowGroupIndex {
314                    index: file_index,
315                    row_group_index: ALL_ROW_GROUPS,
316                }],
317                num_rows: file.meta_ref().num_rows as usize,
318            });
319        }
320    }
321
322    #[cfg(feature = "enterprise")]
323    fn push_extension_ranges(
324        ranges: &[crate::extension::BoxedExtensionRange],
325        metas: &mut Vec<RangeMeta>,
326    ) {
327        for range in ranges.iter() {
328            let index = metas.len();
329            metas.push(RangeMeta {
330                time_range: range.time_range(),
331                indices: smallvec![SourceIndex {
332                    index,
333                    num_row_groups: range.num_row_groups(),
334                }],
335                row_group_indices: smallvec![RowGroupIndex {
336                    index,
337                    row_group_index: ALL_ROW_GROUPS,
338                }],
339                num_rows: range.num_rows() as usize,
340            });
341        }
342    }
343}
344
345/// Groups ranges by time range.
346/// It assumes each input range only contains a file or a memtable.
347fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
348    if ranges.is_empty() {
349        return ranges;
350    }
351
352    // Sorts ranges by time range (start asc, end desc).
353    ranges.sort_unstable_by(|a, b| {
354        let l = a.time_range;
355        let r = b.time_range;
356        l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
357    });
358    let mut range_in_progress = None;
359    // Parts with exclusive time ranges.
360    let mut exclusive_ranges = Vec::with_capacity(ranges.len());
361    for range in ranges {
362        let Some(mut prev_range) = range_in_progress.take() else {
363            // This is the new range to process.
364            range_in_progress = Some(range);
365            continue;
366        };
367
368        if prev_range.overlaps(&range) {
369            prev_range.merge(range);
370            range_in_progress = Some(prev_range);
371        } else {
372            exclusive_ranges.push(prev_range);
373            range_in_progress = Some(range);
374        }
375    }
376    if let Some(range) = range_in_progress {
377        exclusive_ranges.push(range);
378    }
379
380    exclusive_ranges
381}
382
383/// Splits the range into multiple smaller ranges.
384/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()].
385fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
386    let mut new_ranges = Vec::with_capacity(ranges.len());
387    for range in ranges {
388        range.maybe_split(&mut new_ranges);
389    }
390
391    new_ranges
392}
393
/// Builder to create file ranges.
///
/// The default builder has no context so it builds nothing.
#[derive(Default)]
pub struct FileRangeBuilder {
    /// Context for the file.
    /// None indicates nothing to read.
    context: Option<FileRangeContextRef>,
    /// Row group selection for the file to read.
    /// Only row groups present in the selection produce file ranges.
    selection: RowGroupSelection,
}
403
404impl FileRangeBuilder {
405    /// Builds a file range builder from context and row groups.
406    pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self {
407        Self {
408            context: Some(context),
409            selection,
410        }
411    }
412
413    /// Builds file ranges to read.
414    /// Negative `row_group_index` indicates all row groups.
415    pub fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
416        let Some(context) = self.context.clone() else {
417            return;
418        };
419        if row_group_index >= 0 {
420            let row_group_index = row_group_index as usize;
421            // Scans one row group.
422            let Some(row_selection) = self.selection.get(row_group_index) else {
423                return;
424            };
425            ranges.push(FileRange::new(
426                context,
427                row_group_index,
428                Some(row_selection.clone()),
429            ));
430        } else {
431            // Scans all row groups.
432            ranges.extend(
433                self.selection
434                    .iter()
435                    .map(|(row_group_index, row_selection)| {
436                        FileRange::new(
437                            context.clone(),
438                            *row_group_index,
439                            Some(row_selection.clone()),
440                        )
441                    }),
442            );
443        }
444    }
445}
446
/// Builder to create mem ranges.
pub(crate) struct MemRangeBuilder {
    /// Range of a memtable. The builder returns clones of this range.
    range: MemtableRange,
    /// Stats of a memtable.
    stats: MemtableStats,
}
454
455impl MemRangeBuilder {
456    /// Builds a mem range builder from row groups.
457    pub(crate) fn new(range: MemtableRange, stats: MemtableStats) -> Self {
458        Self { range, stats }
459    }
460
461    /// Builds mem ranges to read in the memtable.
462    /// Negative `row_group_index` indicates all row groups.
463    pub(crate) fn build_ranges(
464        &self,
465        _row_group_index: i64,
466        ranges: &mut SmallVec<[MemtableRange; 2]>,
467    ) {
468        ranges.push(self.range.clone())
469    }
470
471    /// Returns the statistics of the memtable.
472    pub(crate) fn stats(&self) -> &MemtableStats {
473        &self.stats
474    }
475}
476
/// List that manages the builders to create file ranges.
/// Each scan partition should have its own list. Mutex inside this list is used to allow moving
/// the list to different streams in the same partition.
pub(crate) struct RangeBuilderList {
    /// Number of memtables. File indices are offset by this number.
    num_memtables: usize,
    /// Builders of files, `None` if the builder of the file hasn't been created yet.
    file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
}
484
485impl RangeBuilderList {
486    /// Creates a new [ReaderBuilderList] with the given number of memtables and files.
487    pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
488        let file_builders = (0..num_files).map(|_| None).collect();
489        Self {
490            num_memtables,
491            file_builders: Mutex::new(file_builders),
492        }
493    }
494
495    /// Builds file ranges to read the row group at `index`.
496    pub(crate) async fn build_file_ranges(
497        &self,
498        input: &ScanInput,
499        index: RowGroupIndex,
500        reader_metrics: &mut ReaderMetrics,
501    ) -> Result<SmallVec<[FileRange; 2]>> {
502        let mut ranges = SmallVec::new();
503        let file_index = index.index - self.num_memtables;
504        let builder_opt = self.get_file_builder(file_index);
505        match builder_opt {
506            Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
507            None => {
508                let file = &input.files[file_index];
509                let builder = input.prune_file(file, reader_metrics).await?;
510                builder.build_ranges(index.row_group_index, &mut ranges);
511                self.set_file_builder(file_index, Arc::new(builder));
512            }
513        }
514        Ok(ranges)
515    }
516
517    fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
518        let file_builders = self.file_builders.lock().unwrap();
519        file_builders[index].clone()
520    }
521
522    fn set_file_builder(&self, index: usize, builder: Arc<FileRangeBuilder>) {
523        let mut file_builders = self.file_builders.lock().unwrap();
524        file_builders[index] = Some(builder);
525    }
526}
527
#[cfg(test)]
mod tests {
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;

    use super::*;

    /// Source indices, start and end (in seconds) of a grouped range.
    type Output = (Vec<usize>, i64, i64);

    /// Creates a [RangeMeta] for tests.
    ///
    /// - `time_range`: start and end in seconds.
    /// - `sources`: `(index, num_row_groups)` of each source.
    /// - `row_groups`: `(index, row_group_index)` of each row group.
    fn new_meta(
        time_range: (i64, i64),
        sources: &[(usize, u64)],
        row_groups: &[(usize, i64)],
        num_rows: usize,
    ) -> RangeMeta {
        RangeMeta {
            time_range: (
                Timestamp::new_second(time_range.0),
                Timestamp::new_second(time_range.1),
            ),
            indices: sources
                .iter()
                .map(|&(index, num_row_groups)| SourceIndex {
                    index,
                    num_row_groups,
                })
                .collect(),
            row_group_indices: row_groups
                .iter()
                .map(|&(index, row_group_index)| RowGroupIndex {
                    index,
                    row_group_index,
                })
                .collect(),
            num_rows,
        }
    }

    /// Groups ranges built from `(index, start, end)` tuples and checks the groups.
    fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
        let mut ranges = Vec::with_capacity(input.len());
        for &(idx, start, end) in input {
            ranges.push(RangeMeta {
                time_range: (
                    Timestamp::new(start, TimeUnit::Second),
                    Timestamp::new(end, TimeUnit::Second),
                ),
                indices: smallvec![SourceIndex {
                    index: idx,
                    num_row_groups: 0,
                }],
                row_group_indices: smallvec![RowGroupIndex {
                    index: idx,
                    row_group_index: 0,
                }],
                num_rows: 1,
            });
        }

        let mut actual: Vec<Output> = Vec::new();
        for group in group_ranges_for_seq_scan(ranges) {
            let sources: Vec<usize> = group.indices.iter().map(|source| source.index).collect();
            let row_group_sources: Vec<usize> = group
                .row_group_indices
                .iter()
                .map(|row_group| row_group.index)
                .collect();
            // Both index lists must be merged in the same order.
            assert_eq!(sources, row_group_sources);
            let (start, end) = group.time_range;
            actual.push((sources, start.value(), end.value()));
        }
        assert_eq!(expect, actual);
    }

    #[test]
    fn test_group_ranges() {
        // Group 1 part.
        run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);

        // 1, 2, 3, 4 => [3, 1, 4], [2]
        run_group_ranges_test(
            &[
                (1, 1000, 2000),
                (2, 6000, 7000),
                (3, 0, 1500),
                (4, 1500, 3000),
            ],
            &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
        );

        // 1, 2, 3 => [3], [1], [2],
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
            &[
                (vec![3], 0, 1000),
                (vec![1], 3000, 4000),
                (vec![2], 4001, 6000),
            ],
        );

        // 1, 2, 3 => [3], [1, 2]
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
            &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
        );
    }

    #[test]
    fn test_merge_range() {
        let mut left = new_meta((1000, 2000), &[(1, 2)], &[(1, 1), (1, 2)], 5);
        let right = new_meta((800, 1200), &[(2, 2)], &[(2, 1), (2, 2)], 4);
        left.merge(right);

        assert_eq!(
            left,
            new_meta(
                (800, 2000),
                &[(1, 2), (2, 2)],
                &[(1, 1), (1, 2), (2, 1), (2, 2)],
                9,
            )
        );
    }

    #[test]
    fn test_split_range() {
        let range = new_meta((1000, 2000), &[(1, 2)], &[(1, ALL_ROW_GROUPS)], 5);

        assert!(range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);

        assert_eq!(
            output,
            &[
                new_meta((1000, 2000), &[(1, 2)], &[(1, 0)], 2),
                new_meta((1000, 2000), &[(1, 2)], &[(1, 1)], 2),
            ]
        );
    }

    #[test]
    fn test_not_split_range() {
        let range = new_meta((1000, 2000), &[(1, 1), (2, 1)], &[(1, 1), (2, 1)], 5);

        assert!(!range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);
        assert_eq!(1, output.len());
    }

    #[test]
    fn test_maybe_split_ranges() {
        let ranges = vec![
            // Only one row group, can't split.
            new_meta((0, 500), &[(0, 1)], &[(0, 0)], 4),
            // One source with two row groups, splits into two ranges.
            new_meta((1000, 2000), &[(1, 2)], &[(1, ALL_ROW_GROUPS)], 4),
            // Two sources, can't split.
            new_meta(
                (3000, 4000),
                &[(2, 2), (3, 0)],
                &[(2, ALL_ROW_GROUPS), (3, ALL_ROW_GROUPS)],
                5,
            ),
        ];
        let output = maybe_split_ranges_for_seq_scan(ranges);
        assert_eq!(
            output,
            vec![
                new_meta((0, 500), &[(0, 1)], &[(0, 0)], 4),
                new_meta((1000, 2000), &[(1, 2)], &[(1, 0)], 2),
                new_meta((1000, 2000), &[(1, 2)], &[(1, 1)], 2),
                new_meta(
                    (3000, 4000),
                    &[(2, 2), (3, 0)],
                    &[(2, ALL_ROW_GROUPS), (3, ALL_ROW_GROUPS)],
                    5,
                ),
            ]
        )
    }
}
889}