mito2/read/
range.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structs for partition ranges.
16
17use std::sync::{Arc, Mutex};
18
19use common_time::Timestamp;
20use smallvec::{smallvec, SmallVec};
21use store_api::region_engine::PartitionRange;
22use store_api::storage::TimeSeriesDistribution;
23
24use crate::cache::CacheStrategy;
25use crate::error::Result;
26use crate::memtable::{MemtableRange, MemtableStats};
27use crate::read::scan_region::ScanInput;
28use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
29use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
30use crate::sst::parquet::format::parquet_row_group_time_range;
31use crate::sst::parquet::reader::ReaderMetrics;
32use crate::sst::parquet::row_selection::RowGroupSelection;
33use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
34
/// Sentinel row group index (negative) that stands for "all row groups" of a source.
const ALL_ROW_GROUPS: i64 = -1;
36
/// Index and metadata for a memtable or file.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct SourceIndex {
    /// Index of the memtable or file. Memtables are numbered first and files
    /// follow, offset by the number of memtables.
    pub(crate) index: usize,
    /// Total number of row groups in this source. 0 if the metadata
    /// is unavailable. We use this to split files.
    pub(crate) num_row_groups: u64,
}
46
/// Index to access a row group.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RowGroupIndex {
    /// Index to the memtable/file.
    pub(crate) index: usize,
    /// Row group index in the file (or range index in the memtable).
    /// Negative index indicates all row groups.
    pub row_group_index: i64,
}
56
/// Meta data of a partition range.
/// If the scanner is [UnorderedScan], each meta only has one row group or memtable.
/// If the scanner is [SeqScan], each meta may have multiple row groups and memtables.
#[derive(Debug, PartialEq)]
pub(crate) struct RangeMeta {
    /// The time range of the range. Both bounds are inclusive.
    pub(crate) time_range: FileTimeRange,
    /// Indices to memtables or files.
    pub(crate) indices: SmallVec<[SourceIndex; 2]>,
    /// Indices to memtable/file row groups that this range scans.
    pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
    /// Estimated number of rows in the range. This can be 0 if the statistics are not available.
    pub(crate) num_rows: usize,
}
71
72impl RangeMeta {
73    /// Creates a [PartitionRange] with specific identifier.
74    /// It converts the inclusive max timestamp to exclusive end timestamp.
75    pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
76        PartitionRange {
77            start: self.time_range.0,
78            end: Timestamp::new(
79                // The i64::MAX timestamp may be invisible but we don't guarantee to support this
80                // value now.
81                self.time_range
82                    .1
83                    .value()
84                    .checked_add(1)
85                    .unwrap_or(self.time_range.1.value()),
86                self.time_range.1.unit(),
87            ),
88            num_rows: self.num_rows,
89            identifier,
90        }
91    }
92
93    /// Creates a list of ranges from the `input` for seq scan.
94    /// If `compaction` is true, it doesn't split the ranges.
95    pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
96        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
97        Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
98        Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
99
100        #[cfg(feature = "enterprise")]
101        Self::push_extension_ranges(input, &mut ranges);
102
103        let ranges = group_ranges_for_seq_scan(ranges);
104        if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
105            // We don't split ranges in compaction or TimeSeriesDistribution::PerSeries.
106            return ranges;
107        }
108        maybe_split_ranges_for_seq_scan(ranges)
109    }
110
111    /// Creates a list of ranges from the `input` for unordered scan.
112    pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
113        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
114        Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
115        Self::push_unordered_file_ranges(
116            input.memtables.len(),
117            &input.files,
118            &input.cache_strategy,
119            &mut ranges,
120        );
121
122        #[cfg(feature = "enterprise")]
123        Self::push_extension_ranges(input, &mut ranges);
124
125        ranges
126    }
127
128    /// Returns true if the time range of given `meta` overlaps with the time range of this meta.
129    fn overlaps(&self, meta: &RangeMeta) -> bool {
130        overlaps(&self.time_range, &meta.time_range)
131    }
132
133    /// Merges given `meta` to this meta.
134    /// It assumes that the time ranges overlap and they don't have the same file or memtable index.
135    fn merge(&mut self, mut other: RangeMeta) {
136        debug_assert!(self.overlaps(&other));
137        debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
138        debug_assert!(self
139            .row_group_indices
140            .iter()
141            .all(|idx| !other.row_group_indices.contains(idx)));
142
143        self.time_range = (
144            self.time_range.0.min(other.time_range.0),
145            self.time_range.1.max(other.time_range.1),
146        );
147        self.indices.append(&mut other.indices);
148        self.row_group_indices.append(&mut other.row_group_indices);
149        self.num_rows += other.num_rows;
150    }
151
152    /// Returns true if we can split the range into multiple smaller ranges and
153    /// still preserve the order for [SeqScan].
154    fn can_split_preserve_order(&self) -> bool {
155        self.indices.len() == 1 && self.indices[0].num_row_groups > 1
156    }
157
158    /// Splits the range if it can preserve the order.
159    fn maybe_split(self, output: &mut Vec<RangeMeta>) {
160        if self.can_split_preserve_order() {
161            let num_row_groups = self.indices[0].num_row_groups;
162            debug_assert_eq!(1, self.row_group_indices.len());
163            debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
164
165            output.reserve(self.row_group_indices.len());
166            let num_rows = self.num_rows / num_row_groups as usize;
167            // Splits by row group.
168            for row_group_index in 0..num_row_groups {
169                output.push(RangeMeta {
170                    time_range: self.time_range,
171                    indices: self.indices.clone(),
172                    row_group_indices: smallvec![RowGroupIndex {
173                        index: self.indices[0].index,
174                        row_group_index: row_group_index as i64,
175                    }],
176                    num_rows,
177                });
178            }
179        } else {
180            output.push(self);
181        }
182    }
183
184    fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
185        // For append mode, we can parallelize reading memtables.
186        for (memtable_index, memtable) in memtables.iter().enumerate() {
187            let stats = memtable.stats();
188            let Some(time_range) = stats.time_range() else {
189                continue;
190            };
191            for row_group_index in 0..stats.num_ranges() {
192                let num_rows = stats.num_rows() / stats.num_ranges();
193                ranges.push(RangeMeta {
194                    time_range,
195                    indices: smallvec![SourceIndex {
196                        index: memtable_index,
197                        num_row_groups: stats.num_ranges() as u64,
198                    }],
199                    row_group_indices: smallvec![RowGroupIndex {
200                        index: memtable_index,
201                        row_group_index: row_group_index as i64,
202                    }],
203                    num_rows,
204                });
205            }
206        }
207    }
208
209    fn push_unordered_file_ranges(
210        num_memtables: usize,
211        files: &[FileHandle],
212        cache: &CacheStrategy,
213        ranges: &mut Vec<RangeMeta>,
214    ) {
215        // For append mode, we can parallelize reading row groups.
216        for (i, file) in files.iter().enumerate() {
217            let file_index = num_memtables + i;
218            // Get parquet meta from the cache.
219            let parquet_meta = cache.get_parquet_meta_data_from_mem_cache(file.file_id());
220            if let Some(parquet_meta) = parquet_meta {
221                // Scans each row group.
222                for row_group_index in 0..file.meta_ref().num_row_groups {
223                    let time_range = parquet_row_group_time_range(
224                        file.meta_ref(),
225                        &parquet_meta,
226                        row_group_index as usize,
227                    );
228                    let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
229                    ranges.push(RangeMeta {
230                        time_range: time_range.unwrap_or_else(|| file.time_range()),
231                        indices: smallvec![SourceIndex {
232                            index: file_index,
233                            num_row_groups: file.meta_ref().num_row_groups,
234                        }],
235                        row_group_indices: smallvec![RowGroupIndex {
236                            index: file_index,
237                            row_group_index: row_group_index as i64,
238                        }],
239                        num_rows: num_rows as usize,
240                    });
241                }
242            } else if file.meta_ref().num_row_groups > 0 {
243                // Scans each row group.
244                for row_group_index in 0..file.meta_ref().num_row_groups {
245                    ranges.push(RangeMeta {
246                        time_range: file.time_range(),
247                        indices: smallvec![SourceIndex {
248                            index: file_index,
249                            num_row_groups: file.meta_ref().num_row_groups,
250                        }],
251                        row_group_indices: smallvec![RowGroupIndex {
252                            index: file_index,
253                            row_group_index: row_group_index as i64,
254                        }],
255                        num_rows: DEFAULT_ROW_GROUP_SIZE,
256                    });
257                }
258            } else {
259                // If we don't known the number of row groups in advance, scan all row groups.
260                ranges.push(RangeMeta {
261                    time_range: file.time_range(),
262                    indices: smallvec![SourceIndex {
263                        index: file_index,
264                        num_row_groups: 0,
265                    }],
266                    row_group_indices: smallvec![RowGroupIndex {
267                        index: file_index,
268                        row_group_index: ALL_ROW_GROUPS,
269                    }],
270                    // This may be 0.
271                    num_rows: file.meta_ref().num_rows as usize,
272                });
273            }
274        }
275    }
276
277    fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
278        // For non append-only mode, each range only contains one memtable by default.
279        for (i, memtable) in memtables.iter().enumerate() {
280            let stats = memtable.stats();
281            let Some(time_range) = stats.time_range() else {
282                continue;
283            };
284            ranges.push(RangeMeta {
285                time_range,
286                indices: smallvec![SourceIndex {
287                    index: i,
288                    num_row_groups: stats.num_ranges() as u64,
289                }],
290                row_group_indices: smallvec![RowGroupIndex {
291                    index: i,
292                    row_group_index: ALL_ROW_GROUPS,
293                }],
294                num_rows: stats.num_rows(),
295            });
296        }
297    }
298
299    fn push_seq_file_ranges(
300        num_memtables: usize,
301        files: &[FileHandle],
302        ranges: &mut Vec<RangeMeta>,
303    ) {
304        // For non append-only mode, each range only contains one file.
305        for (i, file) in files.iter().enumerate() {
306            let file_index = num_memtables + i;
307            ranges.push(RangeMeta {
308                time_range: file.time_range(),
309                indices: smallvec![SourceIndex {
310                    index: file_index,
311                    num_row_groups: file.meta_ref().num_row_groups,
312                }],
313                row_group_indices: smallvec![RowGroupIndex {
314                    index: file_index,
315                    row_group_index: ALL_ROW_GROUPS,
316                }],
317                num_rows: file.meta_ref().num_rows as usize,
318            });
319        }
320    }
321
322    #[cfg(feature = "enterprise")]
323    fn push_extension_ranges(input: &ScanInput, metas: &mut Vec<RangeMeta>) {
324        for (i, range) in input.extension_ranges().iter().enumerate() {
325            let index = input.num_memtables() + input.num_files() + i;
326            metas.push(RangeMeta {
327                time_range: range.time_range(),
328                indices: smallvec![SourceIndex {
329                    index,
330                    num_row_groups: range.num_row_groups(),
331                }],
332                row_group_indices: smallvec![RowGroupIndex {
333                    index,
334                    row_group_index: ALL_ROW_GROUPS,
335                }],
336                num_rows: range.num_rows() as usize,
337            });
338        }
339    }
340}
341
342/// Groups ranges by time range.
343/// It assumes each input range only contains a file or a memtable.
344fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
345    if ranges.is_empty() {
346        return ranges;
347    }
348
349    // Sorts ranges by time range (start asc, end desc).
350    ranges.sort_unstable_by(|a, b| {
351        let l = a.time_range;
352        let r = b.time_range;
353        l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
354    });
355    let mut range_in_progress = None;
356    // Parts with exclusive time ranges.
357    let mut exclusive_ranges = Vec::with_capacity(ranges.len());
358    for range in ranges {
359        let Some(mut prev_range) = range_in_progress.take() else {
360            // This is the new range to process.
361            range_in_progress = Some(range);
362            continue;
363        };
364
365        if prev_range.overlaps(&range) {
366            prev_range.merge(range);
367            range_in_progress = Some(prev_range);
368        } else {
369            exclusive_ranges.push(prev_range);
370            range_in_progress = Some(range);
371        }
372    }
373    if let Some(range) = range_in_progress {
374        exclusive_ranges.push(range);
375    }
376
377    exclusive_ranges
378}
379
380/// Splits the range into multiple smaller ranges.
381/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()].
382fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
383    let mut new_ranges = Vec::with_capacity(ranges.len());
384    for range in ranges {
385        range.maybe_split(&mut new_ranges);
386    }
387
388    new_ranges
389}
390
391/// Builder to create file ranges.
392#[derive(Default)]
393pub struct FileRangeBuilder {
394    /// Context for the file.
395    /// None indicates nothing to read.
396    context: Option<FileRangeContextRef>,
397    /// Row group selection for the file to read.
398    selection: RowGroupSelection,
399}
400
401impl FileRangeBuilder {
402    /// Builds a file range builder from context and row groups.
403    pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self {
404        Self {
405            context: Some(context),
406            selection,
407        }
408    }
409
410    /// Builds file ranges to read.
411    /// Negative `row_group_index` indicates all row groups.
412    pub fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
413        let Some(context) = self.context.clone() else {
414            return;
415        };
416        if row_group_index >= 0 {
417            let row_group_index = row_group_index as usize;
418            // Scans one row group.
419            let Some(row_selection) = self.selection.get(row_group_index) else {
420                return;
421            };
422            ranges.push(FileRange::new(
423                context,
424                row_group_index,
425                Some(row_selection.clone()),
426            ));
427        } else {
428            // Scans all row groups.
429            ranges.extend(
430                self.selection
431                    .iter()
432                    .map(|(row_group_index, row_selection)| {
433                        FileRange::new(
434                            context.clone(),
435                            *row_group_index,
436                            Some(row_selection.clone()),
437                        )
438                    }),
439            );
440        }
441    }
442}
443
444/// Builder to create mem ranges.
445pub(crate) struct MemRangeBuilder {
446    /// Ranges of a memtable.
447    range: MemtableRange,
448    /// Stats of a memtable.
449    stats: MemtableStats,
450}
451
452impl MemRangeBuilder {
453    /// Builds a mem range builder from row groups.
454    pub(crate) fn new(range: MemtableRange, stats: MemtableStats) -> Self {
455        Self { range, stats }
456    }
457
458    /// Builds mem ranges to read in the memtable.
459    /// Negative `row_group_index` indicates all row groups.
460    pub(crate) fn build_ranges(
461        &self,
462        _row_group_index: i64,
463        ranges: &mut SmallVec<[MemtableRange; 2]>,
464    ) {
465        ranges.push(self.range.clone())
466    }
467
468    /// Returns the statistics of the memtable.
469    pub(crate) fn stats(&self) -> &MemtableStats {
470        &self.stats
471    }
472}
473
474/// List to manages the builders to create file ranges.
475/// Each scan partition should have its own list. Mutex inside this list is used to allow moving
476/// the list to different streams in the same partition.
477pub(crate) struct RangeBuilderList {
478    num_memtables: usize,
479    file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
480}
481
482impl RangeBuilderList {
483    /// Creates a new [ReaderBuilderList] with the given number of memtables and files.
484    pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
485        let file_builders = (0..num_files).map(|_| None).collect();
486        Self {
487            num_memtables,
488            file_builders: Mutex::new(file_builders),
489        }
490    }
491
492    /// Builds file ranges to read the row group at `index`.
493    pub(crate) async fn build_file_ranges(
494        &self,
495        input: &ScanInput,
496        index: RowGroupIndex,
497        reader_metrics: &mut ReaderMetrics,
498    ) -> Result<SmallVec<[FileRange; 2]>> {
499        let mut ranges = SmallVec::new();
500        let file_index = index.index - self.num_memtables;
501        let builder_opt = self.get_file_builder(file_index);
502        match builder_opt {
503            Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
504            None => {
505                let file = &input.files[file_index];
506                let builder = input.prune_file(file, reader_metrics).await?;
507                builder.build_ranges(index.row_group_index, &mut ranges);
508                self.set_file_builder(file_index, Arc::new(builder));
509            }
510        }
511        Ok(ranges)
512    }
513
514    fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
515        let file_builders = self.file_builders.lock().unwrap();
516        file_builders[index].clone()
517    }
518
519    fn set_file_builder(&self, index: usize, builder: Arc<FileRangeBuilder>) {
520        let mut file_builders = self.file_builders.lock().unwrap();
521        file_builders[index] = Some(builder);
522    }
523}
524
525#[cfg(test)]
526mod tests {
527    use common_time::timestamp::TimeUnit;
528    use common_time::Timestamp;
529
530    use super::*;
531
532    type Output = (Vec<usize>, i64, i64);
533
534    fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
535        let ranges = input
536            .iter()
537            .map(|(idx, start, end)| {
538                let time_range = (
539                    Timestamp::new(*start, TimeUnit::Second),
540                    Timestamp::new(*end, TimeUnit::Second),
541                );
542                RangeMeta {
543                    time_range,
544                    indices: smallvec![SourceIndex {
545                        index: *idx,
546                        num_row_groups: 0,
547                    }],
548                    row_group_indices: smallvec![RowGroupIndex {
549                        index: *idx,
550                        row_group_index: 0
551                    }],
552                    num_rows: 1,
553                }
554            })
555            .collect();
556        let output = group_ranges_for_seq_scan(ranges);
557        let actual: Vec<_> = output
558            .iter()
559            .map(|range| {
560                let indices = range.indices.iter().map(|index| index.index).collect();
561                let group_indices: Vec<_> = range
562                    .row_group_indices
563                    .iter()
564                    .map(|idx| idx.index)
565                    .collect();
566                assert_eq!(indices, group_indices);
567                let range = range.time_range;
568                (indices, range.0.value(), range.1.value())
569            })
570            .collect();
571        assert_eq!(expect, actual);
572    }
573
574    #[test]
575    fn test_group_ranges() {
576        // Group 1 part.
577        run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);
578
579        // 1, 2, 3, 4 => [3, 1, 4], [2]
580        run_group_ranges_test(
581            &[
582                (1, 1000, 2000),
583                (2, 6000, 7000),
584                (3, 0, 1500),
585                (4, 1500, 3000),
586            ],
587            &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
588        );
589
590        // 1, 2, 3 => [3], [1], [2],
591        run_group_ranges_test(
592            &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
593            &[
594                (vec![3], 0, 1000),
595                (vec![1], 3000, 4000),
596                (vec![2], 4001, 6000),
597            ],
598        );
599
600        // 1, 2, 3 => [3], [1, 2]
601        run_group_ranges_test(
602            &[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
603            &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
604        );
605    }
606
607    #[test]
608    fn test_merge_range() {
609        let mut left = RangeMeta {
610            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
611            indices: smallvec![SourceIndex {
612                index: 1,
613                num_row_groups: 2,
614            }],
615            row_group_indices: smallvec![
616                RowGroupIndex {
617                    index: 1,
618                    row_group_index: 1
619                },
620                RowGroupIndex {
621                    index: 1,
622                    row_group_index: 2
623                }
624            ],
625            num_rows: 5,
626        };
627        let right = RangeMeta {
628            time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)),
629            indices: smallvec![SourceIndex {
630                index: 2,
631                num_row_groups: 2,
632            }],
633            row_group_indices: smallvec![
634                RowGroupIndex {
635                    index: 2,
636                    row_group_index: 1
637                },
638                RowGroupIndex {
639                    index: 2,
640                    row_group_index: 2
641                }
642            ],
643            num_rows: 4,
644        };
645        left.merge(right);
646
647        assert_eq!(
648            left,
649            RangeMeta {
650                time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)),
651                indices: smallvec![
652                    SourceIndex {
653                        index: 1,
654                        num_row_groups: 2
655                    },
656                    SourceIndex {
657                        index: 2,
658                        num_row_groups: 2
659                    }
660                ],
661                row_group_indices: smallvec![
662                    RowGroupIndex {
663                        index: 1,
664                        row_group_index: 1
665                    },
666                    RowGroupIndex {
667                        index: 1,
668                        row_group_index: 2
669                    },
670                    RowGroupIndex {
671                        index: 2,
672                        row_group_index: 1
673                    },
674                    RowGroupIndex {
675                        index: 2,
676                        row_group_index: 2
677                    },
678                ],
679                num_rows: 9,
680            }
681        );
682    }
683
684    #[test]
685    fn test_split_range() {
686        let range = RangeMeta {
687            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
688            indices: smallvec![SourceIndex {
689                index: 1,
690                num_row_groups: 2,
691            }],
692            row_group_indices: smallvec![RowGroupIndex {
693                index: 1,
694                row_group_index: ALL_ROW_GROUPS,
695            }],
696            num_rows: 5,
697        };
698
699        assert!(range.can_split_preserve_order());
700        let mut output = Vec::new();
701        range.maybe_split(&mut output);
702
703        assert_eq!(
704            output,
705            &[
706                RangeMeta {
707                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
708                    indices: smallvec![SourceIndex {
709                        index: 1,
710                        num_row_groups: 2,
711                    }],
712                    row_group_indices: smallvec![RowGroupIndex {
713                        index: 1,
714                        row_group_index: 0
715                    },],
716                    num_rows: 2,
717                },
718                RangeMeta {
719                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
720                    indices: smallvec![SourceIndex {
721                        index: 1,
722                        num_row_groups: 2,
723                    }],
724                    row_group_indices: smallvec![RowGroupIndex {
725                        index: 1,
726                        row_group_index: 1
727                    }],
728                    num_rows: 2,
729                }
730            ]
731        );
732    }
733
734    #[test]
735    fn test_not_split_range() {
736        let range = RangeMeta {
737            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
738            indices: smallvec![
739                SourceIndex {
740                    index: 1,
741                    num_row_groups: 1,
742                },
743                SourceIndex {
744                    index: 2,
745                    num_row_groups: 1,
746                }
747            ],
748            row_group_indices: smallvec![
749                RowGroupIndex {
750                    index: 1,
751                    row_group_index: 1
752                },
753                RowGroupIndex {
754                    index: 2,
755                    row_group_index: 1
756                }
757            ],
758            num_rows: 5,
759        };
760
761        assert!(!range.can_split_preserve_order());
762        let mut output = Vec::new();
763        range.maybe_split(&mut output);
764        assert_eq!(1, output.len());
765    }
766
767    #[test]
768    fn test_maybe_split_ranges() {
769        let ranges = vec![
770            RangeMeta {
771                time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
772                indices: smallvec![SourceIndex {
773                    index: 0,
774                    num_row_groups: 1,
775                }],
776                row_group_indices: smallvec![RowGroupIndex {
777                    index: 0,
778                    row_group_index: 0,
779                },],
780                num_rows: 4,
781            },
782            RangeMeta {
783                time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
784                indices: smallvec![SourceIndex {
785                    index: 1,
786                    num_row_groups: 2,
787                }],
788                row_group_indices: smallvec![RowGroupIndex {
789                    index: 1,
790                    row_group_index: ALL_ROW_GROUPS,
791                },],
792                num_rows: 4,
793            },
794            RangeMeta {
795                time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
796                indices: smallvec![
797                    SourceIndex {
798                        index: 2,
799                        num_row_groups: 2,
800                    },
801                    SourceIndex {
802                        index: 3,
803                        num_row_groups: 0,
804                    }
805                ],
806                row_group_indices: smallvec![
807                    RowGroupIndex {
808                        index: 2,
809                        row_group_index: ALL_ROW_GROUPS,
810                    },
811                    RowGroupIndex {
812                        index: 3,
813                        row_group_index: ALL_ROW_GROUPS,
814                    }
815                ],
816                num_rows: 5,
817            },
818        ];
819        let output = maybe_split_ranges_for_seq_scan(ranges);
820        assert_eq!(
821            output,
822            vec![
823                RangeMeta {
824                    time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
825                    indices: smallvec![SourceIndex {
826                        index: 0,
827                        num_row_groups: 1,
828                    }],
829                    row_group_indices: smallvec![RowGroupIndex {
830                        index: 0,
831                        row_group_index: 0
832                    },],
833                    num_rows: 4,
834                },
835                RangeMeta {
836                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
837                    indices: smallvec![SourceIndex {
838                        index: 1,
839                        num_row_groups: 2,
840                    }],
841                    row_group_indices: smallvec![RowGroupIndex {
842                        index: 1,
843                        row_group_index: 0
844                    },],
845                    num_rows: 2,
846                },
847                RangeMeta {
848                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
849                    indices: smallvec![SourceIndex {
850                        index: 1,
851                        num_row_groups: 2,
852                    }],
853                    row_group_indices: smallvec![RowGroupIndex {
854                        index: 1,
855                        row_group_index: 1
856                    }],
857                    num_rows: 2,
858                },
859                RangeMeta {
860                    time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
861                    indices: smallvec![
862                        SourceIndex {
863                            index: 2,
864                            num_row_groups: 2
865                        },
866                        SourceIndex {
867                            index: 3,
868                            num_row_groups: 0,
869                        }
870                    ],
871                    row_group_indices: smallvec![
872                        RowGroupIndex {
873                            index: 2,
874                            row_group_index: ALL_ROW_GROUPS,
875                        },
876                        RowGroupIndex {
877                            index: 3,
878                            row_group_index: ALL_ROW_GROUPS,
879                        }
880                    ],
881                    num_rows: 5,
882                },
883            ]
884        )
885    }
886}