mito2/read/
range.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structs for partition ranges.
16
17use std::sync::{Arc, Mutex};
18
19use common_time::Timestamp;
20use smallvec::{smallvec, SmallVec};
21use store_api::region_engine::PartitionRange;
22use store_api::storage::TimeSeriesDistribution;
23
24use crate::cache::CacheStrategy;
25use crate::error::Result;
26use crate::memtable::{MemtableRange, MemtableRanges, MemtableStats};
27use crate::read::scan_region::ScanInput;
28use crate::sst::file::{overlaps, FileHandle, FileTimeRange};
29use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
30use crate::sst::parquet::format::parquet_row_group_time_range;
31use crate::sst::parquet::reader::ReaderMetrics;
32use crate::sst::parquet::row_selection::RowGroupSelection;
33use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
34
/// Sentinel `row_group_index` meaning "scan every row group of the memtable/file".
///
/// Builders treat any negative row group index as "all row groups"; this is the canonical value.
const ALL_ROW_GROUPS: i64 = -1;
36
/// Index and metadata for a memtable or file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct SourceIndex {
    /// Index of the memtable or file. Memtables come first, followed by files
    /// (a file's index is offset by the number of memtables).
    pub(crate) index: usize,
    /// Total number of row groups in this source. 0 if the metadata
    /// is unavailable. We use this to split files.
    pub(crate) num_row_groups: u64,
}
46
/// Index to access a row group.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct RowGroupIndex {
    /// Index to the memtable/file.
    pub(crate) index: usize,
    /// Row group index in the memtable/file.
    /// A negative index (canonically `ALL_ROW_GROUPS`) indicates all row groups.
    pub(crate) row_group_index: i64,
}
56
57/// Meta data of a partition range.
58/// If the scanner is [UnorderedScan], each meta only has one row group or memtable.
59/// If the scanner is [SeqScan], each meta may have multiple row groups and memtables.
60#[derive(Debug, PartialEq)]
61pub(crate) struct RangeMeta {
62    /// The time range of the range.
63    pub(crate) time_range: FileTimeRange,
64    /// Indices to memtables or files.
65    pub(crate) indices: SmallVec<[SourceIndex; 2]>,
66    /// Indices to memtable/file row groups that this range scans.
67    pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
68    /// Estimated number of rows in the range. This can be 0 if the statistics are not available.
69    pub(crate) num_rows: usize,
70}
71
72impl RangeMeta {
73    /// Creates a [PartitionRange] with specific identifier.
74    /// It converts the inclusive max timestamp to exclusive end timestamp.
75    pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
76        PartitionRange {
77            start: self.time_range.0,
78            end: Timestamp::new(
79                // The i64::MAX timestamp may be invisible but we don't guarantee to support this
80                // value now.
81                self.time_range
82                    .1
83                    .value()
84                    .checked_add(1)
85                    .unwrap_or(self.time_range.1.value()),
86                self.time_range.1.unit(),
87            ),
88            num_rows: self.num_rows,
89            identifier,
90        }
91    }
92
93    /// Creates a list of ranges from the `input` for seq scan.
94    /// If `compaction` is true, it doesn't split the ranges.
95    pub(crate) fn seq_scan_ranges(input: &ScanInput, compaction: bool) -> Vec<RangeMeta> {
96        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
97        Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
98        Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
99
100        let ranges = group_ranges_for_seq_scan(ranges);
101        if compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
102            // We don't split ranges in compaction or TimeSeriesDistribution::PerSeries.
103            return ranges;
104        }
105        maybe_split_ranges_for_seq_scan(ranges)
106    }
107
108    /// Creates a list of ranges from the `input` for unordered scan.
109    pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
110        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
111        Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
112        Self::push_unordered_file_ranges(
113            input.memtables.len(),
114            &input.files,
115            &input.cache_strategy,
116            &mut ranges,
117        );
118
119        ranges
120    }
121
122    /// Returns true if the time range of given `meta` overlaps with the time range of this meta.
123    fn overlaps(&self, meta: &RangeMeta) -> bool {
124        overlaps(&self.time_range, &meta.time_range)
125    }
126
127    /// Merges given `meta` to this meta.
128    /// It assumes that the time ranges overlap and they don't have the same file or memtable index.
129    fn merge(&mut self, mut other: RangeMeta) {
130        debug_assert!(self.overlaps(&other));
131        debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
132        debug_assert!(self
133            .row_group_indices
134            .iter()
135            .all(|idx| !other.row_group_indices.contains(idx)));
136
137        self.time_range = (
138            self.time_range.0.min(other.time_range.0),
139            self.time_range.1.max(other.time_range.1),
140        );
141        self.indices.append(&mut other.indices);
142        self.row_group_indices.append(&mut other.row_group_indices);
143        self.num_rows += other.num_rows;
144    }
145
146    /// Returns true if we can split the range into multiple smaller ranges and
147    /// still preserve the order for [SeqScan].
148    fn can_split_preserve_order(&self) -> bool {
149        self.indices.len() == 1 && self.indices[0].num_row_groups > 1
150    }
151
152    /// Splits the range if it can preserve the order.
153    fn maybe_split(self, output: &mut Vec<RangeMeta>) {
154        if self.can_split_preserve_order() {
155            let num_row_groups = self.indices[0].num_row_groups;
156            debug_assert_eq!(1, self.row_group_indices.len());
157            debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
158
159            output.reserve(self.row_group_indices.len());
160            let num_rows = self.num_rows / num_row_groups as usize;
161            // Splits by row group.
162            for row_group_index in 0..num_row_groups {
163                output.push(RangeMeta {
164                    time_range: self.time_range,
165                    indices: self.indices.clone(),
166                    row_group_indices: smallvec![RowGroupIndex {
167                        index: self.indices[0].index,
168                        row_group_index: row_group_index as i64,
169                    }],
170                    num_rows,
171                });
172            }
173        } else {
174            output.push(self);
175        }
176    }
177
178    fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
179        // For append mode, we can parallelize reading memtables.
180        for (memtable_index, memtable) in memtables.iter().enumerate() {
181            let stats = memtable.stats();
182            let Some(time_range) = stats.time_range() else {
183                continue;
184            };
185            for row_group_index in 0..stats.num_ranges() {
186                let num_rows = stats.num_rows() / stats.num_ranges();
187                ranges.push(RangeMeta {
188                    time_range,
189                    indices: smallvec![SourceIndex {
190                        index: memtable_index,
191                        num_row_groups: stats.num_ranges() as u64,
192                    }],
193                    row_group_indices: smallvec![RowGroupIndex {
194                        index: memtable_index,
195                        row_group_index: row_group_index as i64,
196                    }],
197                    num_rows,
198                });
199            }
200        }
201    }
202
203    fn push_unordered_file_ranges(
204        num_memtables: usize,
205        files: &[FileHandle],
206        cache: &CacheStrategy,
207        ranges: &mut Vec<RangeMeta>,
208    ) {
209        // For append mode, we can parallelize reading row groups.
210        for (i, file) in files.iter().enumerate() {
211            let file_index = num_memtables + i;
212            // Get parquet meta from the cache.
213            let parquet_meta =
214                cache.get_parquet_meta_data_from_mem_cache(file.region_id(), file.file_id());
215            if let Some(parquet_meta) = parquet_meta {
216                // Scans each row group.
217                for row_group_index in 0..file.meta_ref().num_row_groups {
218                    let time_range = parquet_row_group_time_range(
219                        file.meta_ref(),
220                        &parquet_meta,
221                        row_group_index as usize,
222                    );
223                    let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
224                    ranges.push(RangeMeta {
225                        time_range: time_range.unwrap_or_else(|| file.time_range()),
226                        indices: smallvec![SourceIndex {
227                            index: file_index,
228                            num_row_groups: file.meta_ref().num_row_groups,
229                        }],
230                        row_group_indices: smallvec![RowGroupIndex {
231                            index: file_index,
232                            row_group_index: row_group_index as i64,
233                        }],
234                        num_rows: num_rows as usize,
235                    });
236                }
237            } else if file.meta_ref().num_row_groups > 0 {
238                // Scans each row group.
239                for row_group_index in 0..file.meta_ref().num_row_groups {
240                    ranges.push(RangeMeta {
241                        time_range: file.time_range(),
242                        indices: smallvec![SourceIndex {
243                            index: file_index,
244                            num_row_groups: file.meta_ref().num_row_groups,
245                        }],
246                        row_group_indices: smallvec![RowGroupIndex {
247                            index: file_index,
248                            row_group_index: row_group_index as i64,
249                        }],
250                        num_rows: DEFAULT_ROW_GROUP_SIZE,
251                    });
252                }
253            } else {
254                // If we don't known the number of row groups in advance, scan all row groups.
255                ranges.push(RangeMeta {
256                    time_range: file.time_range(),
257                    indices: smallvec![SourceIndex {
258                        index: file_index,
259                        num_row_groups: 0,
260                    }],
261                    row_group_indices: smallvec![RowGroupIndex {
262                        index: file_index,
263                        row_group_index: ALL_ROW_GROUPS,
264                    }],
265                    // This may be 0.
266                    num_rows: file.meta_ref().num_rows as usize,
267                });
268            }
269        }
270    }
271
272    fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
273        // For non append-only mode, each range only contains one memtable by default.
274        for (i, memtable) in memtables.iter().enumerate() {
275            let stats = memtable.stats();
276            let Some(time_range) = stats.time_range() else {
277                continue;
278            };
279            ranges.push(RangeMeta {
280                time_range,
281                indices: smallvec![SourceIndex {
282                    index: i,
283                    num_row_groups: stats.num_ranges() as u64,
284                }],
285                row_group_indices: smallvec![RowGroupIndex {
286                    index: i,
287                    row_group_index: ALL_ROW_GROUPS,
288                }],
289                num_rows: stats.num_rows(),
290            });
291        }
292    }
293
294    fn push_seq_file_ranges(
295        num_memtables: usize,
296        files: &[FileHandle],
297        ranges: &mut Vec<RangeMeta>,
298    ) {
299        // For non append-only mode, each range only contains one file.
300        for (i, file) in files.iter().enumerate() {
301            let file_index = num_memtables + i;
302            ranges.push(RangeMeta {
303                time_range: file.time_range(),
304                indices: smallvec![SourceIndex {
305                    index: file_index,
306                    num_row_groups: file.meta_ref().num_row_groups,
307                }],
308                row_group_indices: smallvec![RowGroupIndex {
309                    index: file_index,
310                    row_group_index: ALL_ROW_GROUPS,
311                }],
312                num_rows: file.meta_ref().num_rows as usize,
313            });
314        }
315    }
316}
317
318/// Groups ranges by time range.
319/// It assumes each input range only contains a file or a memtable.
320fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
321    if ranges.is_empty() {
322        return ranges;
323    }
324
325    // Sorts ranges by time range (start asc, end desc).
326    ranges.sort_unstable_by(|a, b| {
327        let l = a.time_range;
328        let r = b.time_range;
329        l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
330    });
331    let mut range_in_progress = None;
332    // Parts with exclusive time ranges.
333    let mut exclusive_ranges = Vec::with_capacity(ranges.len());
334    for range in ranges {
335        let Some(mut prev_range) = range_in_progress.take() else {
336            // This is the new range to process.
337            range_in_progress = Some(range);
338            continue;
339        };
340
341        if prev_range.overlaps(&range) {
342            prev_range.merge(range);
343            range_in_progress = Some(prev_range);
344        } else {
345            exclusive_ranges.push(prev_range);
346            range_in_progress = Some(range);
347        }
348    }
349    if let Some(range) = range_in_progress {
350        exclusive_ranges.push(range);
351    }
352
353    exclusive_ranges
354}
355
356/// Splits the range into multiple smaller ranges.
357/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()].
358fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
359    let mut new_ranges = Vec::with_capacity(ranges.len());
360    for range in ranges {
361        range.maybe_split(&mut new_ranges);
362    }
363
364    new_ranges
365}
366
367/// Builder to create file ranges.
368#[derive(Default)]
369pub(crate) struct FileRangeBuilder {
370    /// Context for the file.
371    /// None indicates nothing to read.
372    context: Option<FileRangeContextRef>,
373    /// Row group selection for the file to read.
374    selection: RowGroupSelection,
375}
376
377impl FileRangeBuilder {
378    /// Builds a file range builder from context and row groups.
379    pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self {
380        Self {
381            context: Some(context),
382            selection,
383        }
384    }
385
386    /// Builds file ranges to read.
387    /// Negative `row_group_index` indicates all row groups.
388    pub(crate) fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
389        let Some(context) = self.context.clone() else {
390            return;
391        };
392        if row_group_index >= 0 {
393            let row_group_index = row_group_index as usize;
394            // Scans one row group.
395            let Some(row_selection) = self.selection.get(row_group_index) else {
396                return;
397            };
398            ranges.push(FileRange::new(
399                context,
400                row_group_index,
401                Some(row_selection.clone()),
402            ));
403        } else {
404            // Scans all row groups.
405            ranges.extend(
406                self.selection
407                    .iter()
408                    .map(|(row_group_index, row_selection)| {
409                        FileRange::new(
410                            context.clone(),
411                            *row_group_index,
412                            Some(row_selection.clone()),
413                        )
414                    }),
415            );
416        }
417    }
418}
419
420/// Builder to create mem ranges.
421pub(crate) struct MemRangeBuilder {
422    /// Ranges of a memtable.
423    ranges: MemtableRanges,
424}
425
426impl MemRangeBuilder {
427    /// Builds a mem range builder from row groups.
428    pub(crate) fn new(ranges: MemtableRanges) -> Self {
429        Self { ranges }
430    }
431
432    /// Builds mem ranges to read in the memtable.
433    /// Negative `row_group_index` indicates all row groups.
434    pub(crate) fn build_ranges(
435        &self,
436        row_group_index: i64,
437        ranges: &mut SmallVec<[MemtableRange; 2]>,
438    ) {
439        if row_group_index >= 0 {
440            let row_group_index = row_group_index as usize;
441            // Scans one row group.
442            let Some(range) = self.ranges.ranges.get(&row_group_index) else {
443                return;
444            };
445            ranges.push(range.clone());
446        } else {
447            ranges.extend(self.ranges.ranges.values().cloned());
448        }
449    }
450
451    /// Returns the statistics of the memtable.
452    pub(crate) fn stats(&self) -> &MemtableStats {
453        &self.ranges.stats
454    }
455}
456
457/// List to manages the builders to create file ranges.
458/// Each scan partition should have its own list. Mutex inside this list is used to allow moving
459/// the list to different streams in the same partition.
460pub(crate) struct RangeBuilderList {
461    num_memtables: usize,
462    file_builders: Mutex<Vec<Option<Arc<FileRangeBuilder>>>>,
463}
464
465impl RangeBuilderList {
466    /// Creates a new [ReaderBuilderList] with the given number of memtables and files.
467    pub(crate) fn new(num_memtables: usize, num_files: usize) -> Self {
468        let file_builders = (0..num_files).map(|_| None).collect();
469        Self {
470            num_memtables,
471            file_builders: Mutex::new(file_builders),
472        }
473    }
474
475    /// Builds file ranges to read the row group at `index`.
476    pub(crate) async fn build_file_ranges(
477        &self,
478        input: &ScanInput,
479        index: RowGroupIndex,
480        reader_metrics: &mut ReaderMetrics,
481    ) -> Result<SmallVec<[FileRange; 2]>> {
482        let mut ranges = SmallVec::new();
483        let file_index = index.index - self.num_memtables;
484        let builder_opt = self.get_file_builder(file_index);
485        match builder_opt {
486            Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
487            None => {
488                let builder = input.prune_file(file_index, reader_metrics).await?;
489                builder.build_ranges(index.row_group_index, &mut ranges);
490                self.set_file_builder(file_index, Arc::new(builder));
491            }
492        }
493        Ok(ranges)
494    }
495
496    fn get_file_builder(&self, index: usize) -> Option<Arc<FileRangeBuilder>> {
497        let file_builders = self.file_builders.lock().unwrap();
498        file_builders[index].clone()
499    }
500
501    fn set_file_builder(&self, index: usize, builder: Arc<FileRangeBuilder>) {
502        let mut file_builders = self.file_builders.lock().unwrap();
503        file_builders[index] = Some(builder);
504    }
505}
506
#[cfg(test)]
mod tests {
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;

    use super::*;

    /// Flattened view of a grouped range: (source indices, start, end).
    type Output = (Vec<usize>, i64, i64);

    /// Builds a [RangeMeta] covering `[start, end]` in seconds.
    ///
    /// `sources` holds `(index, num_row_groups)` pairs and `row_groups` holds
    /// `(index, row_group_index)` pairs.
    fn range_meta(
        (start, end): (i64, i64),
        sources: &[(usize, u64)],
        row_groups: &[(usize, i64)],
        num_rows: usize,
    ) -> RangeMeta {
        RangeMeta {
            time_range: (
                Timestamp::new(start, TimeUnit::Second),
                Timestamp::new(end, TimeUnit::Second),
            ),
            indices: sources
                .iter()
                .map(|&(index, num_row_groups)| SourceIndex {
                    index,
                    num_row_groups,
                })
                .collect(),
            row_group_indices: row_groups
                .iter()
                .map(|&(index, row_group_index)| RowGroupIndex {
                    index,
                    row_group_index,
                })
                .collect(),
            num_rows,
        }
    }

    /// Groups single-source ranges built from `(index, start, end)` and compares the result.
    fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
        let ranges = input
            .iter()
            .map(|&(idx, start, end)| range_meta((start, end), &[(idx, 0)], &[(idx, 0)], 1))
            .collect();
        let actual: Vec<Output> = group_ranges_for_seq_scan(ranges)
            .iter()
            .map(|range| {
                let indices: Vec<usize> =
                    range.indices.iter().map(|source| source.index).collect();
                let group_indices: Vec<usize> = range
                    .row_group_indices
                    .iter()
                    .map(|row_group| row_group.index)
                    .collect();
                assert_eq!(indices, group_indices);
                let (start, end) = range.time_range;
                (indices, start.value(), end.value())
            })
            .collect();
        assert_eq!(expect, actual);
    }

    #[test]
    fn test_group_ranges() {
        // Group 1 part.
        run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);

        // 1, 2, 3, 4 => [3, 1, 4], [2]
        run_group_ranges_test(
            &[
                (1, 1000, 2000),
                (2, 6000, 7000),
                (3, 0, 1500),
                (4, 1500, 3000),
            ],
            &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
        );

        // 1, 2, 3 => [3], [1], [2],
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
            &[
                (vec![3], 0, 1000),
                (vec![1], 3000, 4000),
                (vec![2], 4001, 6000),
            ],
        );

        // 1, 2, 3 => [3], [1, 2]
        run_group_ranges_test(
            &[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
            &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
        );
    }

    #[test]
    fn test_merge_range() {
        let mut left = range_meta((1000, 2000), &[(1, 2)], &[(1, 1), (1, 2)], 5);
        let right = range_meta((800, 1200), &[(2, 2)], &[(2, 1), (2, 2)], 4);
        left.merge(right);

        let expected = range_meta(
            (800, 2000),
            &[(1, 2), (2, 2)],
            &[(1, 1), (1, 2), (2, 1), (2, 2)],
            9,
        );
        assert_eq!(left, expected);
    }

    #[test]
    fn test_split_range() {
        let range = range_meta((1000, 2000), &[(1, 2)], &[(1, ALL_ROW_GROUPS)], 5);

        assert!(range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);

        assert_eq!(
            output,
            &[
                range_meta((1000, 2000), &[(1, 2)], &[(1, 0)], 2),
                range_meta((1000, 2000), &[(1, 2)], &[(1, 1)], 2),
            ]
        );
    }

    #[test]
    fn test_not_split_range() {
        let range = range_meta((1000, 2000), &[(1, 1), (2, 1)], &[(1, 1), (2, 1)], 5);

        assert!(!range.can_split_preserve_order());
        let mut output = Vec::new();
        range.maybe_split(&mut output);
        assert_eq!(1, output.len());
    }

    #[test]
    fn test_maybe_split_ranges() {
        let ranges = vec![
            range_meta((0, 500), &[(0, 1)], &[(0, 0)], 4),
            range_meta((1000, 2000), &[(1, 2)], &[(1, ALL_ROW_GROUPS)], 4),
            range_meta(
                (3000, 4000),
                &[(2, 2), (3, 0)],
                &[(2, ALL_ROW_GROUPS), (3, ALL_ROW_GROUPS)],
                5,
            ),
        ];
        let output = maybe_split_ranges_for_seq_scan(ranges);
        assert_eq!(
            output,
            vec![
                range_meta((0, 500), &[(0, 1)], &[(0, 0)], 4),
                range_meta((1000, 2000), &[(1, 2)], &[(1, 0)], 2),
                range_meta((1000, 2000), &[(1, 2)], &[(1, 1)], 2),
                range_meta(
                    (3000, 4000),
                    &[(2, 2), (3, 0)],
                    &[(2, ALL_ROW_GROUPS), (3, ALL_ROW_GROUPS)],
                    5,
                ),
            ]
        )
    }
}