mito2/read/
range.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structs for partition ranges.
16
17use std::sync::{Arc, Mutex};
18
19use common_time::Timestamp;
20use smallvec::{SmallVec, smallvec};
21use store_api::region_engine::PartitionRange;
22use store_api::storage::TimeSeriesDistribution;
23
24use crate::cache::CacheStrategy;
25use crate::error::Result;
26use crate::memtable::{MemtableRange, MemtableStats};
27use crate::read::scan_region::ScanInput;
28use crate::sst::file::{FileHandle, FileTimeRange, overlaps};
29use crate::sst::parquet::DEFAULT_ROW_GROUP_SIZE;
30use crate::sst::parquet::file_range::{FileRange, FileRangeContextRef};
31use crate::sst::parquet::format::parquet_row_group_time_range;
32use crate::sst::parquet::reader::ReaderMetrics;
33use crate::sst::parquet::row_selection::RowGroupSelection;
34
/// Sentinel for [RowGroupIndex::row_group_index] meaning "read every row group of the
/// memtable or file" rather than a single one.
const ALL_ROW_GROUPS: i64 = -1;
36
/// Identifies a memtable or file that a range reads from, together with the row group
/// count used when deciding whether the source can be split.
#[derive(Debug, Clone, Copy, PartialEq)]
pub(crate) struct SourceIndex {
    /// Position of the memtable or file in the scan input (memtables first, then files).
    pub(crate) index: usize,
    /// Number of row groups in this source, or 0 when that metadata is unavailable.
    /// Used to split files into smaller ranges.
    pub(crate) num_row_groups: u64,
}
46
/// Locates one row group (or all of them) inside a memtable or file.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct RowGroupIndex {
    /// Position of the memtable or file in the scan input.
    pub(crate) index: usize,
    /// Row group to read inside the source; any negative value selects every row group.
    pub row_group_index: i64,
}
56
57/// Meta data of a partition range.
58/// If the scanner is [UnorderedScan], each meta only has one row group or memtable.
59/// If the scanner is [SeqScan], each meta may have multiple row groups and memtables.
60#[derive(Debug, PartialEq)]
61pub(crate) struct RangeMeta {
62    /// The time range of the range.
63    pub(crate) time_range: FileTimeRange,
64    /// Indices to memtables or files.
65    pub(crate) indices: SmallVec<[SourceIndex; 2]>,
66    /// Indices to memtable/file row groups that this range scans.
67    pub(crate) row_group_indices: SmallVec<[RowGroupIndex; 2]>,
68    /// Estimated number of rows in the range. This can be 0 if the statistics are not available.
69    pub(crate) num_rows: usize,
70}
71
72impl RangeMeta {
73    /// Creates a [PartitionRange] with specific identifier.
74    /// It converts the inclusive max timestamp to exclusive end timestamp.
75    pub(crate) fn new_partition_range(&self, identifier: usize) -> PartitionRange {
76        PartitionRange {
77            start: self.time_range.0,
78            end: Timestamp::new(
79                // The i64::MAX timestamp may be invisible but we don't guarantee to support this
80                // value now.
81                self.time_range
82                    .1
83                    .value()
84                    .checked_add(1)
85                    .unwrap_or(self.time_range.1.value()),
86                self.time_range.1.unit(),
87            ),
88            num_rows: self.num_rows,
89            identifier,
90        }
91    }
92
93    /// Creates a list of ranges from the `input` for seq scan.
94    /// If `input.compaction` is true, it doesn't split the ranges.
95    pub(crate) fn seq_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
96        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
97        Self::push_seq_mem_ranges(&input.memtables, &mut ranges);
98        Self::push_seq_file_ranges(input.memtables.len(), &input.files, &mut ranges);
99
100        #[cfg(feature = "enterprise")]
101        Self::push_extension_ranges(input, &mut ranges);
102
103        let ranges = group_ranges_for_seq_scan(ranges);
104        if input.compaction || input.distribution == Some(TimeSeriesDistribution::PerSeries) {
105            // We don't split ranges in compaction or TimeSeriesDistribution::PerSeries.
106            return ranges;
107        }
108        maybe_split_ranges_for_seq_scan(ranges)
109    }
110
111    /// Creates a list of ranges from the `input` for unordered scan.
112    pub(crate) fn unordered_scan_ranges(input: &ScanInput) -> Vec<RangeMeta> {
113        let mut ranges = Vec::with_capacity(input.memtables.len() + input.files.len());
114        Self::push_unordered_mem_ranges(&input.memtables, &mut ranges);
115        Self::push_unordered_file_ranges(
116            input.memtables.len(),
117            &input.files,
118            &input.cache_strategy,
119            &mut ranges,
120        );
121
122        #[cfg(feature = "enterprise")]
123        Self::push_extension_ranges(input, &mut ranges);
124
125        ranges
126    }
127
128    /// Returns true if the time range of given `meta` overlaps with the time range of this meta.
129    fn overlaps(&self, meta: &RangeMeta) -> bool {
130        overlaps(&self.time_range, &meta.time_range)
131    }
132
133    /// Merges given `meta` to this meta.
134    /// It assumes that the time ranges overlap and they don't have the same file or memtable index.
135    fn merge(&mut self, mut other: RangeMeta) {
136        debug_assert!(self.overlaps(&other));
137        debug_assert!(self.indices.iter().all(|idx| !other.indices.contains(idx)));
138        debug_assert!(
139            self.row_group_indices
140                .iter()
141                .all(|idx| !other.row_group_indices.contains(idx))
142        );
143
144        self.time_range = (
145            self.time_range.0.min(other.time_range.0),
146            self.time_range.1.max(other.time_range.1),
147        );
148        self.indices.append(&mut other.indices);
149        self.row_group_indices.append(&mut other.row_group_indices);
150        self.num_rows += other.num_rows;
151    }
152
153    /// Returns true if we can split the range into multiple smaller ranges and
154    /// still preserve the order for [SeqScan].
155    fn can_split_preserve_order(&self) -> bool {
156        self.indices.len() == 1 && self.indices[0].num_row_groups > 1
157    }
158
159    /// Splits the range if it can preserve the order.
160    fn maybe_split(self, output: &mut Vec<RangeMeta>) {
161        if self.can_split_preserve_order() {
162            let num_row_groups = self.indices[0].num_row_groups;
163            debug_assert_eq!(1, self.row_group_indices.len());
164            debug_assert_eq!(ALL_ROW_GROUPS, self.row_group_indices[0].row_group_index);
165
166            output.reserve(self.row_group_indices.len());
167            let num_rows = self.num_rows / num_row_groups as usize;
168            // Splits by row group.
169            for row_group_index in 0..num_row_groups {
170                output.push(RangeMeta {
171                    time_range: self.time_range,
172                    indices: self.indices.clone(),
173                    row_group_indices: smallvec![RowGroupIndex {
174                        index: self.indices[0].index,
175                        row_group_index: row_group_index as i64,
176                    }],
177                    num_rows,
178                });
179            }
180        } else {
181            output.push(self);
182        }
183    }
184
185    fn push_unordered_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
186        // For append mode, we can parallelize reading memtables.
187        for (memtable_index, memtable) in memtables.iter().enumerate() {
188            let stats = memtable.stats();
189            let Some(time_range) = stats.time_range() else {
190                continue;
191            };
192            for row_group_index in 0..stats.num_ranges() {
193                let num_rows = stats.num_rows() / stats.num_ranges();
194                ranges.push(RangeMeta {
195                    time_range,
196                    indices: smallvec![SourceIndex {
197                        index: memtable_index,
198                        num_row_groups: stats.num_ranges() as u64,
199                    }],
200                    row_group_indices: smallvec![RowGroupIndex {
201                        index: memtable_index,
202                        row_group_index: row_group_index as i64,
203                    }],
204                    num_rows,
205                });
206            }
207        }
208    }
209
210    fn push_unordered_file_ranges(
211        num_memtables: usize,
212        files: &[FileHandle],
213        cache: &CacheStrategy,
214        ranges: &mut Vec<RangeMeta>,
215    ) {
216        // For append mode, we can parallelize reading row groups.
217        for (i, file) in files.iter().enumerate() {
218            let file_index = num_memtables + i;
219            // Get parquet meta from the cache.
220            let parquet_meta = cache.get_parquet_meta_data_from_mem_cache(file.file_id());
221            if let Some(parquet_meta) = parquet_meta {
222                // Scans each row group.
223                for row_group_index in 0..file.meta_ref().num_row_groups {
224                    let time_range = parquet_row_group_time_range(
225                        file.meta_ref(),
226                        &parquet_meta,
227                        row_group_index as usize,
228                    );
229                    let num_rows = parquet_meta.row_group(row_group_index as usize).num_rows();
230                    ranges.push(RangeMeta {
231                        time_range: time_range.unwrap_or_else(|| file.time_range()),
232                        indices: smallvec![SourceIndex {
233                            index: file_index,
234                            num_row_groups: file.meta_ref().num_row_groups,
235                        }],
236                        row_group_indices: smallvec![RowGroupIndex {
237                            index: file_index,
238                            row_group_index: row_group_index as i64,
239                        }],
240                        num_rows: num_rows as usize,
241                    });
242                }
243            } else if file.meta_ref().num_row_groups > 0 {
244                // Scans each row group.
245                for row_group_index in 0..file.meta_ref().num_row_groups {
246                    ranges.push(RangeMeta {
247                        time_range: file.time_range(),
248                        indices: smallvec![SourceIndex {
249                            index: file_index,
250                            num_row_groups: file.meta_ref().num_row_groups,
251                        }],
252                        row_group_indices: smallvec![RowGroupIndex {
253                            index: file_index,
254                            row_group_index: row_group_index as i64,
255                        }],
256                        num_rows: DEFAULT_ROW_GROUP_SIZE,
257                    });
258                }
259            } else {
260                // If we don't known the number of row groups in advance, scan all row groups.
261                ranges.push(RangeMeta {
262                    time_range: file.time_range(),
263                    indices: smallvec![SourceIndex {
264                        index: file_index,
265                        num_row_groups: 0,
266                    }],
267                    row_group_indices: smallvec![RowGroupIndex {
268                        index: file_index,
269                        row_group_index: ALL_ROW_GROUPS,
270                    }],
271                    // This may be 0.
272                    num_rows: file.meta_ref().num_rows as usize,
273                });
274            }
275        }
276    }
277
278    fn push_seq_mem_ranges(memtables: &[MemRangeBuilder], ranges: &mut Vec<RangeMeta>) {
279        // For non append-only mode, each range only contains one memtable by default.
280        for (i, memtable) in memtables.iter().enumerate() {
281            let stats = memtable.stats();
282            let Some(time_range) = stats.time_range() else {
283                continue;
284            };
285            ranges.push(RangeMeta {
286                time_range,
287                indices: smallvec![SourceIndex {
288                    index: i,
289                    num_row_groups: stats.num_ranges() as u64,
290                }],
291                row_group_indices: smallvec![RowGroupIndex {
292                    index: i,
293                    row_group_index: ALL_ROW_GROUPS,
294                }],
295                num_rows: stats.num_rows(),
296            });
297        }
298    }
299
300    fn push_seq_file_ranges(
301        num_memtables: usize,
302        files: &[FileHandle],
303        ranges: &mut Vec<RangeMeta>,
304    ) {
305        // For non append-only mode, each range only contains one file.
306        for (i, file) in files.iter().enumerate() {
307            let file_index = num_memtables + i;
308            ranges.push(RangeMeta {
309                time_range: file.time_range(),
310                indices: smallvec![SourceIndex {
311                    index: file_index,
312                    num_row_groups: file.meta_ref().num_row_groups,
313                }],
314                row_group_indices: smallvec![RowGroupIndex {
315                    index: file_index,
316                    row_group_index: ALL_ROW_GROUPS,
317                }],
318                num_rows: file.meta_ref().num_rows as usize,
319            });
320        }
321    }
322
323    #[cfg(feature = "enterprise")]
324    fn push_extension_ranges(input: &ScanInput, metas: &mut Vec<RangeMeta>) {
325        for (i, range) in input.extension_ranges().iter().enumerate() {
326            let index = input.num_memtables() + input.num_files() + i;
327            metas.push(RangeMeta {
328                time_range: range.time_range(),
329                indices: smallvec![SourceIndex {
330                    index,
331                    num_row_groups: range.num_row_groups(),
332                }],
333                row_group_indices: smallvec![RowGroupIndex {
334                    index,
335                    row_group_index: ALL_ROW_GROUPS,
336                }],
337                num_rows: range.num_rows() as usize,
338            });
339        }
340    }
341}
342
343/// Groups ranges by time range.
344/// It assumes each input range only contains a file or a memtable.
345fn group_ranges_for_seq_scan(mut ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
346    if ranges.is_empty() {
347        return ranges;
348    }
349
350    // Sorts ranges by time range (start asc, end desc).
351    ranges.sort_unstable_by(|a, b| {
352        let l = a.time_range;
353        let r = b.time_range;
354        l.0.cmp(&r.0).then_with(|| r.1.cmp(&l.1))
355    });
356    let mut range_in_progress = None;
357    // Parts with exclusive time ranges.
358    let mut exclusive_ranges = Vec::with_capacity(ranges.len());
359    for range in ranges {
360        let Some(mut prev_range) = range_in_progress.take() else {
361            // This is the new range to process.
362            range_in_progress = Some(range);
363            continue;
364        };
365
366        if prev_range.overlaps(&range) {
367            prev_range.merge(range);
368            range_in_progress = Some(prev_range);
369        } else {
370            exclusive_ranges.push(prev_range);
371            range_in_progress = Some(range);
372        }
373    }
374    if let Some(range) = range_in_progress {
375        exclusive_ranges.push(range);
376    }
377
378    exclusive_ranges
379}
380
381/// Splits the range into multiple smaller ranges.
382/// It assumes the input `ranges` list is created by [group_ranges_for_seq_scan()].
383fn maybe_split_ranges_for_seq_scan(ranges: Vec<RangeMeta>) -> Vec<RangeMeta> {
384    let mut new_ranges = Vec::with_capacity(ranges.len());
385    for range in ranges {
386        range.maybe_split(&mut new_ranges);
387    }
388
389    new_ranges
390}
391
392/// Builder to create file ranges.
393#[derive(Default)]
394pub struct FileRangeBuilder {
395    /// Context for the file.
396    /// None indicates nothing to read.
397    context: Option<FileRangeContextRef>,
398    /// Row group selection for the file to read.
399    selection: RowGroupSelection,
400}
401
402impl FileRangeBuilder {
403    /// Builds a file range builder from context and row groups.
404    pub(crate) fn new(context: FileRangeContextRef, selection: RowGroupSelection) -> Self {
405        Self {
406            context: Some(context),
407            selection,
408        }
409    }
410
411    /// Builds file ranges to read.
412    /// Negative `row_group_index` indicates all row groups.
413    pub fn build_ranges(&self, row_group_index: i64, ranges: &mut SmallVec<[FileRange; 2]>) {
414        let Some(context) = self.context.clone() else {
415            return;
416        };
417        if row_group_index >= 0 {
418            let row_group_index = row_group_index as usize;
419            // Scans one row group.
420            let Some(row_selection) = self.selection.get(row_group_index) else {
421                return;
422            };
423            ranges.push(FileRange::new(
424                context,
425                row_group_index,
426                Some(row_selection.clone()),
427            ));
428        } else {
429            // Scans all row groups.
430            ranges.extend(
431                self.selection
432                    .iter()
433                    .map(|(row_group_index, row_selection)| {
434                        FileRange::new(
435                            context.clone(),
436                            *row_group_index,
437                            Some(row_selection.clone()),
438                        )
439                    }),
440            );
441        }
442    }
443
444    /// Returns the estimated memory size of this builder.
445    pub(crate) fn memory_size(&self) -> usize {
446        let context_size = self
447            .context
448            .as_ref()
449            .map(|ctx| ctx.memory_size())
450            .unwrap_or(0);
451        let selection_size = self.selection.mem_usage();
452        context_size + selection_size
453    }
454}
455
456/// Builder to create mem ranges.
457pub(crate) struct MemRangeBuilder {
458    /// Ranges of a memtable.
459    range: MemtableRange,
460    /// Stats of a memtable.
461    stats: MemtableStats,
462}
463
464impl MemRangeBuilder {
465    /// Builds a mem range builder from row groups.
466    pub(crate) fn new(range: MemtableRange, stats: MemtableStats) -> Self {
467        Self { range, stats }
468    }
469
470    /// Builds mem ranges to read in the memtable.
471    /// Negative `row_group_index` indicates all row groups.
472    pub(crate) fn build_ranges(
473        &self,
474        _row_group_index: i64,
475        ranges: &mut SmallVec<[MemtableRange; 2]>,
476    ) {
477        ranges.push(self.range.clone())
478    }
479
480    /// Returns the statistics of the memtable.
481    pub(crate) fn stats(&self) -> &MemtableStats {
482        &self.stats
483    }
484}
485
486/// Computes the number of ranges that reference each file.
487///
488/// # Arguments
489/// * `num_memtables` - Number of memtables
490/// * `num_files` - Number of files
491/// * `ranges` - All range metadata from StreamContext
492/// * `part_ranges` - Iterator of partition ranges to scan
493pub(crate) fn file_range_counts<'a>(
494    num_memtables: usize,
495    num_files: usize,
496    ranges: &[RangeMeta],
497    part_ranges: impl Iterator<Item = &'a PartitionRange>,
498) -> Vec<usize> {
499    let mut counts = vec![0usize; num_files];
500    for part_range in part_ranges {
501        let range_meta = &ranges[part_range.identifier];
502        for row_group_index in &range_meta.row_group_indices {
503            if row_group_index.index >= num_memtables {
504                let file_index = row_group_index.index - num_memtables;
505                if file_index < num_files {
506                    counts[file_index] += 1;
507                }
508            }
509        }
510    }
511    counts
512}
513
514/// Entry for a file builder with its remaining range count.
515struct FileBuilderEntry {
516    /// The builder for the file. None if not yet built or already cleared.
517    builder: Option<Arc<FileRangeBuilder>>,
518    /// Number of remaining ranges to scan for this file.
519    remaining_ranges: usize,
520}
521
522/// List to manages the builders to create file ranges.
523/// Each scan partition should have its own list. Mutex inside this list is used to allow moving
524/// the list to different streams in the same partition.
525pub(crate) struct RangeBuilderList {
526    num_memtables: usize,
527    file_entries: Mutex<Vec<FileBuilderEntry>>,
528}
529
530impl RangeBuilderList {
531    /// Creates a new [RangeBuilderList] with pre-computed file range counts.
532    ///
533    /// # Arguments
534    /// * `num_memtables` - Number of memtables
535    /// * `file_range_counts` - Pre-computed counts of ranges per file
536    pub(crate) fn new(num_memtables: usize, file_range_counts: Vec<usize>) -> Self {
537        let file_entries = file_range_counts
538            .into_iter()
539            .map(|count| FileBuilderEntry {
540                builder: None,
541                remaining_ranges: count,
542            })
543            .collect();
544
545        Self {
546            num_memtables,
547            file_entries: Mutex::new(file_entries),
548        }
549    }
550
551    /// Builds file ranges to read the row group at `index`.
552    pub(crate) async fn build_file_ranges(
553        &self,
554        input: &ScanInput,
555        index: RowGroupIndex,
556        reader_metrics: &mut ReaderMetrics,
557    ) -> Result<SmallVec<[FileRange; 2]>> {
558        let mut ranges = SmallVec::new();
559        let file_index = index.index - self.num_memtables;
560        let builder_opt = self.get_file_builder(file_index);
561        match builder_opt {
562            Some(builder) => builder.build_ranges(index.row_group_index, &mut ranges),
563            None => {
564                let file = &input.files[file_index];
565                let builder = input.prune_file(file, reader_metrics).await?;
566                builder.build_ranges(index.row_group_index, &mut ranges);
567
568                // Record memory size and count of newly built builder.
569                reader_metrics.metadata_mem_size += builder.memory_size() as isize;
570                reader_metrics.num_range_builders += 1;
571
572                self.set_file_builder(file_index, Arc::new(builder));
573            }
574        }
575
576        // Decrement remaining count and auto-clear if all ranges are scanned
577        self.decrement_and_maybe_clear(file_index, reader_metrics);
578
579        Ok(ranges)
580    }
581
582    fn get_file_builder(&self, file_index: usize) -> Option<Arc<FileRangeBuilder>> {
583        let entries = self.file_entries.lock().unwrap();
584        entries
585            .get(file_index)
586            .and_then(|entry| entry.builder.clone())
587    }
588
589    fn set_file_builder(&self, file_index: usize, builder: Arc<FileRangeBuilder>) {
590        let mut entries = self.file_entries.lock().unwrap();
591        if let Some(entry) = entries.get_mut(file_index) {
592            entry.builder = Some(builder);
593        }
594    }
595
596    /// Decrements the remaining range count for a file and clears the builder if done.
597    fn decrement_and_maybe_clear(&self, file_index: usize, reader_metrics: &mut ReaderMetrics) {
598        let mut entries = self.file_entries.lock().unwrap();
599        if let Some(entry) = entries.get_mut(file_index)
600            && entry.remaining_ranges > 0
601        {
602            entry.remaining_ranges -= 1;
603            if entry.remaining_ranges == 0
604                && let Some(builder) = entry.builder.take()
605            {
606                reader_metrics.metadata_mem_size -= builder.memory_size() as isize;
607                reader_metrics.num_range_builders -= 1;
608            }
609        }
610    }
611}
612
613#[cfg(test)]
614mod tests {
615    use common_time::Timestamp;
616    use common_time::timestamp::TimeUnit;
617
618    use super::*;
619
620    type Output = (Vec<usize>, i64, i64);
621
622    fn run_group_ranges_test(input: &[(usize, i64, i64)], expect: &[Output]) {
623        let ranges = input
624            .iter()
625            .map(|(idx, start, end)| {
626                let time_range = (
627                    Timestamp::new(*start, TimeUnit::Second),
628                    Timestamp::new(*end, TimeUnit::Second),
629                );
630                RangeMeta {
631                    time_range,
632                    indices: smallvec![SourceIndex {
633                        index: *idx,
634                        num_row_groups: 0,
635                    }],
636                    row_group_indices: smallvec![RowGroupIndex {
637                        index: *idx,
638                        row_group_index: 0
639                    }],
640                    num_rows: 1,
641                }
642            })
643            .collect();
644        let output = group_ranges_for_seq_scan(ranges);
645        let actual: Vec<_> = output
646            .iter()
647            .map(|range| {
648                let indices = range.indices.iter().map(|index| index.index).collect();
649                let group_indices: Vec<_> = range
650                    .row_group_indices
651                    .iter()
652                    .map(|idx| idx.index)
653                    .collect();
654                assert_eq!(indices, group_indices);
655                let range = range.time_range;
656                (indices, range.0.value(), range.1.value())
657            })
658            .collect();
659        assert_eq!(expect, actual);
660    }
661
662    #[test]
663    fn test_group_ranges() {
664        // Group 1 part.
665        run_group_ranges_test(&[(1, 0, 2000)], &[(vec![1], 0, 2000)]);
666
667        // 1, 2, 3, 4 => [3, 1, 4], [2]
668        run_group_ranges_test(
669            &[
670                (1, 1000, 2000),
671                (2, 6000, 7000),
672                (3, 0, 1500),
673                (4, 1500, 3000),
674            ],
675            &[(vec![3, 1, 4], 0, 3000), (vec![2], 6000, 7000)],
676        );
677
678        // 1, 2, 3 => [3], [1], [2],
679        run_group_ranges_test(
680            &[(1, 3000, 4000), (2, 4001, 6000), (3, 0, 1000)],
681            &[
682                (vec![3], 0, 1000),
683                (vec![1], 3000, 4000),
684                (vec![2], 4001, 6000),
685            ],
686        );
687
688        // 1, 2, 3 => [3], [1, 2]
689        run_group_ranges_test(
690            &[(1, 3000, 4000), (2, 4000, 6000), (3, 0, 1000)],
691            &[(vec![3], 0, 1000), (vec![1, 2], 3000, 6000)],
692        );
693    }
694
695    #[test]
696    fn test_merge_range() {
697        let mut left = RangeMeta {
698            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
699            indices: smallvec![SourceIndex {
700                index: 1,
701                num_row_groups: 2,
702            }],
703            row_group_indices: smallvec![
704                RowGroupIndex {
705                    index: 1,
706                    row_group_index: 1
707                },
708                RowGroupIndex {
709                    index: 1,
710                    row_group_index: 2
711                }
712            ],
713            num_rows: 5,
714        };
715        let right = RangeMeta {
716            time_range: (Timestamp::new_second(800), Timestamp::new_second(1200)),
717            indices: smallvec![SourceIndex {
718                index: 2,
719                num_row_groups: 2,
720            }],
721            row_group_indices: smallvec![
722                RowGroupIndex {
723                    index: 2,
724                    row_group_index: 1
725                },
726                RowGroupIndex {
727                    index: 2,
728                    row_group_index: 2
729                }
730            ],
731            num_rows: 4,
732        };
733        left.merge(right);
734
735        assert_eq!(
736            left,
737            RangeMeta {
738                time_range: (Timestamp::new_second(800), Timestamp::new_second(2000)),
739                indices: smallvec![
740                    SourceIndex {
741                        index: 1,
742                        num_row_groups: 2
743                    },
744                    SourceIndex {
745                        index: 2,
746                        num_row_groups: 2
747                    }
748                ],
749                row_group_indices: smallvec![
750                    RowGroupIndex {
751                        index: 1,
752                        row_group_index: 1
753                    },
754                    RowGroupIndex {
755                        index: 1,
756                        row_group_index: 2
757                    },
758                    RowGroupIndex {
759                        index: 2,
760                        row_group_index: 1
761                    },
762                    RowGroupIndex {
763                        index: 2,
764                        row_group_index: 2
765                    },
766                ],
767                num_rows: 9,
768            }
769        );
770    }
771
772    #[test]
773    fn test_split_range() {
774        let range = RangeMeta {
775            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
776            indices: smallvec![SourceIndex {
777                index: 1,
778                num_row_groups: 2,
779            }],
780            row_group_indices: smallvec![RowGroupIndex {
781                index: 1,
782                row_group_index: ALL_ROW_GROUPS,
783            }],
784            num_rows: 5,
785        };
786
787        assert!(range.can_split_preserve_order());
788        let mut output = Vec::new();
789        range.maybe_split(&mut output);
790
791        assert_eq!(
792            output,
793            &[
794                RangeMeta {
795                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
796                    indices: smallvec![SourceIndex {
797                        index: 1,
798                        num_row_groups: 2,
799                    }],
800                    row_group_indices: smallvec![RowGroupIndex {
801                        index: 1,
802                        row_group_index: 0
803                    },],
804                    num_rows: 2,
805                },
806                RangeMeta {
807                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
808                    indices: smallvec![SourceIndex {
809                        index: 1,
810                        num_row_groups: 2,
811                    }],
812                    row_group_indices: smallvec![RowGroupIndex {
813                        index: 1,
814                        row_group_index: 1
815                    }],
816                    num_rows: 2,
817                }
818            ]
819        );
820    }
821
822    #[test]
823    fn test_not_split_range() {
824        let range = RangeMeta {
825            time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
826            indices: smallvec![
827                SourceIndex {
828                    index: 1,
829                    num_row_groups: 1,
830                },
831                SourceIndex {
832                    index: 2,
833                    num_row_groups: 1,
834                }
835            ],
836            row_group_indices: smallvec![
837                RowGroupIndex {
838                    index: 1,
839                    row_group_index: 1
840                },
841                RowGroupIndex {
842                    index: 2,
843                    row_group_index: 1
844                }
845            ],
846            num_rows: 5,
847        };
848
849        assert!(!range.can_split_preserve_order());
850        let mut output = Vec::new();
851        range.maybe_split(&mut output);
852        assert_eq!(1, output.len());
853    }
854
855    #[test]
856    fn test_maybe_split_ranges() {
857        let ranges = vec![
858            RangeMeta {
859                time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
860                indices: smallvec![SourceIndex {
861                    index: 0,
862                    num_row_groups: 1,
863                }],
864                row_group_indices: smallvec![RowGroupIndex {
865                    index: 0,
866                    row_group_index: 0,
867                },],
868                num_rows: 4,
869            },
870            RangeMeta {
871                time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
872                indices: smallvec![SourceIndex {
873                    index: 1,
874                    num_row_groups: 2,
875                }],
876                row_group_indices: smallvec![RowGroupIndex {
877                    index: 1,
878                    row_group_index: ALL_ROW_GROUPS,
879                },],
880                num_rows: 4,
881            },
882            RangeMeta {
883                time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
884                indices: smallvec![
885                    SourceIndex {
886                        index: 2,
887                        num_row_groups: 2,
888                    },
889                    SourceIndex {
890                        index: 3,
891                        num_row_groups: 0,
892                    }
893                ],
894                row_group_indices: smallvec![
895                    RowGroupIndex {
896                        index: 2,
897                        row_group_index: ALL_ROW_GROUPS,
898                    },
899                    RowGroupIndex {
900                        index: 3,
901                        row_group_index: ALL_ROW_GROUPS,
902                    }
903                ],
904                num_rows: 5,
905            },
906        ];
907        let output = maybe_split_ranges_for_seq_scan(ranges);
908        assert_eq!(
909            output,
910            vec![
911                RangeMeta {
912                    time_range: (Timestamp::new_second(0), Timestamp::new_second(500)),
913                    indices: smallvec![SourceIndex {
914                        index: 0,
915                        num_row_groups: 1,
916                    }],
917                    row_group_indices: smallvec![RowGroupIndex {
918                        index: 0,
919                        row_group_index: 0
920                    },],
921                    num_rows: 4,
922                },
923                RangeMeta {
924                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
925                    indices: smallvec![SourceIndex {
926                        index: 1,
927                        num_row_groups: 2,
928                    }],
929                    row_group_indices: smallvec![RowGroupIndex {
930                        index: 1,
931                        row_group_index: 0
932                    },],
933                    num_rows: 2,
934                },
935                RangeMeta {
936                    time_range: (Timestamp::new_second(1000), Timestamp::new_second(2000)),
937                    indices: smallvec![SourceIndex {
938                        index: 1,
939                        num_row_groups: 2,
940                    }],
941                    row_group_indices: smallvec![RowGroupIndex {
942                        index: 1,
943                        row_group_index: 1
944                    }],
945                    num_rows: 2,
946                },
947                RangeMeta {
948                    time_range: (Timestamp::new_second(3000), Timestamp::new_second(4000)),
949                    indices: smallvec![
950                        SourceIndex {
951                            index: 2,
952                            num_row_groups: 2
953                        },
954                        SourceIndex {
955                            index: 3,
956                            num_row_groups: 0,
957                        }
958                    ],
959                    row_group_indices: smallvec![
960                        RowGroupIndex {
961                            index: 2,
962                            row_group_index: ALL_ROW_GROUPS,
963                        },
964                        RowGroupIndex {
965                            index: 3,
966                            row_group_index: ALL_ROW_GROUPS,
967                        }
968                    ],
969                    num_rows: 5,
970                },
971            ]
972        )
973    }
974}