mito2/sst/parquet/
row_selection.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::Range;
17
18use index::inverted_index::search::index_apply::ApplyOutput;
19use itertools::Itertools;
20use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
21
22/// A selection of row groups.
23#[derive(Debug, Clone, Default)]
24pub struct RowGroupSelection {
25    /// Row group id to row selection.
26    selection_in_rg: BTreeMap<usize, RowSelectionWithCount>,
27    /// Total number of rows in the selection.
28    row_count: usize,
29    /// Total length of the selectors.
30    selector_len: usize,
31}
32
33/// A row selection with its count.
34#[derive(Debug, Clone, Default)]
35struct RowSelectionWithCount {
36    /// Row selection.
37    selection: RowSelection,
38    /// Number of rows in the selection.
39    row_count: usize,
40    /// Length of the selectors.
41    selector_len: usize,
42}
43
44impl RowGroupSelection {
45    /// Creates a new `RowGroupSelection` with all row groups selected.
46    ///
47    /// # Arguments
48    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
49    /// * `total_row_count` - Total number of rows
50    pub fn new(row_group_size: usize, total_row_count: usize) -> Self {
51        let mut selection_in_rg = BTreeMap::new();
52
53        let row_group_count = total_row_count.div_ceil(row_group_size);
54        for rg_id in 0..row_group_count {
55            // The last row group may have fewer rows than `row_group_size`
56            let row_group_size = if rg_id == row_group_count - 1 {
57                total_row_count - (row_group_count - 1) * row_group_size
58            } else {
59                row_group_size
60            };
61
62            let selection = RowSelection::from(vec![RowSelector::select(row_group_size)]);
63            selection_in_rg.insert(
64                rg_id,
65                RowSelectionWithCount {
66                    selection,
67                    row_count: row_group_size,
68                    selector_len: 1,
69                },
70            );
71        }
72
73        Self {
74            selection_in_rg,
75            row_count: total_row_count,
76            selector_len: row_group_count,
77        }
78    }
79
80    /// Returns the row selection for a given row group.
81    ///
82    /// `None` indicates not selected.
83    pub fn get(&self, rg_id: usize) -> Option<&RowSelection> {
84        self.selection_in_rg.get(&rg_id).map(|x| &x.selection)
85    }
86
87    /// Creates a new `RowGroupSelection` from the output of inverted index application.
88    ///
89    /// # Arguments
90    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
91    /// * `apply_output` - The output from applying the inverted index
92    ///
93    /// # Assumptions
94    /// * All row groups (except possibly the last one) have the same number of rows
95    /// * The last row group may have fewer rows than `row_group_size`
96    pub fn from_inverted_index_apply_output(
97        row_group_size: usize,
98        num_row_groups: usize,
99        apply_output: ApplyOutput,
100    ) -> Self {
101        // Step 1: Convert segment IDs to row ranges within row groups
102        // For each segment ID, calculate its corresponding row range in the row group
103        let segment_row_count = apply_output.segment_row_count;
104        let row_group_ranges = apply_output.matched_segment_ids.iter_ones().map(|seg_id| {
105            // Calculate the global row ID where this segment starts
106            let begin_row_id = seg_id * segment_row_count;
107            // Determine which row group this segment belongs to
108            let row_group_id = begin_row_id / row_group_size;
109            // Calculate the offset within the row group
110            let rg_begin_row_id = begin_row_id % row_group_size;
111            // Ensure the end row ID doesn't exceed the row group size
112            let rg_end_row_id = (rg_begin_row_id + segment_row_count).min(row_group_size);
113
114            (row_group_id, rg_begin_row_id..rg_end_row_id)
115        });
116
117        // Step 2: Group ranges by row group ID and create row selections
118        let mut total_row_count = 0;
119        let mut total_selector_len = 0;
120        let mut selection_in_rg = row_group_ranges
121            .chunk_by(|(row_group_id, _)| *row_group_id)
122            .into_iter()
123            .map(|(row_group_id, group)| {
124                // Extract just the ranges from the group
125                let ranges = group.map(|(_, ranges)| ranges);
126                // Create row selection from the ranges
127                // Note: We use `row_group_size` here, which is safe because:
128                // 1. For non-last row groups, it's the actual size
129                // 2. For the last row group, any ranges beyond the actual size will be clipped
130                //    by the min() operation above
131                let selection = row_selection_from_row_ranges(ranges, row_group_size);
132                let row_count = selection.row_count();
133                let selector_len = selector_len(&selection);
134                total_row_count += row_count;
135                total_selector_len += selector_len;
136                (
137                    row_group_id,
138                    RowSelectionWithCount {
139                        selection,
140                        row_count,
141                        selector_len,
142                    },
143                )
144            })
145            .collect::<BTreeMap<_, _>>();
146
147        Self::fill_missing_row_groups(&mut selection_in_rg, num_row_groups);
148
149        Self {
150            selection_in_rg,
151            row_count: total_row_count,
152            selector_len: total_selector_len,
153        }
154    }
155
156    /// Creates a new `RowGroupSelection` from a set of row IDs.
157    ///
158    /// # Arguments
159    /// * `row_ids` - Set of row IDs to select
160    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
161    /// * `num_row_groups` - Total number of row groups
162    ///
163    /// # Assumptions
164    /// * All row groups (except possibly the last one) have the same number of rows
165    /// * The last row group may have fewer rows than `row_group_size`
166    /// * All row IDs must within the range of [0, num_row_groups * row_group_size)
167    pub fn from_row_ids(
168        row_ids: BTreeSet<u32>,
169        row_group_size: usize,
170        num_row_groups: usize,
171    ) -> Self {
172        // Step 1: Group row IDs by their row group
173        let row_group_to_row_ids =
174            Self::group_row_ids_by_row_group(row_ids, row_group_size, num_row_groups);
175
176        // Step 2: Create row selections for each row group
177        let mut total_row_count = 0;
178        let mut total_selector_len = 0;
179        let mut selection_in_rg = row_group_to_row_ids
180            .into_iter()
181            .map(|(row_group_id, row_ids)| {
182                let selection =
183                    row_selection_from_sorted_row_ids(row_ids.into_iter(), row_group_size);
184                let row_count = selection.row_count();
185                let selector_len = selector_len(&selection);
186                total_row_count += row_count;
187                total_selector_len += selector_len;
188                (
189                    row_group_id,
190                    RowSelectionWithCount {
191                        selection,
192                        row_count,
193                        selector_len,
194                    },
195                )
196            })
197            .collect::<BTreeMap<_, _>>();
198
199        Self::fill_missing_row_groups(&mut selection_in_rg, num_row_groups);
200
201        Self {
202            selection_in_rg,
203            row_count: total_row_count,
204            selector_len: total_selector_len,
205        }
206    }
207
208    /// Creates a new `RowGroupSelection` from a set of row ranges.
209    ///
210    /// # Arguments
211    /// * `row_ranges` - A vector of (row_group_id, row_ranges) pairs
212    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
213    ///
214    /// # Assumptions
215    /// * All row groups (except possibly the last one) have the same number of rows
216    /// * The last row group may have fewer rows than `row_group_size`
217    /// * All ranges in `row_ranges` must be within the bounds of their respective row groups
218    ///   (i.e., for row group i, all ranges must be within [0, row_group_size) or [0, remaining_rows) for the last row group)
219    /// * Ranges within the same row group must not overlap. Overlapping ranges will result in undefined behavior.
220    pub fn from_row_ranges(
221        row_ranges: Vec<(usize, Vec<Range<usize>>)>,
222        row_group_size: usize,
223    ) -> Self {
224        let mut total_row_count = 0;
225        let mut total_selector_len = 0;
226        let selection_in_rg = row_ranges
227            .into_iter()
228            .map(|(row_group_id, ranges)| {
229                let selection = row_selection_from_row_ranges(ranges.into_iter(), row_group_size);
230                let row_count = selection.row_count();
231                let selector_len = selector_len(&selection);
232                total_row_count += row_count;
233                total_selector_len += selector_len;
234                (
235                    row_group_id,
236                    RowSelectionWithCount {
237                        selection,
238                        row_count,
239                        selector_len,
240                    },
241                )
242            })
243            .collect();
244
245        Self {
246            selection_in_rg,
247            row_count: total_row_count,
248            selector_len: total_selector_len,
249        }
250    }
251
252    /// Groups row IDs by their row group.
253    ///
254    /// # Arguments
255    /// * `row_ids` - Set of row IDs to group
256    /// * `row_group_size` - Size of each row group
257    /// * `num_row_groups` - Total number of row groups
258    ///
259    /// # Returns
260    /// A vector of (row_group_id, row_ids) pairs, where row_ids are the IDs within that row group.
261    fn group_row_ids_by_row_group(
262        row_ids: BTreeSet<u32>,
263        row_group_size: usize,
264        num_row_groups: usize,
265    ) -> Vec<(usize, Vec<usize>)> {
266        let est_rows_per_group = row_ids.len() / num_row_groups;
267        let mut row_group_to_row_ids: Vec<(usize, Vec<usize>)> = Vec::with_capacity(num_row_groups);
268
269        for row_id in row_ids {
270            let row_group_id = row_id as usize / row_group_size;
271            let row_id_in_group = row_id as usize % row_group_size;
272
273            if let Some((rg_id, row_ids)) = row_group_to_row_ids.last_mut()
274                && *rg_id == row_group_id
275            {
276                row_ids.push(row_id_in_group);
277            } else {
278                let mut row_ids = Vec::with_capacity(est_rows_per_group);
279                row_ids.push(row_id_in_group);
280                row_group_to_row_ids.push((row_group_id, row_ids));
281            }
282        }
283
284        row_group_to_row_ids
285    }
286
287    /// Intersects two `RowGroupSelection`s.
288    pub fn intersect(&self, other: &Self) -> Self {
289        let mut res = BTreeMap::new();
290        let mut total_row_count = 0;
291        let mut total_selector_len = 0;
292
293        for (rg_id, x) in other.selection_in_rg.iter() {
294            let Some(y) = self.selection_in_rg.get(rg_id) else {
295                continue;
296            };
297            let selection = x.selection.intersection(&y.selection);
298            let row_count = selection.row_count();
299            let selector_len = selector_len(&selection);
300            if row_count > 0 {
301                total_row_count += row_count;
302                total_selector_len += selector_len;
303                res.insert(
304                    *rg_id,
305                    RowSelectionWithCount {
306                        selection,
307                        row_count,
308                        selector_len,
309                    },
310                );
311            }
312        }
313
314        Self {
315            selection_in_rg: res,
316            row_count: total_row_count,
317            selector_len: total_selector_len,
318        }
319    }
320
321    /// Returns the number of row groups in the selection.
322    pub fn row_group_count(&self) -> usize {
323        self.selection_in_rg.len()
324    }
325
326    /// Returns the number of rows in the selection.
327    pub fn row_count(&self) -> usize {
328        self.row_count
329    }
330
331    /// Returns the first row group in the selection.
332    ///
333    /// Skip the row group if the row count is 0.
334    pub fn pop_first(&mut self) -> Option<(usize, RowSelection)> {
335        while let Some((
336            row_group_id,
337            RowSelectionWithCount {
338                selection,
339                row_count,
340                selector_len,
341            },
342        )) = self.selection_in_rg.pop_first()
343        {
344            if row_count > 0 {
345                self.row_count -= row_count;
346                self.selector_len -= selector_len;
347                return Some((row_group_id, selection));
348            }
349        }
350
351        None
352    }
353
354    /// Removes a row group from the selection.
355    pub fn remove_row_group(&mut self, row_group_id: usize) {
356        let Some(RowSelectionWithCount {
357            row_count,
358            selector_len,
359            ..
360        }) = self.selection_in_rg.remove(&row_group_id)
361        else {
362            return;
363        };
364        self.row_count -= row_count;
365        self.selector_len -= selector_len;
366    }
367
368    /// Returns true if the selection is empty.
369    pub fn is_empty(&self) -> bool {
370        self.selection_in_rg.is_empty()
371    }
372
373    /// Returns true if the selection contains a row group with the given ID.
374    pub fn contains_row_group(&self, row_group_id: usize) -> bool {
375        self.selection_in_rg.contains_key(&row_group_id)
376    }
377
378    /// Returns true if the selection contains a row group with the given ID and the row selection is not empty.
379    pub fn contains_non_empty_row_group(&self, row_group_id: usize) -> bool {
380        self.selection_in_rg
381            .get(&row_group_id)
382            .map(|r| r.row_count > 0)
383            .unwrap_or(false)
384    }
385
386    /// Returns an iterator over the row groups in the selection.
387    pub fn iter(&self) -> impl Iterator<Item = (&usize, &RowSelection)> {
388        self.selection_in_rg
389            .iter()
390            .map(|(row_group_id, x)| (row_group_id, &x.selection))
391    }
392
393    /// Returns the memory usage of the selection.
394    pub fn mem_usage(&self) -> usize {
395        self.selector_len * size_of::<RowSelector>()
396            + self.selection_in_rg.len() * size_of::<RowSelectionWithCount>()
397    }
398
399    /// Concatenates `other` into `self`. `other` must not contain row groups that `self` contains.
400    ///
401    /// Panics if `self` contains row groups that `other` contains.
402    pub fn concat(&mut self, other: &Self) {
403        for (rg_id, other_rs) in other.selection_in_rg.iter() {
404            if self.selection_in_rg.contains_key(rg_id) {
405                panic!("row group {} is already in `self`", rg_id);
406            }
407
408            self.selection_in_rg.insert(*rg_id, other_rs.clone());
409            self.row_count += other_rs.row_count;
410            self.selector_len += other_rs.selector_len;
411        }
412    }
413
414    /// Fills the missing row groups with empty selections.
415    /// This is to indicate that the row groups are searched even if no rows are found.
416    fn fill_missing_row_groups(
417        selection_in_rg: &mut BTreeMap<usize, RowSelectionWithCount>,
418        num_row_groups: usize,
419    ) {
420        for rg_id in 0..num_row_groups {
421            selection_in_rg.entry(rg_id).or_default();
422        }
423    }
424}
425
426/// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s.
427///
428/// This function processes each range in the input and either creates a new selector or merges
429/// with the existing one, depending on whether the current range is contiguous with the preceding one
430/// or if there's a gap that requires skipping rows. It handles both "select" and "skip" actions,
431/// optimizing the list of selectors by merging contiguous actions of the same type.
432///
433/// Note: overlapping ranges are not supported and will result in an incorrect selection.
434pub(crate) fn row_selection_from_row_ranges(
435    row_ranges: impl Iterator<Item = Range<usize>>,
436    total_row_count: usize,
437) -> RowSelection {
438    let mut selectors: Vec<RowSelector> = Vec::new();
439    let mut last_processed_end = 0;
440
441    for Range { start, end } in row_ranges {
442        let end = end.min(total_row_count);
443        if start > last_processed_end {
444            add_or_merge_selector(&mut selectors, start - last_processed_end, true);
445        }
446
447        add_or_merge_selector(&mut selectors, end - start, false);
448        last_processed_end = end;
449    }
450
451    if last_processed_end < total_row_count {
452        add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true);
453    }
454
455    RowSelection::from(selectors)
456}
457
458/// Converts an iterator of sorted row IDs into a `RowSelection`.
459///
460/// Note: the input iterator must be sorted in ascending order and
461///       contain unique row IDs in the range [0, total_row_count).
462pub(crate) fn row_selection_from_sorted_row_ids(
463    row_ids: impl Iterator<Item = usize>,
464    total_row_count: usize,
465) -> RowSelection {
466    let mut selectors: Vec<RowSelector> = Vec::new();
467    let mut last_processed_end = 0;
468
469    for row_id in row_ids {
470        let start = row_id;
471        let end = start + 1;
472
473        if start > last_processed_end {
474            add_or_merge_selector(&mut selectors, start - last_processed_end, true);
475        }
476
477        add_or_merge_selector(&mut selectors, end - start, false);
478        last_processed_end = end;
479    }
480
481    if last_processed_end < total_row_count {
482        add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true);
483    }
484
485    RowSelection::from(selectors)
486}
487
488/// Helper function to either add a new `RowSelector` to `selectors` or merge it with the last one
489/// if they are of the same type (both skip or both select).
490fn add_or_merge_selector(selectors: &mut Vec<RowSelector>, count: usize, is_skip: bool) {
491    if let Some(last) = selectors.last_mut() {
492        // Merge with last if both actions are same
493        if last.skip == is_skip {
494            last.row_count += count;
495            return;
496        }
497    }
498    // Add new selector otherwise
499    let new_selector = if is_skip {
500        RowSelector::skip(count)
501    } else {
502        RowSelector::select(count)
503    };
504    selectors.push(new_selector);
505}
506
507/// Returns the length of the selectors in the selection.
508fn selector_len(selection: &RowSelection) -> usize {
509    selection.iter().size_hint().0
510}
511
512#[cfg(test)]
513#[allow(clippy::single_range_in_vec_init)]
514mod tests {
515    use super::*;
516
517    #[test]
518    fn test_selector_len() {
519        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(5)]);
520        assert_eq!(selector_len(&selection), 2);
521
522        let selection = RowSelection::from(vec![
523            RowSelector::select(5),
524            RowSelector::skip(5),
525            RowSelector::select(5),
526        ]);
527        assert_eq!(selector_len(&selection), 3);
528
529        let selection = RowSelection::from(vec![]);
530        assert_eq!(selector_len(&selection), 0);
531    }
532
533    #[test]
534    fn test_single_contiguous_range() {
535        let selection = row_selection_from_row_ranges(Some(5..10).into_iter(), 10);
536        let expected = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(5)]);
537        assert_eq!(selection, expected);
538    }
539
540    #[test]
541    fn test_non_contiguous_ranges() {
542        let ranges = [1..3, 5..8];
543        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
544        let expected = RowSelection::from(vec![
545            RowSelector::skip(1),
546            RowSelector::select(2),
547            RowSelector::skip(2),
548            RowSelector::select(3),
549            RowSelector::skip(2),
550        ]);
551        assert_eq!(selection, expected);
552    }
553
554    #[test]
555    fn test_empty_range() {
556        let ranges = [];
557        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
558        let expected = RowSelection::from(vec![RowSelector::skip(10)]);
559        assert_eq!(selection, expected);
560    }
561
562    #[test]
563    fn test_adjacent_ranges() {
564        let ranges = [1..2, 2..3];
565        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
566        let expected = RowSelection::from(vec![
567            RowSelector::skip(1),
568            RowSelector::select(2),
569            RowSelector::skip(7),
570        ]);
571        assert_eq!(selection, expected);
572    }
573
574    #[test]
575    fn test_large_gap_between_ranges() {
576        let ranges = [1..2, 100..101];
577        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10240);
578        let expected = RowSelection::from(vec![
579            RowSelector::skip(1),
580            RowSelector::select(1),
581            RowSelector::skip(98),
582            RowSelector::select(1),
583            RowSelector::skip(10139),
584        ]);
585        assert_eq!(selection, expected);
586    }
587
588    #[test]
589    fn test_range_end_over_total_row_count() {
590        let ranges = Some(1..10);
591        let selection = row_selection_from_row_ranges(ranges.into_iter(), 5);
592        let expected = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(4)]);
593        assert_eq!(selection, expected);
594    }
595
596    #[test]
597    fn test_row_ids_to_selection() {
598        let row_ids = [1, 3, 5, 7, 9].into_iter();
599        let selection = row_selection_from_sorted_row_ids(row_ids, 10);
600        let expected = RowSelection::from(vec![
601            RowSelector::skip(1),
602            RowSelector::select(1),
603            RowSelector::skip(1),
604            RowSelector::select(1),
605            RowSelector::skip(1),
606            RowSelector::select(1),
607            RowSelector::skip(1),
608            RowSelector::select(1),
609            RowSelector::skip(1),
610            RowSelector::select(1),
611        ]);
612        assert_eq!(selection, expected);
613    }
614
615    #[test]
616    fn test_row_ids_to_selection_full() {
617        let row_ids = 0..10;
618        let selection = row_selection_from_sorted_row_ids(row_ids, 10);
619        let expected = RowSelection::from(vec![RowSelector::select(10)]);
620        assert_eq!(selection, expected);
621    }
622
623    #[test]
624    fn test_row_ids_to_selection_empty() {
625        let selection = row_selection_from_sorted_row_ids(None.into_iter(), 10);
626        let expected = RowSelection::from(vec![RowSelector::skip(10)]);
627        assert_eq!(selection, expected);
628    }
629
630    #[test]
631    fn test_group_row_ids() {
632        let row_ids = [0, 1, 2, 5, 6, 7, 8, 12].into_iter().collect();
633        let row_group_size = 5;
634        let num_row_groups = 3;
635
636        let row_group_to_row_ids =
637            RowGroupSelection::group_row_ids_by_row_group(row_ids, row_group_size, num_row_groups);
638
639        assert_eq!(
640            row_group_to_row_ids,
641            vec![(0, vec![0, 1, 2]), (1, vec![0, 1, 2, 3]), (2, vec![2])]
642        );
643    }
644
645    #[test]
646    fn test_row_group_selection_new() {
647        // Test with regular case
648        let selection = RowGroupSelection::new(100, 250);
649        assert_eq!(selection.row_count(), 250);
650        assert_eq!(selection.row_group_count(), 3);
651
652        // Check content of each row group
653        let row_selection = selection.get(0).unwrap();
654        assert_eq!(row_selection.row_count(), 100);
655
656        let row_selection = selection.get(1).unwrap();
657        assert_eq!(row_selection.row_count(), 100);
658
659        let row_selection = selection.get(2).unwrap();
660        assert_eq!(row_selection.row_count(), 50);
661
662        // Test with empty selection
663        let selection = RowGroupSelection::new(100, 0);
664        assert_eq!(selection.row_count(), 0);
665        assert_eq!(selection.row_group_count(), 0);
666        assert!(selection.get(0).is_none());
667
668        // Test with single row group
669        let selection = RowGroupSelection::new(100, 50);
670        assert_eq!(selection.row_count(), 50);
671        assert_eq!(selection.row_group_count(), 1);
672
673        let row_selection = selection.get(0).unwrap();
674        assert_eq!(row_selection.row_count(), 50);
675
676        // Test with row count that doesn't divide evenly
677        let selection = RowGroupSelection::new(100, 150);
678        assert_eq!(selection.row_count(), 150);
679        assert_eq!(selection.row_group_count(), 2);
680
681        let row_selection = selection.get(0).unwrap();
682        assert_eq!(row_selection.row_count(), 100);
683
684        let row_selection = selection.get(1).unwrap();
685        assert_eq!(row_selection.row_count(), 50);
686
687        // Test with row count that's just over a multiple of row_group_size
688        let selection = RowGroupSelection::new(100, 101);
689        assert_eq!(selection.row_count(), 101);
690        assert_eq!(selection.row_group_count(), 2);
691
692        let row_selection = selection.get(0).unwrap();
693        assert_eq!(row_selection.row_count(), 100);
694
695        let row_selection = selection.get(1).unwrap();
696        assert_eq!(row_selection.row_count(), 1);
697    }
698
699    #[test]
700    fn test_from_row_ids() {
701        let row_group_size = 100;
702        let num_row_groups = 3;
703
704        // Test with regular case
705        let row_ids: BTreeSet<u32> = vec![5, 15, 25, 35, 105, 115, 205, 215]
706            .into_iter()
707            .collect();
708        let selection = RowGroupSelection::from_row_ids(row_ids, row_group_size, num_row_groups);
709        assert_eq!(selection.row_count(), 8);
710        assert_eq!(selection.row_group_count(), 3);
711
712        // Check content of each row group
713        let row_selection = selection.get(0).unwrap();
714        assert_eq!(row_selection.row_count(), 4); // 5, 15, 25, 35
715
716        let row_selection = selection.get(1).unwrap();
717        assert_eq!(row_selection.row_count(), 2); // 105, 115
718
719        let row_selection = selection.get(2).unwrap();
720        assert_eq!(row_selection.row_count(), 2); // 205, 215
721
722        // Test with empty row IDs
723        let empty_row_ids: BTreeSet<u32> = BTreeSet::new();
724        let selection =
725            RowGroupSelection::from_row_ids(empty_row_ids, row_group_size, num_row_groups);
726        assert_eq!(selection.row_count(), 0);
727        assert_eq!(selection.row_group_count(), 3);
728
729        // Test with consecutive row IDs
730        let consecutive_row_ids: BTreeSet<u32> = vec![5, 6, 7, 8, 9].into_iter().collect();
731        let selection =
732            RowGroupSelection::from_row_ids(consecutive_row_ids, row_group_size, num_row_groups);
733        assert_eq!(selection.row_count(), 5);
734        assert_eq!(selection.row_group_count(), 3);
735
736        let row_selection = selection.get(0).unwrap();
737        assert_eq!(row_selection.row_count(), 5); // 5, 6, 7, 8, 9
738
739        // Test with row IDs at row group boundaries
740        let boundary_row_ids: BTreeSet<u32> = vec![0, 99, 100, 199, 200, 249].into_iter().collect();
741        let selection =
742            RowGroupSelection::from_row_ids(boundary_row_ids, row_group_size, num_row_groups);
743        assert_eq!(selection.row_count(), 6);
744        assert_eq!(selection.row_group_count(), 3);
745
746        let row_selection = selection.get(0).unwrap();
747        assert_eq!(row_selection.row_count(), 2); // 0, 99
748
749        let row_selection = selection.get(1).unwrap();
750        assert_eq!(row_selection.row_count(), 2); // 100, 199
751
752        let row_selection = selection.get(2).unwrap();
753        assert_eq!(row_selection.row_count(), 2); // 200, 249
754
755        // Test with single row group
756        let single_group_row_ids: BTreeSet<u32> = vec![5, 10, 15].into_iter().collect();
757        let selection = RowGroupSelection::from_row_ids(single_group_row_ids, row_group_size, 1);
758        assert_eq!(selection.row_count(), 3);
759        assert_eq!(selection.row_group_count(), 1);
760
761        let row_selection = selection.get(0).unwrap();
762        assert_eq!(row_selection.row_count(), 3); // 5, 10, 15
763    }
764
765    #[test]
766    fn test_from_row_ranges() {
767        let row_group_size = 100;
768
769        // Test with regular case
770        let ranges = vec![
771            (0, vec![5..15, 25..35]), // Within [0, 100)
772            (1, vec![5..15]),         // Within [0, 100)
773            (2, vec![0..5, 10..15]),  // Within [0, 50) for last row group
774        ];
775        let selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
776        assert_eq!(selection.row_count(), 40);
777        assert_eq!(selection.row_group_count(), 3);
778
779        // Check content of each row group
780        let row_selection = selection.get(0).unwrap();
781        assert_eq!(row_selection.row_count(), 20); // 5..15 (10) + 25..35 (10)
782
783        let row_selection = selection.get(1).unwrap();
784        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
785
786        let row_selection = selection.get(2).unwrap();
787        assert_eq!(row_selection.row_count(), 10); // 0..5 (5) + 10..15 (5)
788
789        // Test with empty ranges
790        let empty_ranges: Vec<(usize, Vec<Range<usize>>)> = vec![];
791        let selection = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size);
792        assert_eq!(selection.row_count(), 0);
793        assert_eq!(selection.row_group_count(), 0);
794        assert!(selection.get(0).is_none());
795
796        // Test with adjacent ranges within same row group
797        let adjacent_ranges = vec![
798            (0, vec![5..15, 15..25]), // Adjacent ranges within [0, 100)
799        ];
800        let selection = RowGroupSelection::from_row_ranges(adjacent_ranges, row_group_size);
801        assert_eq!(selection.row_count(), 20);
802        assert_eq!(selection.row_group_count(), 1);
803
804        let row_selection = selection.get(0).unwrap();
805        assert_eq!(row_selection.row_count(), 20); // 5..15 (10) + 15..25 (10)
806
807        // Test with ranges at row group boundaries
808        let boundary_ranges = vec![
809            (0, vec![0..10, 90..100]), // Ranges at start and end of first row group
810            (1, vec![0..100]),         // Full range of second row group
811            (2, vec![0..50]),          // Full range of last row group
812        ];
813        let selection = RowGroupSelection::from_row_ranges(boundary_ranges, row_group_size);
814        assert_eq!(selection.row_count(), 170);
815        assert_eq!(selection.row_group_count(), 3);
816
817        let row_selection = selection.get(0).unwrap();
818        assert_eq!(row_selection.row_count(), 20); // 0..10 (10) + 90..100 (10)
819
820        let row_selection = selection.get(1).unwrap();
821        assert_eq!(row_selection.row_count(), 100); // 0..100 (100)
822
823        let row_selection = selection.get(2).unwrap();
824        assert_eq!(row_selection.row_count(), 50); // 0..50 (50)
825
826        // Test with single row group
827        let single_group_ranges = vec![
828            (0, vec![0..50]), // Half of first row group
829        ];
830        let selection = RowGroupSelection::from_row_ranges(single_group_ranges, row_group_size);
831        assert_eq!(selection.row_count(), 50);
832        assert_eq!(selection.row_group_count(), 1);
833
834        let row_selection = selection.get(0).unwrap();
835        assert_eq!(row_selection.row_count(), 50); // 0..50 (50)
836    }
837
838    #[test]
839    fn test_intersect() {
840        let row_group_size = 100;
841
842        // Test case 1: Regular intersection with partial overlap
843        let ranges1 = vec![
844            (0, vec![5..15, 25..35]), // Within [0, 100)
845            (1, vec![5..15]),         // Within [0, 100)
846        ];
847        let selection1 = RowGroupSelection::from_row_ranges(ranges1, row_group_size);
848
849        let ranges2 = vec![
850            (0, vec![10..20]), // Within [0, 100)
851            (1, vec![10..20]), // Within [0, 100)
852            (2, vec![0..10]),  // Within [0, 50) for last row group
853        ];
854        let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size);
855
856        let intersection = selection1.intersect(&selection2);
857        assert_eq!(intersection.row_count(), 10);
858        assert_eq!(intersection.row_group_count(), 2);
859
860        let row_selection = intersection.get(0).unwrap();
861        assert_eq!(row_selection.row_count(), 5); // 10..15 (5)
862
863        let row_selection = intersection.get(1).unwrap();
864        assert_eq!(row_selection.row_count(), 5); // 10..15 (5)
865
866        // Test case 2: Empty intersection with empty selection
867        let empty_ranges: Vec<(usize, Vec<Range<usize>>)> = vec![];
868        let empty_selection = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size);
869        let intersection = selection1.intersect(&empty_selection);
870        assert_eq!(intersection.row_count(), 0);
871        assert_eq!(intersection.row_group_count(), 0);
872        assert!(intersection.get(0).is_none());
873
874        // Test case 3: No overlapping row groups
875        let non_overlapping_ranges = vec![
876            (3, vec![0..10]), // Within [0, 50) for last row group
877        ];
878        let non_overlapping_selection =
879            RowGroupSelection::from_row_ranges(non_overlapping_ranges, row_group_size);
880        let intersection = selection1.intersect(&non_overlapping_selection);
881        assert_eq!(intersection.row_count(), 0);
882        assert_eq!(intersection.row_group_count(), 0);
883        assert!(intersection.get(0).is_none());
884
885        // Test case 4: Full overlap within same row group
886        let full_overlap_ranges1 = vec![
887            (0, vec![0..50]), // Within [0, 100)
888        ];
889        let full_overlap_ranges2 = vec![
890            (0, vec![0..50]), // Within [0, 100)
891        ];
892        let selection1 = RowGroupSelection::from_row_ranges(full_overlap_ranges1, row_group_size);
893        let selection2 = RowGroupSelection::from_row_ranges(full_overlap_ranges2, row_group_size);
894        let intersection = selection1.intersect(&selection2);
895        assert_eq!(intersection.row_count(), 50);
896        assert_eq!(intersection.row_group_count(), 1);
897
898        let row_selection = intersection.get(0).unwrap();
899        assert_eq!(row_selection.row_count(), 50); // 0..50 (50)
900
901        // Test case 5: Partial overlap at row group boundaries
902        let boundary_ranges1 = vec![
903            (0, vec![0..10, 90..100]), // Within [0, 100)
904            (1, vec![0..100]),         // Within [0, 100)
905        ];
906        let boundary_ranges2 = vec![
907            (0, vec![5..15, 95..100]), // Within [0, 100)
908            (1, vec![50..100]),        // Within [0, 100)
909        ];
910        let selection1 = RowGroupSelection::from_row_ranges(boundary_ranges1, row_group_size);
911        let selection2 = RowGroupSelection::from_row_ranges(boundary_ranges2, row_group_size);
912        let intersection = selection1.intersect(&selection2);
913        assert_eq!(intersection.row_count(), 60);
914        assert_eq!(intersection.row_group_count(), 2);
915
916        let row_selection = intersection.get(0).unwrap();
917        assert_eq!(row_selection.row_count(), 10); // 5..10 (5) + 95..100 (5)
918
919        let row_selection = intersection.get(1).unwrap();
920        assert_eq!(row_selection.row_count(), 50); // 50..100 (50)
921
922        // Test case 6: Multiple ranges with complex overlap
923        let complex_ranges1 = vec![
924            (0, vec![5..15, 25..35, 45..55]), // Within [0, 100)
925            (1, vec![10..20, 30..40]),        // Within [0, 100)
926        ];
927        let complex_ranges2 = vec![
928            (0, vec![10..20, 30..40, 50..60]), // Within [0, 100)
929            (1, vec![15..25, 35..45]),         // Within [0, 100)
930        ];
931        let selection1 = RowGroupSelection::from_row_ranges(complex_ranges1, row_group_size);
932        let selection2 = RowGroupSelection::from_row_ranges(complex_ranges2, row_group_size);
933        let intersection = selection1.intersect(&selection2);
934        assert_eq!(intersection.row_count(), 25);
935        assert_eq!(intersection.row_group_count(), 2);
936
937        let row_selection = intersection.get(0).unwrap();
938        assert_eq!(row_selection.row_count(), 15); // 10..15 (5) + 30..35 (5) + 50..55 (5)
939
940        let row_selection = intersection.get(1).unwrap();
941        assert_eq!(row_selection.row_count(), 10); // 15..20 (5) + 35..40 (5)
942
943        // Test case 7: Intersection with last row group (smaller size)
944        let last_rg_ranges1 = vec![
945            (2, vec![0..25, 30..40]), // Within [0, 50) for last row group
946        ];
947        let last_rg_ranges2 = vec![
948            (2, vec![20..30, 35..45]), // Within [0, 50) for last row group
949        ];
950        let selection1 = RowGroupSelection::from_row_ranges(last_rg_ranges1, row_group_size);
951        let selection2 = RowGroupSelection::from_row_ranges(last_rg_ranges2, row_group_size);
952        let intersection = selection1.intersect(&selection2);
953        assert_eq!(intersection.row_count(), 10);
954        assert_eq!(intersection.row_group_count(), 1);
955
956        let row_selection = intersection.get(2).unwrap();
957        assert_eq!(row_selection.row_count(), 10); // 20..25 (5) + 35..40 (5)
958
959        // Test case 8: Intersection with empty ranges in one selection
960        let empty_ranges = vec![
961            (0, vec![]),      // Empty ranges
962            (1, vec![5..15]), // Within [0, 100)
963        ];
964        let selection1 = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size);
965        let ranges2 = vec![
966            (0, vec![5..15, 25..35]), // Within [0, 100)
967            (1, vec![5..15]),         // Within [0, 100)
968        ];
969        let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size);
970        let intersection = selection1.intersect(&selection2);
971        assert_eq!(intersection.row_count(), 10);
972        assert_eq!(intersection.row_group_count(), 1);
973
974        let row_selection = intersection.get(1).unwrap();
975        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
976    }
977
978    #[test]
979    fn test_pop_first() {
980        let row_group_size = 100;
981        let ranges = vec![
982            (0, vec![5..15]), // Within [0, 100)
983            (1, vec![5..15]), // Within [0, 100)
984            (2, vec![0..5]),  // Within [0, 50) for last row group
985        ];
986        let mut selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
987
988        // Test popping first row group
989        let (rg_id, row_selection) = selection.pop_first().unwrap();
990        assert_eq!(rg_id, 0);
991        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
992        assert_eq!(selection.row_count(), 15);
993        assert_eq!(selection.row_group_count(), 2);
994
995        // Verify remaining row groups' content
996        let row_selection = selection.get(1).unwrap();
997        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
998
999        let row_selection = selection.get(2).unwrap();
1000        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
1001
1002        // Test popping second row group
1003        let (rg_id, row_selection) = selection.pop_first().unwrap();
1004        assert_eq!(rg_id, 1);
1005        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
1006        assert_eq!(selection.row_count(), 5);
1007        assert_eq!(selection.row_group_count(), 1);
1008
1009        // Verify remaining row group's content
1010        let row_selection = selection.get(2).unwrap();
1011        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
1012
1013        // Test popping last row group
1014        let (rg_id, row_selection) = selection.pop_first().unwrap();
1015        assert_eq!(rg_id, 2);
1016        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
1017        assert_eq!(selection.row_count(), 0);
1018        assert_eq!(selection.row_group_count(), 0);
1019        assert!(selection.is_empty());
1020
1021        // Test popping from empty selection
1022        let mut empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
1023        assert!(empty_selection.pop_first().is_none());
1024        assert_eq!(empty_selection.row_count(), 0);
1025        assert_eq!(empty_selection.row_group_count(), 0);
1026        assert!(empty_selection.is_empty());
1027    }
1028
1029    #[test]
1030    fn test_remove_row_group() {
1031        let row_group_size = 100;
1032        let ranges = vec![
1033            (0, vec![5..15]), // Within [0, 100)
1034            (1, vec![5..15]), // Within [0, 100)
1035            (2, vec![0..5]),  // Within [0, 50) for last row group
1036        ];
1037        let mut selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
1038
1039        // Test removing existing row group
1040        selection.remove_row_group(1);
1041        assert_eq!(selection.row_count(), 15);
1042        assert_eq!(selection.row_group_count(), 2);
1043        assert!(!selection.contains_row_group(1));
1044
1045        // Verify remaining row groups' content
1046        let row_selection = selection.get(0).unwrap();
1047        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
1048
1049        let row_selection = selection.get(2).unwrap();
1050        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
1051
1052        // Test removing non-existent row group
1053        selection.remove_row_group(5);
1054        assert_eq!(selection.row_count(), 15);
1055        assert_eq!(selection.row_group_count(), 2);
1056
1057        // Test removing all row groups
1058        selection.remove_row_group(0);
1059        assert_eq!(selection.row_count(), 5);
1060        assert_eq!(selection.row_group_count(), 1);
1061
1062        let row_selection = selection.get(2).unwrap();
1063        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
1064
1065        selection.remove_row_group(2);
1066        assert_eq!(selection.row_count(), 0);
1067        assert_eq!(selection.row_group_count(), 0);
1068        assert!(selection.is_empty());
1069
1070        // Test removing from empty selection
1071        let mut empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
1072        empty_selection.remove_row_group(0);
1073        assert_eq!(empty_selection.row_count(), 0);
1074        assert_eq!(empty_selection.row_group_count(), 0);
1075        assert!(empty_selection.is_empty());
1076    }
1077
1078    #[test]
1079    fn test_contains_row_group() {
1080        let row_group_size = 100;
1081        let ranges = vec![
1082            (0, vec![5..15]), // Within [0, 100)
1083            (1, vec![5..15]), // Within [0, 100)
1084        ];
1085        let selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
1086
1087        assert!(selection.contains_row_group(0));
1088        assert!(selection.contains_row_group(1));
1089        assert!(!selection.contains_row_group(2));
1090
1091        // Test empty selection
1092        let empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
1093        assert!(!empty_selection.contains_row_group(0));
1094    }
1095
1096    #[test]
1097    fn test_concat() {
1098        let row_group_size = 100;
1099        let ranges1 = vec![
1100            (0, vec![5..15]), // Within [0, 100)
1101            (1, vec![5..15]), // Within [0, 100)
1102        ];
1103
1104        let ranges2 = vec![
1105            (2, vec![5..15]), // Within [0, 100)
1106            (3, vec![5..15]), // Within [0, 100)
1107        ];
1108
1109        let mut selection1 = RowGroupSelection::from_row_ranges(ranges1, row_group_size);
1110        let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size);
1111
1112        selection1.concat(&selection2);
1113        assert_eq!(selection1.row_count(), 40);
1114        assert_eq!(selection1.row_group_count(), 4);
1115    }
1116}