mito2/sst/parquet/
row_selection.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::ops::Range;
16
17use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
18
19/// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s.
20///
21/// This function processes each range in the input and either creates a new selector or merges
22/// with the existing one, depending on whether the current range is contiguous with the preceding one
23/// or if there's a gap that requires skipping rows. It handles both "select" and "skip" actions,
24/// optimizing the list of selectors by merging contiguous actions of the same type.
25///
26/// Note: overlapping ranges are not supported and will result in an incorrect selection.
27pub(crate) fn row_selection_from_row_ranges(
28    row_ranges: impl Iterator<Item = Range<usize>>,
29    total_row_count: usize,
30) -> RowSelection {
31    let mut selectors: Vec<RowSelector> = Vec::new();
32    let mut last_processed_end = 0;
33
34    for Range { start, end } in row_ranges {
35        let end = end.min(total_row_count);
36        if start > last_processed_end {
37            add_or_merge_selector(&mut selectors, start - last_processed_end, true);
38        }
39
40        add_or_merge_selector(&mut selectors, end - start, false);
41        last_processed_end = end;
42    }
43
44    if last_processed_end < total_row_count {
45        add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true);
46    }
47
48    RowSelection::from(selectors)
49}
50
51/// Converts an iterator of sorted row IDs into a `RowSelection`.
52///
53/// Note: the input iterator must be sorted in ascending order and
54///       contain unique row IDs in the range [0, total_row_count).
55pub(crate) fn row_selection_from_sorted_row_ids(
56    row_ids: impl Iterator<Item = usize>,
57    total_row_count: usize,
58) -> RowSelection {
59    let mut selectors: Vec<RowSelector> = Vec::new();
60    let mut last_processed_end = 0;
61
62    for row_id in row_ids {
63        let start = row_id;
64        let end = start + 1;
65
66        if start > last_processed_end {
67            add_or_merge_selector(&mut selectors, start - last_processed_end, true);
68        }
69
70        add_or_merge_selector(&mut selectors, end - start, false);
71        last_processed_end = end;
72    }
73
74    if last_processed_end < total_row_count {
75        add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true);
76    }
77
78    RowSelection::from(selectors)
79}
80
81/// Intersects two `RowSelection`s.
82pub(crate) fn intersect_row_selections(
83    a: Option<RowSelection>,
84    b: Option<RowSelection>,
85) -> Option<RowSelection> {
86    match (a, b) {
87        (Some(a), Some(b)) => Some(a.intersection(&b)),
88        (Some(a), None) => Some(a),
89        (None, Some(b)) => Some(b),
90        (None, None) => None,
91    }
92}
93
94/// Helper function to either add a new `RowSelector` to `selectors` or merge it with the last one
95/// if they are of the same type (both skip or both select).
96fn add_or_merge_selector(selectors: &mut Vec<RowSelector>, count: usize, is_skip: bool) {
97    if let Some(last) = selectors.last_mut() {
98        // Merge with last if both actions are same
99        if last.skip == is_skip {
100            last.row_count += count;
101            return;
102        }
103    }
104    // Add new selector otherwise
105    let new_selector = if is_skip {
106        RowSelector::skip(count)
107    } else {
108        RowSelector::select(count)
109    };
110    selectors.push(new_selector);
111}
112
#[cfg(test)]
mod tests {
    use super::*;

    // ---- row_selection_from_row_ranges ----

    /// A single range at the end of the file yields a leading skip and no trailing skip.
    #[test]
    fn test_single_contiguous_range() {
        let selection = row_selection_from_row_ranges(Some(5..10).into_iter(), 10);
        let expected = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(5)]);
        assert_eq!(selection, expected);
    }

    /// A range starting at row 0 yields no leading skip.
    #[test]
    fn test_range_starts_at_zero() {
        let selection = row_selection_from_row_ranges(Some(0..3).into_iter(), 5);
        let expected = RowSelection::from(vec![RowSelector::select(3), RowSelector::skip(2)]);
        assert_eq!(selection, expected);
    }

    /// Gaps between ranges and after the last range become skips.
    #[test]
    fn test_non_contiguous_ranges() {
        let ranges = [1..3, 5..8];
        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
        let expected = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(2),
            RowSelector::skip(2),
            RowSelector::select(3),
            RowSelector::skip(2),
        ]);
        assert_eq!(selection, expected);
    }

    /// No ranges at all means every row is skipped.
    #[test]
    fn test_empty_range() {
        let ranges = [];
        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
        let expected = RowSelection::from(vec![RowSelector::skip(10)]);
        assert_eq!(selection, expected);
    }

    /// Ranges that touch each other are merged into one select.
    #[test]
    fn test_adjacent_ranges() {
        let ranges = [1..2, 2..3];
        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
        let expected = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(2),
            RowSelector::skip(7),
        ]);
        assert_eq!(selection, expected);
    }

    #[test]
    fn test_large_gap_between_ranges() {
        let ranges = [1..2, 100..101];
        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10240);
        let expected = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(98),
            RowSelector::select(1),
            RowSelector::skip(10139),
        ]);
        assert_eq!(selection, expected);
    }

    /// A range end beyond the row count is clamped to the row count.
    #[test]
    fn test_range_end_over_total_row_count() {
        let ranges = Some(1..10);
        let selection = row_selection_from_row_ranges(ranges.into_iter(), 5);
        let expected = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(4)]);
        assert_eq!(selection, expected);
    }

    // ---- row_selection_from_sorted_row_ids ----

    #[test]
    fn test_row_ids_to_selection() {
        let row_ids = [1, 3, 5, 7, 9].into_iter();
        let selection = row_selection_from_sorted_row_ids(row_ids, 10);
        let expected = RowSelection::from(vec![
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
            RowSelector::select(1),
            RowSelector::skip(1),
            RowSelector::select(1),
        ]);
        assert_eq!(selection, expected);
    }

    /// Consecutive IDs covering the whole file collapse into one select.
    #[test]
    fn test_row_ids_to_selection_full() {
        let row_ids = 0..10;
        let selection = row_selection_from_sorted_row_ids(row_ids, 10);
        let expected = RowSelection::from(vec![RowSelector::select(10)]);
        assert_eq!(selection, expected);
    }

    #[test]
    fn test_row_ids_to_selection_empty() {
        let selection = row_selection_from_sorted_row_ids(None.into_iter(), 10);
        let expected = RowSelection::from(vec![RowSelector::skip(10)]);
        assert_eq!(selection, expected);
    }

    // ---- intersect_row_selections ----

    /// With both sides present, only rows selected by both remain selected.
    #[test]
    fn test_intersect_both_present() {
        let a = RowSelection::from(vec![RowSelector::select(5), RowSelector::skip(5)]);
        let b = RowSelection::from(vec![RowSelector::skip(3), RowSelector::select(7)]);
        let expected = RowSelection::from(vec![
            RowSelector::skip(3),
            RowSelector::select(2),
            RowSelector::skip(5),
        ]);
        assert_eq!(intersect_row_selections(Some(a), Some(b)), Some(expected));
    }

    /// With a single side present, that side is returned unchanged.
    #[test]
    fn test_intersect_one_side_missing() {
        let only = RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(3)]);
        assert_eq!(
            intersect_row_selections(Some(only.clone()), None),
            Some(only.clone())
        );
        assert_eq!(
            intersect_row_selections(None, Some(only.clone())),
            Some(only)
        );
    }

    #[test]
    fn test_intersect_both_missing() {
        assert_eq!(intersect_row_selections(None, None), None);
    }
}