mito2/read/
range.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structs for partition ranges.
16
17use std::sync::{Arc, Mutex};
18
19use common_time::Timestamp;
20use smallvec::{SmallVec, smallvec};
21use store_api::region_engine::PartitionRange;
22use store_api::storage::TimeSeriesDistribution;
23
24use crate::cache::CacheStrategy;
25use crate::error::Result;
26use crate::memtable::{MemtableRange, MemtableStats};
27use crate::read::scan_region::ScanInput;
28use crate::sst::file::{FileHandle, FileTimeRange, overlaps};
29use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
30use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
31use crate::sst::parquet::format::parquet_row_group_time_range;
32use crate::sst::parquet::reader::ReaderMetrics;
33use crate::sst::parquet::row_selection::RowGroupSelection;
34
/// Sentinel value for [RowGroupIndex::row_group_index] meaning "scan all row groups" of the
/// source (any negative value is treated this way by the range builders).
const ALL_ROW_GROUPS: i64 = -1;
36
/// Index and metadata for a memtable or file.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct SourceIndex {
    /// Index of the memtable or file. Memtables are numbered first; file indices are offset
    /// by the number of memtables (and extension ranges, if any, come after the files).
    pub(crate) index: usize,
    /// Total number of row groups in this source. 0 if the metadata
    /// is unavailable. We use this to split files.
    pub(crate) num_row_groups: u64,
}
46
/// Index to access a row group.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RowGroupIndex {
    /// Index to the memtable/file (same numbering as [SourceIndex::index]).
    pub(crate) index: usize,
    /// Row group index in the file.
    /// Negative index indicates all row groups (see [ALL_ROW_GROUPS]).
    /// Memtable range builders currently ignore this value and always read the whole memtable.
    pub row_group_index: i64,
}
56
/// Meta data of a partition range.
/// If the scanner is [UnorderedScan], each meta only has one row group or memtable.
/// If the scanner is [SeqScan], each meta may have multiple row groups and memtables.
#[derive(Debug, PartialEq)]
pub(crate) struct RangeMeta {
    /// The time range of the range. The max timestamp (second element) is inclusive.
    pub(crate) time_range: FileTimeRange,
    /// Indices to memtables or files.
    pub(crate) indices: SmallVec<[SourceIndex; 2]>,
    /// Indices to memtable/file row groups that this range scans.
    pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
    /// Estimated number of rows in the range. This can be 0 if the statistics are not available.
    pub(crate) num_rows: usize,
}
71
72impl RangeMeta {
73    /// Creates a [PartitionRange] with specific identifier.
74    /// It converts the inclusive max timestamp to exclusive end timestamp.
75    pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
76        PartitionRange {
77            start: self.time_range.0,
78            end: Timestamp::new(
79                // The i64::MAX timestamp may be invisible but we don't guarantee to support this
80                // value now.
81                self.time_range
82                    .1
83                    .value()
84                    .checked_add(1)
85                    .unwrap_or(self.time_range.1.value()),
86                self.time_range.1.unit(),
87            ),
88            num_rows: self.num_rows,
89            identifier,
90        }
91    }
92
93    /// Creates a list of ranges from the `input` for seq scan.
94    /// If `compaction` is true, it doesn't split the ranges.
95    pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
96        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
97        Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
98        Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
99
100        #[cfg(feature = "enterprise")]
101        Self::push_extension_ranges(input, &mut ranges);
102
103        let ranges = group_ranges_for_seq_scan(ranges);
104        if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
105            // We don't split ranges in compaction or TimeSeriesDistribution::PerSeries.
106            return ranges;
107        }
108        maybe_split_ranges_for_seq_scan(ranges)
109    }
110
111    /// Creates a list of ranges from the `input` for unordered scan.
112    pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
113        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
114        Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
115        Self::push_unordered_file_ranges(
116            input.memtables.len(),
117            &input.files,
118            &input.cache_strategy,
119            &mut ranges,
120        );
121
122        #[cfg(feature = "enterprise")]
123        Self::push_extension_ranges(input, &mut ranges);
124
125        ranges
126    }
127
128    /// Returns true if the time range of given `meta` overlaps with the time range of this meta.
129    fn overlaps(&self, meta: &RangeMeta) -> bool {
130        overlaps(&self.time_range, &meta.time_range)
131    }
132
133    /// Merges given `meta` to this meta.
134    /// It assumes that the time ranges overlap and they don't have the same file or memtable index.
135    fn merge(&mut self, mut other: RangeMeta) {
136        debug_assert!(self.overlaps(&other));
137        debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
138        debug_assert!(
139            self.row_group_indices
140                .iter()
141                .all(|idx| !other.row_group_indices.contains(idx))
142        );
143
144        self.time_range = (
145            self.time_range.0.min(other.time_range.0),
146            self.time_range.1.max(other.time_range.1),
147        );
148        self.indices.append(&mut other.indices);
149        self.row_group_indices.append(&mut other.row_group_indices);
150        self.num_rows += other.num_rows;
151    }
152
153    /// Returns true if we can split the range into multiple smaller ranges and
154    /// still preserve the order for [SeqScan].
155    fn can_split_preserve_order(&self) -> bool {
156        self.indices.len() == 1 && self.indices[0].num_row_groups > 1
157    }
158
159    /// Splits the range if it can preserve the order.
160    fn maybe_split(self, output: &mut Vec<RangeMeta>) {
161        if self.can_split_preserve_order() {
162            let num_row_groups = self.indices[0].num_row_groups;
163            debug_assert_eq!(1, self.row_group_indices.len());
164            debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
165
166            output.reserve(self.row_group_indices.len());
167            let num_rows = self.num_rows / num_row_groups as usize;
168            // Splits by row group.
169            for row_group_index in 0..num_row_groups {
170                output.push(RangeMeta {
171                    time_range: self.time_range,
172                    indices: self.indices.clone(),
173                    row_group_indices: smallvec![RowGroupIndex {
174                        index: self.indices[0].index,
175                        row_group_index: row_group_index as i64,
176                    }],
177                    num_rows,
178                });
179            }
180        } else {
181            output.push(self);
182        }
183    }
184
185    fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
186        // For append mode, we can parallelize reading memtables.
187        for (memtable_index, memtable) in memtables.iter().enumerate() {
188            let stats = memtable.stats();
189            let Some(time_range) = stats.time_range() else {
190                continue;
191            };
192            for row_group_index in 0..stats.num_ranges() {
193                let num_rows = stats.num_rows() / stats.num_ranges();
194                ranges.push(RangeMeta {
195                    time_range,
196                    indices: smallvec![SourceIndex {
197                        index: memtable_index,
198                        num_row_groups: stats.num_ranges() as u64,
199                    }],
200                    row_group_indices: smallvec![RowGroupIndex {
201                        index: memtable_index,
202                        row_group_index: row_group_index as i64,
203                    }],
204                    num_rows,
205                });
206            }
207        }
208    }
209
210    fn push_unordered_file_ranges(
211        num_memtables: usize,
212        files: &[FileHandle],
213        cache: &CacheStrategy,
214        ranges: &mut Vec<RangeMeta>,
215    ) {
216        // For append mode, we can parallelize reading row groups.
217        for (i, file) in files.iter().enumerate() {
218            let file_index = num_memtables + i;
219            // Get parquet meta from the cache.
220            let parquet_meta = cache.get_parquet_meta_data_from_mem_cache(file.file_id());
221            if let Some(parquet_meta) = parquet_meta {
222                // Scans each row group.
223                for row_group_index in 0..file.meta_ref().num_row_groups {
224                    let time_range = parquet_row_group_time_range(
225                        file.meta_ref(),
226                        &parquet_meta,
227                        row_group_index as usize,
228                    );
229                    let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
230                    ranges.push(RangeMeta {
231                        time_range: time_range.unwrap_or_else(|| file.time_range()),
232                        indices: smallvec![SourceIndex {
233                            index: file_index,
234                            num_row_groups: file.meta_ref().num_row_groups,
235                        }],
236                        row_group_indices: smallvec![RowGroupIndex {
237                            index: file_index,
238                            row_group_index: row_group_index as i64,
239                        }],
240                        num_rows: num_rows as usize,
241                    });
242                }
243            } else if file.meta_ref().num_row_groups > 0 {
244                // Scans each row group.
245                for row_group_index in 0..file.meta_ref().num_row_groups {
246                    ranges.push(RangeMeta {
247                        time_range: file.time_range(),
248                        indices: smallvec![SourceIndex {
249                            index: file_index,
250                            num_row_groups: file.meta_ref().num_row_groups,
251                        }],
252                        row_group_indices: smallvec![RowGroupIndex {
253                            index: file_index,
254                            row_group_index: row_group_index as i64,
255                        }],
256                        num_rows: DEFAULT_ROW_GROUP_SIZE,
257                    });
258                }
259            } else {
260                // If we don't known the number of row groups in advance, scan all row groups.
261                ranges.push(RangeMeta {
262                    time_range: file.time_range(),
263                    indices: smallvec![SourceIndex {
264                        index: file_index,
265                        num_row_groups: 0,
266                    }],
267                    row_group_indices: smallvec![RowGroupIndex {
268                        index: file_index,
269                        row_group_index: ALL_ROW_GROUPS,
270                    }],
271                    // This may be 0.
272                    num_rows: file.meta_ref().num_rows as usize,
273                });
274            }
275        }
276    }
277
278    fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
279        // For non append-only mode, each range only contains one memtable by default.
280        for (i, memtable) in memtables.iter().enumerate() {
281            let stats = memtable.stats();
282            let Some(time_range) = stats.time_range() else {
283                continue;
284            };
285            ranges.push(RangeMeta {
286                time_range,
287                indices: smallvec![SourceIndex {
288                    index: i,
289                    num_row_groups: stats.num_ranges() as u64,
290                }],
291                row_group_indices: smallvec![RowGroupIndex {
292                    index: i,
293                    row_group_index: ALL_ROW_GROUPS,
294                }],
295                num_rows: stats.num_rows(),
296            });
297        }
298    }
299
300    fn push_seq_file_ranges(
301        num_memtables: usize,
302        files: &[FileHandle],
303        ranges: &mut Vec<RangeMeta>,
304    ) {
305        // For non append-only mode, each range only contains one file.
306        for (i, file) in files.iter().enumerate() {
307            let file_index = num_memtables + i;
308            ranges.push(RangeMeta {
309                time_range: file.time_range(),
310                indices: smallvec![SourceIndex {
311                    index: file_index,
312                    num_row_groups: file.meta_ref().num_row_groups,
313                }],
314                row_group_indices: smallvec![RowGroupIndex {
315                    index: file_index,
316                    row_group_index: ALL_ROW_GROUPS,
317                }],
318                num_rows: file.meta_ref().num_rows as usize,
319            });
320        }
321    }
322
323    #[cfg(feature = "enterprise")]
324    fn push_extension_ranges(input: &ScanInput, metas: &mut Vec<RangeMeta>) {
325        for (i, range) in input.extension_ranges().iter().enumerate() {
326            let index = input.num_memtables() + input.num_files() + i;
327            metas.push(RangeMeta {
328                time_range: range.time_range(),
329                indices: smallvec![SourceIndex {
330                    index,
331                    num_row_groups: range.num_row_groups(),
332                }],
333                row_group_indices: smallvec![RowGroupIndex {
334                    index,
335                    row_group_index: ALL_ROW_GROUPS,
336                }],
337                num_rows: range.num_rows() as usize,
338            });
339        }
340    }
341}
342
343/// Groups ranges by time range.
344/// It assumes each input range only contains a file or a memtable.
345fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
346    if ranges.is_empty() {
347        return ranges;
348    }
349
350    // Sorts ranges by time range (start asc, end desc).
351    ranges.sort_unstable_by(|a, b| {
352        let l = a.time_range;
353        let r = b.time_range;
354        l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
355    });
356    let mut range_in_progress = None;
357    // Parts with exclusive time ranges.
358    let mut exclusive_ranges = Vec::with_capacity(ranges.len());
359    for range in ranges {
360        let Some(mut prev_range) = range_in_progress.take() else {
361            // This is the new range to process.
362            range_in_progress = Some(range);
363            continue;
364        };
365
366        if prev_range.overlaps(&range) {
367            prev_range.merge(range);
368            range_in_progress = Some(prev_range);
369        } else {
370            exclusive_ranges.push(prev_range);
371            range_in_progress = Some(range);
372        }
373    }
374    if let Some(range) = range_in_progress {
375        exclusive_ranges.push(range);
376    }
377
378    exclusive_ranges
379}
380
381/// Splits the range into multiple smaller ranges.
382/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()].
383fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
384    let mut new_ranges = Vec::with_capacity(ranges.len());
385    for range in ranges {
386        range.maybe_split(&mut new_ranges);
387    }
388
389    new_ranges
390}
391
/// Builder to create file ranges.
/// The `Default` builder has no context and therefore builds no ranges.
#[derive(Default)]
pub struct FileRangeBuilder {
    /// Context for the file.
    /// None indicates nothing to read.
    context: Option<FileRangeContextRef>,
    /// Row group selection for the file to read. Row groups absent from the selection are
    /// not read.
    selection: RowGroupSelection,
}
401
402impl FileRangeBuilder {
403    /// Builds a file range builder from context and row groups.
404    pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self {
405        Self {
406            context: Some(context),
407            selection,
408        }
409    }
410
411    /// Builds file ranges to read.
412    /// Negative `row_group_index` indicates all row groups.
413    pub fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
414        let Some(context) = self.context.clone() else {
415            return;
416        };
417        if row_group_index >= 0 {
418            let row_group_index = row_group_index as usize;
419            // Scans one row group.
420            let Some(row_selection) = self.selection.get(row_group_index) else {
421                return;
422            };
423            ranges.push(FileRange::new(
424                context,
425                row_group_index,
426                Some(row_selection.clone()),
427            ));
428        } else {
429            // Scans all row groups.
430            ranges.extend(
431                self.selection
432                    .iter()
433                    .map(|(row_group_index, row_selection)| {
434                        FileRange::new(
435                            context.clone(),
436                            *row_group_index,
437                            Some(row_selection.clone()),
438                        )
439                    }),
440            );
441        }
442    }
443}
444
/// Builder to create mem ranges.
pub(crate) struct MemRangeBuilder {
    /// Range of a memtable; cloned for every `build_ranges` call.
    range: MemtableRange,
    /// Stats of a memtable, used to plan partition ranges.
    stats: MemtableStats,
}
452
453impl MemRangeBuilder {
454    /// Builds a mem range builder from row groups.
455    pub(crate) fn new(range: MemtableRange, stats: MemtableStats) -> Self {
456        Self { range, stats }
457    }
458
459    /// Builds mem ranges to read in the memtable.
460    /// Negative `row_group_index` indicates all row groups.
461    pub(crate) fn build_ranges(
462        &self,
463        _row_group_index: i64,
464        ranges: &mut SmallVec<[MemtableRange; 2]>,
465    ) {
466        ranges.push(self.range.clone())
467    }
468
469    /// Returns the statistics of the memtable.
470    pub(crate) fn stats(&self) -> &MemtableStats {
471        &self.stats
472    }
473}
474
/// List to manage the builders to create file ranges.
/// Each scan partition should have its own list. Mutex inside this list is used to allow moving
/// the list to different streams in the same partition.
pub(crate) struct RangeBuilderList {
    /// Number of memtables. A [RowGroupIndex] index minus this value is the file position.
    num_memtables: usize,
    /// One slot per file; `None` until the file's builder is created on first use.
    file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
}
482
483impl RangeBuilderList {
484    /// Creates a new [ReaderBuilderList] with the given number of memtables and files.
485    pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
486        let file_builders = (0..num_files).map(|_| None).collect();
487        Self {
488            num_memtables,
489            file_builders: Mutex::new(file_builders),
490        }
491    }
492
493    /// Builds file ranges to read the row group at `index`.
494    pub(crate) async fn build_file_ranges(
495        &self,
496        input: &ScanInput,
497        index: RowGroupIndex,
498        reader_metrics: &mut ReaderMetrics,
499    ) -> Result<SmallVec<[FileRange; 2]>> {
500        let mut ranges = SmallVec::new();
501        let file_index = index.index - self.num_memtables;
502        let builder_opt = self.get_file_builder(file_index);
503        match builder_opt {
504            Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
505            None => {
506                let file = &input.files[file_index];
507                let builder = input.prune_file(file, reader_metrics).await?;
508                builder.build_ranges(index.row_group_index, &mut ranges);
509                self.set_file_builder(file_index, Arc::new(builder));
510            }
511        }
512        Ok(ranges)
513    }
514
515    fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
516        let file_builders = self.file_builders.lock().unwrap();
517        file_builders[index].clone()
518    }
519
520    fn set_file_builder(&self, index: usize, builder: Arc<FileRangeBuilder>) {
521        let mut file_builders = self.file_builders.lock().unwrap();
522        file_builders[index] = Some(builder);
523    }
524}
525
#[cfg(test)]
mod tests {
    use common_time::Timestamp;
    use common_time::timestamp::TimeUnit;

    use super::*;

    // Expected grouped range: (source indices, start second, end second).
    type Output = (Vec<usize>, i64, i64);

    // Builds one single-source range per `(index, start, end)` tuple (seconds), groups them with
    // `group_ranges_for_seq_scan`, and compares the result against `expect`. It also checks
    // that `indices` and `row_group_indices` stay aligned after merging.
    fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
        let ranges = input
            .iter()
            .map(|(idx, start, end)| {
                let time_range = (
                    Timestamp::new(*start, TimeUnit::Second),
                    Timestamp::new(*end, TimeUnit::Second),
                );
                RangeMeta {
                    time_range,
                    indices: smallvec![SourceIndex {
                        index: *idx,
                        num_row_groups: 0,
                    }],
                    row_group_indices: smallvec![RowGroupIndex {
                        index: *idx,
                        row_group_index: 0
                    }],
                    num_rows: 1,
                }
            })
            .collect();
        let output = group_ranges_for_seq_scan(ranges);
        let actual: Vec<_> = output
            .iter()
            .map(|range| {
                let indices = range.indices.iter().map(|index| index.index).collect();
                let group_indices: Vec<_> = range
                    .row_group_indices
                    .iter()
                    .map(|idx| idx.index)
                    .collect();
                assert_eq!(indices, group_indices);
                let range = range.time_range;
                (indices, range.0.value(), range.1.value())
            })
            .collect();
        assert_eq!(expect, actual);
    }

    #[test]
    fn test_group_ranges() {
        // Group 1 part.
        run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);

        // 1, 2, 3, 4 => [3, 1, 4], [2]
        run_group_ranges_test(
            &[
                (1, 1000, 2000),
                (2, 6000, 7000),
                (3, 0, 1500),
                (4, 1500, 3000),
            ],
            &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
        );

        // 1, 2, 3 => [3], [1], [2],
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
            &[
                (vec![3], 0, 1000),
                (vec![1], 3000, 4000),
                (vec![2], 4001, 6000),
            ],
        );

        // 1, 2, 3 => [3], [1, 2]
        // Ranges touching at 4000 overlap because both bounds are inclusive.
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
            &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
        );
    }

    // Merging unions the time ranges, concatenates indices in order and sums row estimates.
    #[test]
    fn test_merge_range() {
        let mut left = RangeMeta {
            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
            indices: smallvec![SourceIndex {
                index: 1,
                num_row_groups: 2,
            }],
            row_group_indices: smallvec![
                RowGroupIndex {
                    index: 1,
                    row_group_index: 1
                },
                RowGroupIndex {
                    index: 1,
                    row_group_index: 2
                }
            ],
            num_rows: 5,
        };
        let right = RangeMeta {
            time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)),
            indices: smallvec![SourceIndex {
                index: 2,
                num_row_groups: 2,
            }],
            row_group_indices: smallvec![
                RowGroupIndex {
                    index: 2,
                    row_group_index: 1
                },
                RowGroupIndex {
                    index: 2,
                    row_group_index: 2
                }
            ],
            num_rows: 4,
        };
        left.merge(right);

        assert_eq!(
            left,
            RangeMeta {
                time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)),
                indices: smallvec![
                    SourceIndex {
                        index: 1,
                        num_row_groups: 2
                    },
                    SourceIndex {
                        index: 2,
                        num_row_groups: 2
                    }
                ],
                row_group_indices: smallvec![
                    RowGroupIndex {
                        index: 1,
                        row_group_index: 1
                    },
                    RowGroupIndex {
                        index: 1,
                        row_group_index: 2
                    },
                    RowGroupIndex {
                        index: 2,
                        row_group_index: 1
                    },
                    RowGroupIndex {
                        index: 2,
                        row_group_index: 2
                    },
                ],
                num_rows: 9,
            }
        );
    }

    // A single-source range with 2 row groups splits into one range per row group; the row
    // estimate 5 is divided with integer division (5 / 2 = 2).
    #[test]
    fn test_split_range() {
        let range = RangeMeta {
            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
            indices: smallvec![SourceIndex {
                index: 1,
                num_row_groups: 2,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 1,
                row_group_index: ALL_ROW_GROUPS,
            }],
            num_rows: 5,
        };

        assert!(range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);

        assert_eq!(
            output,
            &[
                RangeMeta {
                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
                    indices: smallvec![SourceIndex {
                        index: 1,
                        num_row_groups: 2,
                    }],
                    row_group_indices: smallvec![RowGroupIndex {
                        index: 1,
                        row_group_index: 0
                    },],
                    num_rows: 2,
                },
                RangeMeta {
                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
                    indices: smallvec![SourceIndex {
                        index: 1,
                        num_row_groups: 2,
                    }],
                    row_group_indices: smallvec![RowGroupIndex {
                        index: 1,
                        row_group_index: 1
                    }],
                    num_rows: 2,
                }
            ]
        );
    }

    // A range backed by more than one source is never split.
    #[test]
    fn test_not_split_range() {
        let range = RangeMeta {
            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
            indices: smallvec![
                SourceIndex {
                    index: 1,
                    num_row_groups: 1,
                },
                SourceIndex {
                    index: 2,
                    num_row_groups: 1,
                }
            ],
            row_group_indices: smallvec![
                RowGroupIndex {
                    index: 1,
                    row_group_index: 1
                },
                RowGroupIndex {
                    index: 2,
                    row_group_index: 1
                }
            ],
            num_rows: 5,
        };

        assert!(!range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);
        assert_eq!(1, output.len());
    }

    // Only the middle range (single source, 2 row groups) is split; the single-row-group range
    // and the multi-source range are kept as is.
    #[test]
    fn test_maybe_split_ranges() {
        let ranges = vec![
            RangeMeta {
                time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
                indices: smallvec![SourceIndex {
                    index: 0,
                    num_row_groups: 1,
                }],
                row_group_indices: smallvec![RowGroupIndex {
                    index: 0,
                    row_group_index: 0,
                },],
                num_rows: 4,
            },
            RangeMeta {
                time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
                indices: smallvec![SourceIndex {
                    index: 1,
                    num_row_groups: 2,
                }],
                row_group_indices: smallvec![RowGroupIndex {
                    index: 1,
                    row_group_index: ALL_ROW_GROUPS,
                },],
                num_rows: 4,
            },
            RangeMeta {
                time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
                indices: smallvec![
                    SourceIndex {
                        index: 2,
                        num_row_groups: 2,
                    },
                    SourceIndex {
                        index: 3,
                        num_row_groups: 0,
                    }
                ],
                row_group_indices: smallvec![
                    RowGroupIndex {
                        index: 2,
                        row_group_index: ALL_ROW_GROUPS,
                    },
                    RowGroupIndex {
                        index: 3,
                        row_group_index: ALL_ROW_GROUPS,
                    }
                ],
                num_rows: 5,
            },
        ];
        let output = maybe_split_ranges_for_seq_scan(ranges);
        assert_eq!(
            output,
            vec![
                RangeMeta {
                    time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
                    indices: smallvec![SourceIndex {
                        index: 0,
                        num_row_groups: 1,
                    }],
                    row_group_indices: smallvec![RowGroupIndex {
                        index: 0,
                        row_group_index: 0
                    },],
                    num_rows: 4,
                },
                RangeMeta {
                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
                    indices: smallvec![SourceIndex {
                        index: 1,
                        num_row_groups: 2,
                    }],
                    row_group_indices: smallvec![RowGroupIndex {
                        index: 1,
                        row_group_index: 0
                    },],
                    num_rows: 2,
                },
                RangeMeta {
                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
                    indices: smallvec![SourceIndex {
                        index: 1,
                        num_row_groups: 2,
                    }],
                    row_group_indices: smallvec![RowGroupIndex {
                        index: 1,
                        row_group_index: 1
                    }],
                    num_rows: 2,
                },
                RangeMeta {
                    time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
                    indices: smallvec![
                        SourceIndex {
                            index: 2,
                            num_row_groups: 2
                        },
                        SourceIndex {
                            index: 3,
                            num_row_groups: 0,
                        }
                    ],
                    row_group_indices: smallvec![
                        RowGroupIndex {
                            index: 2,
                            row_group_index: ALL_ROW_GROUPS,
                        },
                        RowGroupIndex {
                            index: 3,
                            row_group_index: ALL_ROW_GROUPS,
                        }
                    ],
                    num_rows: 5,
                },
            ]
        )
    }
}