mito2/read/
range.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structs for partition ranges.
16
17use std::sync::{Arc, Mutex};
18
19use common_time::Timestamp;
20use smallvec::{smallvec, SmallVec};
21use store_api::region_engine::PartitionRange;
22use store_api::storage::TimeSeriesDistribution;
23
24use crate::cache::CacheStrategy;
25use crate::error::Result;
26use crate::memtable::{MemtableRange, MemtableRanges, MemtableStats};
27use crate::read::scan_region::ScanInput;
28use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
29use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
30use crate::sst::parquet::format::parquet_row_group_time_range;
31use crate::sst::parquet::reader::ReaderMetrics;
32use crate::sst::parquet::row_selection::RowGroupSelection;
33use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
34
/// Sentinel `row_group_index` meaning "all row groups of the source".
/// Any negative index is treated this way by the range builders.
const ALL_ROW_GROUPS: i64 = -1;
36
/// Index and metadata for a memtable or file.
///
/// All fields are plain integers, so the type also derives `Eq` and `Hash`: equality is
/// total and the index can be used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct SourceIndex {
    /// Index of the memtable or file. File indices follow the memtable indices.
    pub(crate) index: usize,
    /// Total number of row groups in this source. 0 if the metadata
    /// is unavailable. We use this to split files.
    pub(crate) num_row_groups: u64,
}
46
/// Index to access a row group.
///
/// All fields are plain integers, so the type also derives `Eq` and `Hash`: equality is
/// total and the index can be used as a key in hashed collections.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RowGroupIndex {
    /// Index to the memtable/file.
    pub(crate) index: usize,
    /// Row group index in the file.
    /// Negative index indicates all row groups.
    pub row_group_index: i64,
}
56
57/// Meta data of a partition range.
58/// If the scanner is [UnorderedScan], each meta only has one row group or memtable.
59/// If the scanner is [SeqScan], each meta may have multiple row groups and memtables.
60#[derive(Debug, PartialEq)]
61pub(crate) struct RangeMeta {
62    /// The time range of the range.
63    pub(crate) time_range: FileTimeRange,
64    /// Indices to memtables or files.
65    pub(crate) indices: SmallVec<[SourceIndex; 2]>,
66    /// Indices to memtable/file row groups that this range scans.
67    pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
68    /// Estimated number of rows in the range. This can be 0 if the statistics are not available.
69    pub(crate) num_rows: usize,
70}
71
72impl RangeMeta {
73    /// Creates a [PartitionRange] with specific identifier.
74    /// It converts the inclusive max timestamp to exclusive end timestamp.
75    pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
76        PartitionRange {
77            start: self.time_range.0,
78            end: Timestamp::new(
79                // The i64::MAX timestamp may be invisible but we don't guarantee to support this
80                // value now.
81                self.time_range
82                    .1
83                    .value()
84                    .checked_add(1)
85                    .unwrap_or(self.time_range.1.value()),
86                self.time_range.1.unit(),
87            ),
88            num_rows: self.num_rows,
89            identifier,
90        }
91    }
92
93    /// Creates a list of ranges from the `input` for seq scan.
94    /// If `compaction` is true, it doesn't split the ranges.
95    pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
96        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
97        Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
98        Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
99
100        #[cfg(feature = "enterprise")]
101        Self::push_extension_ranges(input.extension_ranges(), &mut ranges);
102
103        let ranges = group_ranges_for_seq_scan(ranges);
104        if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
105            // We don't split ranges in compaction or TimeSeriesDistribution::PerSeries.
106            return ranges;
107        }
108        maybe_split_ranges_for_seq_scan(ranges)
109    }
110
111    /// Creates a list of ranges from the `input` for unordered scan.
112    pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
113        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
114        Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
115        Self::push_unordered_file_ranges(
116            input.memtables.len(),
117            &input.files,
118            &input.cache_strategy,
119            &mut ranges,
120        );
121
122        #[cfg(feature = "enterprise")]
123        Self::push_extension_ranges(input.extension_ranges(), &mut ranges);
124
125        ranges
126    }
127
128    /// Returns true if the time range of given `meta` overlaps with the time range of this meta.
129    fn overlaps(&self, meta: &RangeMeta) -> bool {
130        overlaps(&self.time_range, &meta.time_range)
131    }
132
133    /// Merges given `meta` to this meta.
134    /// It assumes that the time ranges overlap and they don't have the same file or memtable index.
135    fn merge(&mut self, mut other: RangeMeta) {
136        debug_assert!(self.overlaps(&other));
137        debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
138        debug_assert!(self
139            .row_group_indices
140            .iter()
141            .all(|idx| !other.row_group_indices.contains(idx)));
142
143        self.time_range = (
144            self.time_range.0.min(other.time_range.0),
145            self.time_range.1.max(other.time_range.1),
146        );
147        self.indices.append(&mut other.indices);
148        self.row_group_indices.append(&mut other.row_group_indices);
149        self.num_rows += other.num_rows;
150    }
151
152    /// Returns true if we can split the range into multiple smaller ranges and
153    /// still preserve the order for [SeqScan].
154    fn can_split_preserve_order(&self) -> bool {
155        self.indices.len() == 1 && self.indices[0].num_row_groups > 1
156    }
157
158    /// Splits the range if it can preserve the order.
159    fn maybe_split(self, output: &mut Vec<RangeMeta>) {
160        if self.can_split_preserve_order() {
161            let num_row_groups = self.indices[0].num_row_groups;
162            debug_assert_eq!(1, self.row_group_indices.len());
163            debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
164
165            output.reserve(self.row_group_indices.len());
166            let num_rows = self.num_rows / num_row_groups as usize;
167            // Splits by row group.
168            for row_group_index in 0..num_row_groups {
169                output.push(RangeMeta {
170                    time_range: self.time_range,
171                    indices: self.indices.clone(),
172                    row_group_indices: smallvec![RowGroupIndex {
173                        index: self.indices[0].index,
174                        row_group_index: row_group_index as i64,
175                    }],
176                    num_rows,
177                });
178            }
179        } else {
180            output.push(self);
181        }
182    }
183
184    fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
185        // For append mode, we can parallelize reading memtables.
186        for (memtable_index, memtable) in memtables.iter().enumerate() {
187            let stats = memtable.stats();
188            let Some(time_range) = stats.time_range() else {
189                continue;
190            };
191            for row_group_index in 0..stats.num_ranges() {
192                let num_rows = stats.num_rows() / stats.num_ranges();
193                ranges.push(RangeMeta {
194                    time_range,
195                    indices: smallvec![SourceIndex {
196                        index: memtable_index,
197                        num_row_groups: stats.num_ranges() as u64,
198                    }],
199                    row_group_indices: smallvec![RowGroupIndex {
200                        index: memtable_index,
201                        row_group_index: row_group_index as i64,
202                    }],
203                    num_rows,
204                });
205            }
206        }
207    }
208
209    fn push_unordered_file_ranges(
210        num_memtables: usize,
211        files: &[FileHandle],
212        cache: &CacheStrategy,
213        ranges: &mut Vec<RangeMeta>,
214    ) {
215        // For append mode, we can parallelize reading row groups.
216        for (i, file) in files.iter().enumerate() {
217            let file_index = num_memtables + i;
218            // Get parquet meta from the cache.
219            let parquet_meta =
220                cache.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id());
221            if let Some(parquet_meta) = parquet_meta {
222                // Scans each row group.
223                for row_group_index in 0..file.meta_ref().num_row_groups {
224                    let time_range = parquet_row_group_time_range(
225                        file.meta_ref(),
226                        &parquet_meta,
227                        row_group_index as usize,
228                    );
229                    let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
230                    ranges.push(RangeMeta {
231                        time_range: time_range.unwrap_or_else(|| file.time_range()),
232                        indices: smallvec![SourceIndex {
233                            index: file_index,
234                            num_row_groups: file.meta_ref().num_row_groups,
235                        }],
236                        row_group_indices: smallvec![RowGroupIndex {
237                            index: file_index,
238                            row_group_index: row_group_index as i64,
239                        }],
240                        num_rows: num_rows as usize,
241                    });
242                }
243            } else if file.meta_ref().num_row_groups > 0 {
244                // Scans each row group.
245                for row_group_index in 0..file.meta_ref().num_row_groups {
246                    ranges.push(RangeMeta {
247                        time_range: file.time_range(),
248                        indices: smallvec![SourceIndex {
249                            index: file_index,
250                            num_row_groups: file.meta_ref().num_row_groups,
251                        }],
252                        row_group_indices: smallvec![RowGroupIndex {
253                            index: file_index,
254                            row_group_index: row_group_index as i64,
255                        }],
256                        num_rows: DEFAULT_ROW_GROUP_SIZE,
257                    });
258                }
259            } else {
260                // If we don't known the number of row groups in advance, scan all row groups.
261                ranges.push(RangeMeta {
262                    time_range: file.time_range(),
263                    indices: smallvec![SourceIndex {
264                        index: file_index,
265                        num_row_groups: 0,
266                    }],
267                    row_group_indices: smallvec![RowGroupIndex {
268                        index: file_index,
269                        row_group_index: ALL_ROW_GROUPS,
270                    }],
271                    // This may be 0.
272                    num_rows: file.meta_ref().num_rows as usize,
273                });
274            }
275        }
276    }
277
278    fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
279        // For non append-only mode, each range only contains one memtable by default.
280        for (i, memtable) in memtables.iter().enumerate() {
281            let stats = memtable.stats();
282            let Some(time_range) = stats.time_range() else {
283                continue;
284            };
285            ranges.push(RangeMeta {
286                time_range,
287                indices: smallvec![SourceIndex {
288                    index: i,
289                    num_row_groups: stats.num_ranges() as u64,
290                }],
291                row_group_indices: smallvec![RowGroupIndex {
292                    index: i,
293                    row_group_index: ALL_ROW_GROUPS,
294                }],
295                num_rows: stats.num_rows(),
296            });
297        }
298    }
299
300    fn push_seq_file_ranges(
301        num_memtables: usize,
302        files: &[FileHandle],
303        ranges: &mut Vec<RangeMeta>,
304    ) {
305        // For non append-only mode, each range only contains one file.
306        for (i, file) in files.iter().enumerate() {
307            let file_index = num_memtables + i;
308            ranges.push(RangeMeta {
309                time_range: file.time_range(),
310                indices: smallvec![SourceIndex {
311                    index: file_index,
312                    num_row_groups: file.meta_ref().num_row_groups,
313                }],
314                row_group_indices: smallvec![RowGroupIndex {
315                    index: file_index,
316                    row_group_index: ALL_ROW_GROUPS,
317                }],
318                num_rows: file.meta_ref().num_rows as usize,
319            });
320        }
321    }
322
323    #[cfg(feature = "enterprise")]
324    fn push_extension_ranges(
325        ranges: &[crate::extension::BoxedExtensionRange],
326        metas: &mut Vec<RangeMeta>,
327    ) {
328        for range in ranges.iter() {
329            let index = metas.len();
330            metas.push(RangeMeta {
331                time_range: range.time_range(),
332                indices: smallvec![SourceIndex {
333                    index,
334                    num_row_groups: range.num_row_groups(),
335                }],
336                row_group_indices: smallvec![RowGroupIndex {
337                    index,
338                    row_group_index: ALL_ROW_GROUPS,
339                }],
340                num_rows: range.num_rows() as usize,
341            });
342        }
343    }
344}
345
346/// Groups ranges by time range.
347/// It assumes each input range only contains a file or a memtable.
348fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
349    if ranges.is_empty() {
350        return ranges;
351    }
352
353    // Sorts ranges by time range (start asc, end desc).
354    ranges.sort_unstable_by(|a, b| {
355        let l = a.time_range;
356        let r = b.time_range;
357        l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
358    });
359    let mut range_in_progress = None;
360    // Parts with exclusive time ranges.
361    let mut exclusive_ranges = Vec::with_capacity(ranges.len());
362    for range in ranges {
363        let Some(mut prev_range) = range_in_progress.take() else {
364            // This is the new range to process.
365            range_in_progress = Some(range);
366            continue;
367        };
368
369        if prev_range.overlaps(&range) {
370            prev_range.merge(range);
371            range_in_progress = Some(prev_range);
372        } else {
373            exclusive_ranges.push(prev_range);
374            range_in_progress = Some(range);
375        }
376    }
377    if let Some(range) = range_in_progress {
378        exclusive_ranges.push(range);
379    }
380
381    exclusive_ranges
382}
383
384/// Splits the range into multiple smaller ranges.
385/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()].
386fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
387    let mut new_ranges = Vec::with_capacity(ranges.len());
388    for range in ranges {
389        range.maybe_split(&mut new_ranges);
390    }
391
392    new_ranges
393}
394
395/// Builder to create file ranges.
396#[derive(Default)]
397pub struct FileRangeBuilder {
398    /// Context for the file.
399    /// None indicates nothing to read.
400    context: Option<FileRangeContextRef>,
401    /// Row group selection for the file to read.
402    selection: RowGroupSelection,
403}
404
405impl FileRangeBuilder {
406    /// Builds a file range builder from context and row groups.
407    pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self {
408        Self {
409            context: Some(context),
410            selection,
411        }
412    }
413
414    /// Builds file ranges to read.
415    /// Negative `row_group_index` indicates all row groups.
416    pub fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
417        let Some(context) = self.context.clone() else {
418            return;
419        };
420        if row_group_index >= 0 {
421            let row_group_index = row_group_index as usize;
422            // Scans one row group.
423            let Some(row_selection) = self.selection.get(row_group_index) else {
424                return;
425            };
426            ranges.push(FileRange::new(
427                context,
428                row_group_index,
429                Some(row_selection.clone()),
430            ));
431        } else {
432            // Scans all row groups.
433            ranges.extend(
434                self.selection
435                    .iter()
436                    .map(|(row_group_index, row_selection)| {
437                        FileRange::new(
438                            context.clone(),
439                            *row_group_index,
440                            Some(row_selection.clone()),
441                        )
442                    }),
443            );
444        }
445    }
446}
447
448/// Builder to create mem ranges.
449pub(crate) struct MemRangeBuilder {
450    /// Ranges of a memtable.
451    ranges: MemtableRanges,
452}
453
454impl MemRangeBuilder {
455    /// Builds a mem range builder from row groups.
456    pub(crate) fn new(ranges: MemtableRanges) -> Self {
457        Self { ranges }
458    }
459
460    /// Builds mem ranges to read in the memtable.
461    /// Negative `row_group_index` indicates all row groups.
462    pub(crate) fn build_ranges(
463        &self,
464        row_group_index: i64,
465        ranges: &mut SmallVec<[MemtableRange; 2]>,
466    ) {
467        if row_group_index >= 0 {
468            let row_group_index = row_group_index as usize;
469            // Scans one row group.
470            let Some(range) = self.ranges.ranges.get(&row_group_index) else {
471                return;
472            };
473            ranges.push(range.clone());
474        } else {
475            ranges.extend(self.ranges.ranges.values().cloned());
476        }
477    }
478
479    /// Returns the statistics of the memtable.
480    pub(crate) fn stats(&self) -> &MemtableStats {
481        &self.ranges.stats
482    }
483}
484
485/// List to manages the builders to create file ranges.
486/// Each scan partition should have its own list. Mutex inside this list is used to allow moving
487/// the list to different streams in the same partition.
488pub(crate) struct RangeBuilderList {
489    num_memtables: usize,
490    file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
491}
492
493impl RangeBuilderList {
494    /// Creates a new [ReaderBuilderList] with the given number of memtables and files.
495    pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
496        let file_builders = (0..num_files).map(|_| None).collect();
497        Self {
498            num_memtables,
499            file_builders: Mutex::new(file_builders),
500        }
501    }
502
503    /// Builds file ranges to read the row group at `index`.
504    pub(crate) async fn build_file_ranges(
505        &self,
506        input: &ScanInput,
507        index: RowGroupIndex,
508        reader_metrics: &mut ReaderMetrics,
509    ) -> Result<SmallVec<[FileRange; 2]>> {
510        let mut ranges = SmallVec::new();
511        let file_index = index.index - self.num_memtables;
512        let builder_opt = self.get_file_builder(file_index);
513        match builder_opt {
514            Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
515            None => {
516                let file = &input.files[file_index];
517                let builder = input.prune_file(file, reader_metrics).await?;
518                builder.build_ranges(index.row_group_index, &mut ranges);
519                self.set_file_builder(file_index, Arc::new(builder));
520            }
521        }
522        Ok(ranges)
523    }
524
525    fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
526        let file_builders = self.file_builders.lock().unwrap();
527        file_builders[index].clone()
528    }
529
530    fn set_file_builder(&self, index: usize, builder: Arc<FileRangeBuilder>) {
531        let mut file_builders = self.file_builders.lock().unwrap();
532        file_builders[index] = Some(builder);
533    }
534}
535
#[cfg(test)]
mod tests {
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;

    use super::*;

    /// Source indices, start and end (in seconds) of a grouped range.
    type Output = (Vec<usize>, i64, i64);

    /// Returns a time range with the given inclusive bounds in seconds.
    fn secs(start: i64, end: i64) -> (Timestamp, Timestamp) {
        (Timestamp::new_second(start), Timestamp::new_second(end))
    }

    /// Returns a [SourceIndex] for the source `index` with `num_row_groups` row groups.
    fn source(index: usize, num_row_groups: u64) -> SourceIndex {
        SourceIndex {
            index,
            num_row_groups,
        }
    }

    /// Returns a [RowGroupIndex] for the row group `row_group_index` of the source `index`.
    fn row_group(index: usize, row_group_index: i64) -> RowGroupIndex {
        RowGroupIndex {
            index,
            row_group_index,
        }
    }

    /// Returns a [RangeMeta] built from the given parts.
    fn meta(
        time_range: (Timestamp, Timestamp),
        indices: &[SourceIndex],
        row_group_indices: &[RowGroupIndex],
        num_rows: usize,
    ) -> RangeMeta {
        RangeMeta {
            time_range,
            indices: SmallVec::from_slice(indices),
            row_group_indices: SmallVec::from_slice(row_group_indices),
            num_rows,
        }
    }

    /// Groups the `(index, start, end)` inputs and compares the groups with `expect`.
    fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
        let ranges = input
            .iter()
            .map(|&(idx, start, end)| RangeMeta {
                time_range: (
                    Timestamp::new(start, TimeUnit::Second),
                    Timestamp::new(end, TimeUnit::Second),
                ),
                indices: smallvec![source(idx, 0)],
                row_group_indices: smallvec![row_group(idx, 0)],
                num_rows: 1,
            })
            .collect();

        let actual: Vec<Output> = group_ranges_for_seq_scan(ranges)
            .iter()
            .map(|range| {
                let indices: Vec<usize> = range.indices.iter().map(|src| src.index).collect();
                let group_indices: Vec<usize> =
                    range.row_group_indices.iter().map(|rg| rg.index).collect();
                assert_eq!(indices, group_indices);
                let (start, end) = range.time_range;
                (indices, start.value(), end.value())
            })
            .collect();
        assert_eq!(expect, actual);
    }

    #[test]
    fn test_group_ranges() {
        type Input = (usize, i64, i64);

        let cases: Vec<(Vec<Input>, Vec<Output>)> = vec![
            // Group 1 part.
            (vec![(1, 0, 2000)], vec![(vec![1], 0, 2000)]),
            // 1, 2, 3, 4 => [3, 1, 4], [2]
            (
                vec![
                    (1, 1000, 2000),
                    (2, 6000, 7000),
                    (3, 0, 1500),
                    (4, 1500, 3000),
                ],
                vec![(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
            ),
            // 1, 2, 3 => [3], [1], [2],
            (
                vec![(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
                vec![
                    (vec![3], 0, 1000),
                    (vec![1], 3000, 4000),
                    (vec![2], 4001, 6000),
                ],
            ),
            // 1, 2, 3 => [3], [1, 2]
            (
                vec![(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
                vec![(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
            ),
        ];

        for (input, expect) in &cases {
            run_group_ranges_test(input, expect);
        }
    }

    #[test]
    fn test_merge_range() {
        let mut left = meta(
            secs(1000, 2000),
            &[source(1, 2)],
            &[row_group(1, 1), row_group(1, 2)],
            5,
        );
        let right = meta(
            secs(800, 1200),
            &[source(2, 2)],
            &[row_group(2, 1), row_group(2, 2)],
            4,
        );
        left.merge(right);

        assert_eq!(
            left,
            meta(
                secs(800, 2000),
                &[source(1, 2), source(2, 2)],
                &[
                    row_group(1, 1),
                    row_group(1, 2),
                    row_group(2, 1),
                    row_group(2, 2),
                ],
                9,
            )
        );
    }

    #[test]
    fn test_split_range() {
        let range = meta(
            secs(1000, 2000),
            &[source(1, 2)],
            &[row_group(1, ALL_ROW_GROUPS)],
            5,
        );

        assert!(range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);

        assert_eq!(
            output,
            &[
                meta(secs(1000, 2000), &[source(1, 2)], &[row_group(1, 0)], 2),
                meta(secs(1000, 2000), &[source(1, 2)], &[row_group(1, 1)], 2),
            ]
        );
    }

    #[test]
    fn test_not_split_range() {
        let range = meta(
            secs(1000, 2000),
            &[source(1, 1), source(2, 1)],
            &[row_group(1, 1), row_group(2, 1)],
            5,
        );

        assert!(!range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);
        assert_eq!(1, output.len());
    }

    #[test]
    fn test_maybe_split_ranges() {
        let ranges = vec![
            // Single row group: kept as is.
            meta(secs(0, 500), &[source(0, 1)], &[row_group(0, 0)], 4),
            // Single source with two row groups: split into two ranges.
            meta(
                secs(1000, 2000),
                &[source(1, 2)],
                &[row_group(1, ALL_ROW_GROUPS)],
                4,
            ),
            // Two sources: kept as is.
            meta(
                secs(3000, 4000),
                &[source(2, 2), source(3, 0)],
                &[row_group(2, ALL_ROW_GROUPS), row_group(3, ALL_ROW_GROUPS)],
                5,
            ),
        ];
        let output = maybe_split_ranges_for_seq_scan(ranges);
        assert_eq!(
            output,
            vec![
                meta(secs(0, 500), &[source(0, 1)], &[row_group(0, 0)], 4),
                meta(secs(1000, 2000), &[source(1, 2)], &[row_group(1, 0)], 2),
                meta(secs(1000, 2000), &[source(1, 2)], &[row_group(1, 1)], 2),
                meta(
                    secs(3000, 4000),
                    &[source(2, 2), source(3, 0)],
                    &[row_group(2, ALL_ROW_GROUPS), row_group(3, ALL_ROW_GROUPS)],
                    5,
                ),
            ]
        )
    }
}
897}