mito2/sst/parquet/
row_selection.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::Range;
17
18use index::inverted_index::search::index_apply::ApplyOutput;
19use itertools::Itertools;
20use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
21
22/// A selection of row groups.
23#[derive(Debug, Clone, Default)]
24pub struct RowGroupSelection {
25    /// Row group id to row selection.
26    selection_in_rg: BTreeMap<usize, RowSelectionWithCount>,
27    /// Total number of rows in the selection.
28    row_count: usize,
29    /// Total length of the selectors.
30    selector_len: usize,
31}
32
33/// A row selection with its count.
34#[derive(Debug, Clone)]
35struct RowSelectionWithCount {
36    /// Row selection.
37    selection: RowSelection,
38    /// Number of rows in the selection.
39    row_count: usize,
40    /// Length of the selectors.
41    selector_len: usize,
42}
43
44impl RowGroupSelection {
45    /// Creates a new `RowGroupSelection` with all row groups selected.
46    ///
47    /// # Arguments
48    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
49    /// * `total_row_count` - Total number of rows
50    pub fn new(row_group_size: usize, total_row_count: usize) -> Self {
51        let mut selection_in_rg = BTreeMap::new();
52
53        let row_group_count = total_row_count.div_ceil(row_group_size);
54        for rg_id in 0..row_group_count {
55            // The last row group may have fewer rows than `row_group_size`
56            let row_group_size = if rg_id == row_group_count - 1 {
57                total_row_count - (row_group_count - 1) * row_group_size
58            } else {
59                row_group_size
60            };
61
62            let selection = RowSelection::from(vec![RowSelector::select(row_group_size)]);
63            selection_in_rg.insert(
64                rg_id,
65                RowSelectionWithCount {
66                    selection,
67                    row_count: row_group_size,
68                    selector_len: 1,
69                },
70            );
71        }
72
73        Self {
74            selection_in_rg,
75            row_count: total_row_count,
76            selector_len: row_group_count,
77        }
78    }
79
80    /// Returns the row selection for a given row group.
81    ///
82    /// `None` indicates not selected.
83    pub fn get(&self, rg_id: usize) -> Option<&RowSelection> {
84        self.selection_in_rg.get(&rg_id).map(|x| &x.selection)
85    }
86
87    /// Creates a new `RowGroupSelection` from the output of inverted index application.
88    ///
89    /// # Arguments
90    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
91    /// * `apply_output` - The output from applying the inverted index
92    ///
93    /// # Assumptions
94    /// * All row groups (except possibly the last one) have the same number of rows
95    /// * The last row group may have fewer rows than `row_group_size`
96    pub fn from_inverted_index_apply_output(
97        row_group_size: usize,
98        apply_output: ApplyOutput,
99    ) -> Self {
100        // Step 1: Convert segment IDs to row ranges within row groups
101        // For each segment ID, calculate its corresponding row range in the row group
102        let segment_row_count = apply_output.segment_row_count;
103        let row_group_ranges = apply_output.matched_segment_ids.iter_ones().map(|seg_id| {
104            // Calculate the global row ID where this segment starts
105            let begin_row_id = seg_id * segment_row_count;
106            // Determine which row group this segment belongs to
107            let row_group_id = begin_row_id / row_group_size;
108            // Calculate the offset within the row group
109            let rg_begin_row_id = begin_row_id % row_group_size;
110            // Ensure the end row ID doesn't exceed the row group size
111            let rg_end_row_id = (rg_begin_row_id + segment_row_count).min(row_group_size);
112
113            (row_group_id, rg_begin_row_id..rg_end_row_id)
114        });
115
116        // Step 2: Group ranges by row group ID and create row selections
117        let mut total_row_count = 0;
118        let mut total_selector_len = 0;
119        let selection_in_rg = row_group_ranges
120            .chunk_by(|(row_group_id, _)| *row_group_id)
121            .into_iter()
122            .map(|(row_group_id, group)| {
123                // Extract just the ranges from the group
124                let ranges = group.map(|(_, ranges)| ranges);
125                // Create row selection from the ranges
126                // Note: We use `row_group_size` here, which is safe because:
127                // 1. For non-last row groups, it's the actual size
128                // 2. For the last row group, any ranges beyond the actual size will be clipped
129                //    by the min() operation above
130                let selection = row_selection_from_row_ranges(ranges, row_group_size);
131                let row_count = selection.row_count();
132                let selector_len = selector_len(&selection);
133                total_row_count += row_count;
134                total_selector_len += selector_len;
135                (
136                    row_group_id,
137                    RowSelectionWithCount {
138                        selection,
139                        row_count,
140                        selector_len,
141                    },
142                )
143            })
144            .collect();
145
146        Self {
147            selection_in_rg,
148            row_count: total_row_count,
149            selector_len: total_selector_len,
150        }
151    }
152
153    /// Creates a new `RowGroupSelection` from a set of row IDs.
154    ///
155    /// # Arguments
156    /// * `row_ids` - Set of row IDs to select
157    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
158    /// * `num_row_groups` - Total number of row groups
159    ///
160    /// # Assumptions
161    /// * All row groups (except possibly the last one) have the same number of rows
162    /// * The last row group may have fewer rows than `row_group_size`
163    /// * All row IDs must within the range of [0, num_row_groups * row_group_size)
164    pub fn from_row_ids(
165        row_ids: BTreeSet<u32>,
166        row_group_size: usize,
167        num_row_groups: usize,
168    ) -> Self {
169        // Step 1: Group row IDs by their row group
170        let row_group_to_row_ids =
171            Self::group_row_ids_by_row_group(row_ids, row_group_size, num_row_groups);
172
173        // Step 2: Create row selections for each row group
174        let mut total_row_count = 0;
175        let mut total_selector_len = 0;
176        let selection_in_rg = row_group_to_row_ids
177            .into_iter()
178            .map(|(row_group_id, row_ids)| {
179                let selection =
180                    row_selection_from_sorted_row_ids(row_ids.into_iter(), row_group_size);
181                let row_count = selection.row_count();
182                let selector_len = selector_len(&selection);
183                total_row_count += row_count;
184                total_selector_len += selector_len;
185                (
186                    row_group_id,
187                    RowSelectionWithCount {
188                        selection,
189                        row_count,
190                        selector_len,
191                    },
192                )
193            })
194            .collect();
195
196        Self {
197            selection_in_rg,
198            row_count: total_row_count,
199            selector_len: total_selector_len,
200        }
201    }
202
203    /// Creates a new `RowGroupSelection` from a set of row ranges.
204    ///
205    /// # Arguments
206    /// * `row_ranges` - A vector of (row_group_id, row_ranges) pairs
207    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
208    ///
209    /// # Assumptions
210    /// * All row groups (except possibly the last one) have the same number of rows
211    /// * The last row group may have fewer rows than `row_group_size`
212    /// * All ranges in `row_ranges` must be within the bounds of their respective row groups
213    ///   (i.e., for row group i, all ranges must be within [0, row_group_size) or [0, remaining_rows) for the last row group)
214    /// * Ranges within the same row group must not overlap. Overlapping ranges will result in undefined behavior.
215    pub fn from_row_ranges(
216        row_ranges: Vec<(usize, Vec<Range<usize>>)>,
217        row_group_size: usize,
218    ) -> Self {
219        let mut total_row_count = 0;
220        let mut total_selector_len = 0;
221        let selection_in_rg = row_ranges
222            .into_iter()
223            .map(|(row_group_id, ranges)| {
224                let selection = row_selection_from_row_ranges(ranges.into_iter(), row_group_size);
225                let row_count = selection.row_count();
226                let selector_len = selector_len(&selection);
227                total_row_count += row_count;
228                total_selector_len += selector_len;
229                (
230                    row_group_id,
231                    RowSelectionWithCount {
232                        selection,
233                        row_count,
234                        selector_len,
235                    },
236                )
237            })
238            .collect();
239
240        Self {
241            selection_in_rg,
242            row_count: total_row_count,
243            selector_len: total_selector_len,
244        }
245    }
246
247    /// Groups row IDs by their row group.
248    ///
249    /// # Arguments
250    /// * `row_ids` - Set of row IDs to group
251    /// * `row_group_size` - Size of each row group
252    /// * `num_row_groups` - Total number of row groups
253    ///
254    /// # Returns
255    /// A vector of (row_group_id, row_ids) pairs, where row_ids are the IDs within that row group.
256    fn group_row_ids_by_row_group(
257        row_ids: BTreeSet<u32>,
258        row_group_size: usize,
259        num_row_groups: usize,
260    ) -> Vec<(usize, Vec<usize>)> {
261        let est_rows_per_group = row_ids.len() / num_row_groups;
262        let mut row_group_to_row_ids: Vec<(usize, Vec<usize>)> = Vec::with_capacity(num_row_groups);
263
264        for row_id in row_ids {
265            let row_group_id = row_id as usize / row_group_size;
266            let row_id_in_group = row_id as usize % row_group_size;
267
268            if let Some((rg_id, row_ids)) = row_group_to_row_ids.last_mut()
269                && *rg_id == row_group_id
270            {
271                row_ids.push(row_id_in_group);
272            } else {
273                let mut row_ids = Vec::with_capacity(est_rows_per_group);
274                row_ids.push(row_id_in_group);
275                row_group_to_row_ids.push((row_group_id, row_ids));
276            }
277        }
278
279        row_group_to_row_ids
280    }
281
282    /// Intersects two `RowGroupSelection`s.
283    pub fn intersect(&self, other: &Self) -> Self {
284        let mut res = BTreeMap::new();
285        let mut total_row_count = 0;
286        let mut total_selector_len = 0;
287
288        for (rg_id, x) in other.selection_in_rg.iter() {
289            let Some(y) = self.selection_in_rg.get(rg_id) else {
290                continue;
291            };
292            let selection = x.selection.intersection(&y.selection);
293            let row_count = selection.row_count();
294            let selector_len = selector_len(&selection);
295            if row_count > 0 {
296                total_row_count += row_count;
297                total_selector_len += selector_len;
298                res.insert(
299                    *rg_id,
300                    RowSelectionWithCount {
301                        selection,
302                        row_count,
303                        selector_len,
304                    },
305                );
306            }
307        }
308
309        Self {
310            selection_in_rg: res,
311            row_count: total_row_count,
312            selector_len: total_selector_len,
313        }
314    }
315
316    /// Returns the number of row groups in the selection.
317    pub fn row_group_count(&self) -> usize {
318        self.selection_in_rg.len()
319    }
320
321    /// Returns the number of rows in the selection.
322    pub fn row_count(&self) -> usize {
323        self.row_count
324    }
325
326    /// Returns the first row group in the selection.
327    pub fn pop_first(&mut self) -> Option<(usize, RowSelection)> {
328        let (
329            row_group_id,
330            RowSelectionWithCount {
331                selection,
332                row_count,
333                selector_len,
334            },
335        ) = self.selection_in_rg.pop_first()?;
336
337        self.row_count -= row_count;
338        self.selector_len -= selector_len;
339        Some((row_group_id, selection))
340    }
341
342    /// Removes a row group from the selection.
343    pub fn remove_row_group(&mut self, row_group_id: usize) {
344        let Some(RowSelectionWithCount {
345            row_count,
346            selector_len,
347            ..
348        }) = self.selection_in_rg.remove(&row_group_id)
349        else {
350            return;
351        };
352        self.row_count -= row_count;
353        self.selector_len -= selector_len;
354    }
355
356    /// Returns true if the selection is empty.
357    pub fn is_empty(&self) -> bool {
358        self.selection_in_rg.is_empty()
359    }
360
361    /// Returns true if the selection contains a row group with the given ID.
362    pub fn contains_row_group(&self, row_group_id: usize) -> bool {
363        self.selection_in_rg.contains_key(&row_group_id)
364    }
365
366    /// Returns an iterator over the row groups in the selection.
367    pub fn iter(&self) -> impl Iterator<Item = (&usize, &RowSelection)> {
368        self.selection_in_rg
369            .iter()
370            .map(|(row_group_id, x)| (row_group_id, &x.selection))
371    }
372
373    /// Returns the memory usage of the selection.
374    pub fn mem_usage(&self) -> usize {
375        self.selector_len * size_of::<RowSelector>()
376            + self.selection_in_rg.len() * size_of::<RowSelectionWithCount>()
377    }
378}
379
380/// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s.
381///
382/// This function processes each range in the input and either creates a new selector or merges
383/// with the existing one, depending on whether the current range is contiguous with the preceding one
384/// or if there's a gap that requires skipping rows. It handles both "select" and "skip" actions,
385/// optimizing the list of selectors by merging contiguous actions of the same type.
386///
387/// Note: overlapping ranges are not supported and will result in an incorrect selection.
388pub(crate) fn row_selection_from_row_ranges(
389    row_ranges: impl Iterator<Item = Range<usize>>,
390    total_row_count: usize,
391) -> RowSelection {
392    let mut selectors: Vec<RowSelector> = Vec::new();
393    let mut last_processed_end = 0;
394
395    for Range { start, end } in row_ranges {
396        let end = end.min(total_row_count);
397        if start > last_processed_end {
398            add_or_merge_selector(&mut selectors, start - last_processed_end, true);
399        }
400
401        add_or_merge_selector(&mut selectors, end - start, false);
402        last_processed_end = end;
403    }
404
405    if last_processed_end < total_row_count {
406        add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true);
407    }
408
409    RowSelection::from(selectors)
410}
411
412/// Converts an iterator of sorted row IDs into a `RowSelection`.
413///
414/// Note: the input iterator must be sorted in ascending order and
415///       contain unique row IDs in the range [0, total_row_count).
416pub(crate) fn row_selection_from_sorted_row_ids(
417    row_ids: impl Iterator<Item = usize>,
418    total_row_count: usize,
419) -> RowSelection {
420    let mut selectors: Vec<RowSelector> = Vec::new();
421    let mut last_processed_end = 0;
422
423    for row_id in row_ids {
424        let start = row_id;
425        let end = start + 1;
426
427        if start > last_processed_end {
428            add_or_merge_selector(&mut selectors, start - last_processed_end, true);
429        }
430
431        add_or_merge_selector(&mut selectors, end - start, false);
432        last_processed_end = end;
433    }
434
435    if last_processed_end < total_row_count {
436        add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true);
437    }
438
439    RowSelection::from(selectors)
440}
441
442/// Helper function to either add a new `RowSelector` to `selectors` or merge it with the last one
443/// if they are of the same type (both skip or both select).
444fn add_or_merge_selector(selectors: &mut Vec<RowSelector>, count: usize, is_skip: bool) {
445    if let Some(last) = selectors.last_mut() {
446        // Merge with last if both actions are same
447        if last.skip == is_skip {
448            last.row_count += count;
449            return;
450        }
451    }
452    // Add new selector otherwise
453    let new_selector = if is_skip {
454        RowSelector::skip(count)
455    } else {
456        RowSelector::select(count)
457    };
458    selectors.push(new_selector);
459}
460
461/// Returns the length of the selectors in the selection.
462fn selector_len(selection: &RowSelection) -> usize {
463    selection.iter().size_hint().0
464}
465
466#[cfg(test)]
467#[allow(clippy::single_range_in_vec_init)]
468mod tests {
469    use super::*;
470
471    #[test]
472    fn test_selector_len() {
473        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(5)]);
474        assert_eq!(selector_len(&selection), 2);
475
476        let selection = RowSelection::from(vec![
477            RowSelector::select(5),
478            RowSelector::skip(5),
479            RowSelector::select(5),
480        ]);
481        assert_eq!(selector_len(&selection), 3);
482
483        let selection = RowSelection::from(vec![]);
484        assert_eq!(selector_len(&selection), 0);
485    }
486
487    #[test]
488    fn test_single_contiguous_range() {
489        let selection = row_selection_from_row_ranges(Some(5..10).into_iter(), 10);
490        let expected = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(5)]);
491        assert_eq!(selection, expected);
492    }
493
494    #[test]
495    fn test_non_contiguous_ranges() {
496        let ranges = [1..3, 5..8];
497        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
498        let expected = RowSelection::from(vec![
499            RowSelector::skip(1),
500            RowSelector::select(2),
501            RowSelector::skip(2),
502            RowSelector::select(3),
503            RowSelector::skip(2),
504        ]);
505        assert_eq!(selection, expected);
506    }
507
508    #[test]
509    fn test_empty_range() {
510        let ranges = [];
511        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
512        let expected = RowSelection::from(vec![RowSelector::skip(10)]);
513        assert_eq!(selection, expected);
514    }
515
516    #[test]
517    fn test_adjacent_ranges() {
518        let ranges = [1..2, 2..3];
519        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
520        let expected = RowSelection::from(vec![
521            RowSelector::skip(1),
522            RowSelector::select(2),
523            RowSelector::skip(7),
524        ]);
525        assert_eq!(selection, expected);
526    }
527
528    #[test]
529    fn test_large_gap_between_ranges() {
530        let ranges = [1..2, 100..101];
531        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10240);
532        let expected = RowSelection::from(vec![
533            RowSelector::skip(1),
534            RowSelector::select(1),
535            RowSelector::skip(98),
536            RowSelector::select(1),
537            RowSelector::skip(10139),
538        ]);
539        assert_eq!(selection, expected);
540    }
541
542    #[test]
543    fn test_range_end_over_total_row_count() {
544        let ranges = Some(1..10);
545        let selection = row_selection_from_row_ranges(ranges.into_iter(), 5);
546        let expected = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(4)]);
547        assert_eq!(selection, expected);
548    }
549
550    #[test]
551    fn test_row_ids_to_selection() {
552        let row_ids = [1, 3, 5, 7, 9].into_iter();
553        let selection = row_selection_from_sorted_row_ids(row_ids, 10);
554        let expected = RowSelection::from(vec![
555            RowSelector::skip(1),
556            RowSelector::select(1),
557            RowSelector::skip(1),
558            RowSelector::select(1),
559            RowSelector::skip(1),
560            RowSelector::select(1),
561            RowSelector::skip(1),
562            RowSelector::select(1),
563            RowSelector::skip(1),
564            RowSelector::select(1),
565        ]);
566        assert_eq!(selection, expected);
567    }
568
569    #[test]
570    fn test_row_ids_to_selection_full() {
571        let row_ids = 0..10;
572        let selection = row_selection_from_sorted_row_ids(row_ids, 10);
573        let expected = RowSelection::from(vec![RowSelector::select(10)]);
574        assert_eq!(selection, expected);
575    }
576
577    #[test]
578    fn test_row_ids_to_selection_empty() {
579        let selection = row_selection_from_sorted_row_ids(None.into_iter(), 10);
580        let expected = RowSelection::from(vec![RowSelector::skip(10)]);
581        assert_eq!(selection, expected);
582    }
583
584    #[test]
585    fn test_group_row_ids() {
586        let row_ids = [0, 1, 2, 5, 6, 7, 8, 12].into_iter().collect();
587        let row_group_size = 5;
588        let num_row_groups = 3;
589
590        let row_group_to_row_ids =
591            RowGroupSelection::group_row_ids_by_row_group(row_ids, row_group_size, num_row_groups);
592
593        assert_eq!(
594            row_group_to_row_ids,
595            vec![(0, vec![0, 1, 2]), (1, vec![0, 1, 2, 3]), (2, vec![2])]
596        );
597    }
598
599    #[test]
600    fn test_row_group_selection_new() {
601        // Test with regular case
602        let selection = RowGroupSelection::new(100, 250);
603        assert_eq!(selection.row_count(), 250);
604        assert_eq!(selection.row_group_count(), 3);
605
606        // Check content of each row group
607        let row_selection = selection.get(0).unwrap();
608        assert_eq!(row_selection.row_count(), 100);
609
610        let row_selection = selection.get(1).unwrap();
611        assert_eq!(row_selection.row_count(), 100);
612
613        let row_selection = selection.get(2).unwrap();
614        assert_eq!(row_selection.row_count(), 50);
615
616        // Test with empty selection
617        let selection = RowGroupSelection::new(100, 0);
618        assert_eq!(selection.row_count(), 0);
619        assert_eq!(selection.row_group_count(), 0);
620        assert!(selection.get(0).is_none());
621
622        // Test with single row group
623        let selection = RowGroupSelection::new(100, 50);
624        assert_eq!(selection.row_count(), 50);
625        assert_eq!(selection.row_group_count(), 1);
626
627        let row_selection = selection.get(0).unwrap();
628        assert_eq!(row_selection.row_count(), 50);
629
630        // Test with row count that doesn't divide evenly
631        let selection = RowGroupSelection::new(100, 150);
632        assert_eq!(selection.row_count(), 150);
633        assert_eq!(selection.row_group_count(), 2);
634
635        let row_selection = selection.get(0).unwrap();
636        assert_eq!(row_selection.row_count(), 100);
637
638        let row_selection = selection.get(1).unwrap();
639        assert_eq!(row_selection.row_count(), 50);
640
641        // Test with row count that's just over a multiple of row_group_size
642        let selection = RowGroupSelection::new(100, 101);
643        assert_eq!(selection.row_count(), 101);
644        assert_eq!(selection.row_group_count(), 2);
645
646        let row_selection = selection.get(0).unwrap();
647        assert_eq!(row_selection.row_count(), 100);
648
649        let row_selection = selection.get(1).unwrap();
650        assert_eq!(row_selection.row_count(), 1);
651    }
652
653    #[test]
654    fn test_from_row_ids() {
655        let row_group_size = 100;
656        let num_row_groups = 3;
657
658        // Test with regular case
659        let row_ids: BTreeSet<u32> = vec![5, 15, 25, 35, 105, 115, 205, 215]
660            .into_iter()
661            .collect();
662        let selection = RowGroupSelection::from_row_ids(row_ids, row_group_size, num_row_groups);
663        assert_eq!(selection.row_count(), 8);
664        assert_eq!(selection.row_group_count(), 3);
665
666        // Check content of each row group
667        let row_selection = selection.get(0).unwrap();
668        assert_eq!(row_selection.row_count(), 4); // 5, 15, 25, 35
669
670        let row_selection = selection.get(1).unwrap();
671        assert_eq!(row_selection.row_count(), 2); // 105, 115
672
673        let row_selection = selection.get(2).unwrap();
674        assert_eq!(row_selection.row_count(), 2); // 205, 215
675
676        // Test with empty row IDs
677        let empty_row_ids: BTreeSet<u32> = BTreeSet::new();
678        let selection =
679            RowGroupSelection::from_row_ids(empty_row_ids, row_group_size, num_row_groups);
680        assert_eq!(selection.row_count(), 0);
681        assert_eq!(selection.row_group_count(), 0);
682        assert!(selection.get(0).is_none());
683
684        // Test with consecutive row IDs
685        let consecutive_row_ids: BTreeSet<u32> = vec![5, 6, 7, 8, 9].into_iter().collect();
686        let selection =
687            RowGroupSelection::from_row_ids(consecutive_row_ids, row_group_size, num_row_groups);
688        assert_eq!(selection.row_count(), 5);
689        assert_eq!(selection.row_group_count(), 1);
690
691        let row_selection = selection.get(0).unwrap();
692        assert_eq!(row_selection.row_count(), 5); // 5, 6, 7, 8, 9
693
694        // Test with row IDs at row group boundaries
695        let boundary_row_ids: BTreeSet<u32> = vec![0, 99, 100, 199, 200, 249].into_iter().collect();
696        let selection =
697            RowGroupSelection::from_row_ids(boundary_row_ids, row_group_size, num_row_groups);
698        assert_eq!(selection.row_count(), 6);
699        assert_eq!(selection.row_group_count(), 3);
700
701        let row_selection = selection.get(0).unwrap();
702        assert_eq!(row_selection.row_count(), 2); // 0, 99
703
704        let row_selection = selection.get(1).unwrap();
705        assert_eq!(row_selection.row_count(), 2); // 100, 199
706
707        let row_selection = selection.get(2).unwrap();
708        assert_eq!(row_selection.row_count(), 2); // 200, 249
709
710        // Test with single row group
711        let single_group_row_ids: BTreeSet<u32> = vec![5, 10, 15].into_iter().collect();
712        let selection = RowGroupSelection::from_row_ids(single_group_row_ids, row_group_size, 1);
713        assert_eq!(selection.row_count(), 3);
714        assert_eq!(selection.row_group_count(), 1);
715
716        let row_selection = selection.get(0).unwrap();
717        assert_eq!(row_selection.row_count(), 3); // 5, 10, 15
718    }
719
720    #[test]
721    fn test_from_row_ranges() {
722        let row_group_size = 100;
723
724        // Test with regular case
725        let ranges = vec![
726            (0, vec![5..15, 25..35]), // Within [0, 100)
727            (1, vec![5..15]),         // Within [0, 100)
728            (2, vec![0..5, 10..15]),  // Within [0, 50) for last row group
729        ];
730        let selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
731        assert_eq!(selection.row_count(), 40);
732        assert_eq!(selection.row_group_count(), 3);
733
734        // Check content of each row group
735        let row_selection = selection.get(0).unwrap();
736        assert_eq!(row_selection.row_count(), 20); // 5..15 (10) + 25..35 (10)
737
738        let row_selection = selection.get(1).unwrap();
739        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
740
741        let row_selection = selection.get(2).unwrap();
742        assert_eq!(row_selection.row_count(), 10); // 0..5 (5) + 10..15 (5)
743
744        // Test with empty ranges
745        let empty_ranges: Vec<(usize, Vec<Range<usize>>)> = vec![];
746        let selection = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size);
747        assert_eq!(selection.row_count(), 0);
748        assert_eq!(selection.row_group_count(), 0);
749        assert!(selection.get(0).is_none());
750
751        // Test with adjacent ranges within same row group
752        let adjacent_ranges = vec![
753            (0, vec![5..15, 15..25]), // Adjacent ranges within [0, 100)
754        ];
755        let selection = RowGroupSelection::from_row_ranges(adjacent_ranges, row_group_size);
756        assert_eq!(selection.row_count(), 20);
757        assert_eq!(selection.row_group_count(), 1);
758
759        let row_selection = selection.get(0).unwrap();
760        assert_eq!(row_selection.row_count(), 20); // 5..15 (10) + 15..25 (10)
761
762        // Test with ranges at row group boundaries
763        let boundary_ranges = vec![
764            (0, vec![0..10, 90..100]), // Ranges at start and end of first row group
765            (1, vec![0..100]),         // Full range of second row group
766            (2, vec![0..50]),          // Full range of last row group
767        ];
768        let selection = RowGroupSelection::from_row_ranges(boundary_ranges, row_group_size);
769        assert_eq!(selection.row_count(), 170);
770        assert_eq!(selection.row_group_count(), 3);
771
772        let row_selection = selection.get(0).unwrap();
773        assert_eq!(row_selection.row_count(), 20); // 0..10 (10) + 90..100 (10)
774
775        let row_selection = selection.get(1).unwrap();
776        assert_eq!(row_selection.row_count(), 100); // 0..100 (100)
777
778        let row_selection = selection.get(2).unwrap();
779        assert_eq!(row_selection.row_count(), 50); // 0..50 (50)
780
781        // Test with single row group
782        let single_group_ranges = vec![
783            (0, vec![0..50]), // Half of first row group
784        ];
785        let selection = RowGroupSelection::from_row_ranges(single_group_ranges, row_group_size);
786        assert_eq!(selection.row_count(), 50);
787        assert_eq!(selection.row_group_count(), 1);
788
789        let row_selection = selection.get(0).unwrap();
790        assert_eq!(row_selection.row_count(), 50); // 0..50 (50)
791    }
792
793    #[test]
794    fn test_intersect() {
795        let row_group_size = 100;
796
797        // Test case 1: Regular intersection with partial overlap
798        let ranges1 = vec![
799            (0, vec![5..15, 25..35]), // Within [0, 100)
800            (1, vec![5..15]),         // Within [0, 100)
801        ];
802        let selection1 = RowGroupSelection::from_row_ranges(ranges1, row_group_size);
803
804        let ranges2 = vec![
805            (0, vec![10..20]), // Within [0, 100)
806            (1, vec![10..20]), // Within [0, 100)
807            (2, vec![0..10]),  // Within [0, 50) for last row group
808        ];
809        let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size);
810
811        let intersection = selection1.intersect(&selection2);
812        assert_eq!(intersection.row_count(), 10);
813        assert_eq!(intersection.row_group_count(), 2);
814
815        let row_selection = intersection.get(0).unwrap();
816        assert_eq!(row_selection.row_count(), 5); // 10..15 (5)
817
818        let row_selection = intersection.get(1).unwrap();
819        assert_eq!(row_selection.row_count(), 5); // 10..15 (5)
820
821        // Test case 2: Empty intersection with empty selection
822        let empty_ranges: Vec<(usize, Vec<Range<usize>>)> = vec![];
823        let empty_selection = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size);
824        let intersection = selection1.intersect(&empty_selection);
825        assert_eq!(intersection.row_count(), 0);
826        assert_eq!(intersection.row_group_count(), 0);
827        assert!(intersection.get(0).is_none());
828
829        // Test case 3: No overlapping row groups
830        let non_overlapping_ranges = vec![
831            (3, vec![0..10]), // Within [0, 50) for last row group
832        ];
833        let non_overlapping_selection =
834            RowGroupSelection::from_row_ranges(non_overlapping_ranges, row_group_size);
835        let intersection = selection1.intersect(&non_overlapping_selection);
836        assert_eq!(intersection.row_count(), 0);
837        assert_eq!(intersection.row_group_count(), 0);
838        assert!(intersection.get(0).is_none());
839
840        // Test case 4: Full overlap within same row group
841        let full_overlap_ranges1 = vec![
842            (0, vec![0..50]), // Within [0, 100)
843        ];
844        let full_overlap_ranges2 = vec![
845            (0, vec![0..50]), // Within [0, 100)
846        ];
847        let selection1 = RowGroupSelection::from_row_ranges(full_overlap_ranges1, row_group_size);
848        let selection2 = RowGroupSelection::from_row_ranges(full_overlap_ranges2, row_group_size);
849        let intersection = selection1.intersect(&selection2);
850        assert_eq!(intersection.row_count(), 50);
851        assert_eq!(intersection.row_group_count(), 1);
852
853        let row_selection = intersection.get(0).unwrap();
854        assert_eq!(row_selection.row_count(), 50); // 0..50 (50)
855
856        // Test case 5: Partial overlap at row group boundaries
857        let boundary_ranges1 = vec![
858            (0, vec![0..10, 90..100]), // Within [0, 100)
859            (1, vec![0..100]),         // Within [0, 100)
860        ];
861        let boundary_ranges2 = vec![
862            (0, vec![5..15, 95..100]), // Within [0, 100)
863            (1, vec![50..100]),        // Within [0, 100)
864        ];
865        let selection1 = RowGroupSelection::from_row_ranges(boundary_ranges1, row_group_size);
866        let selection2 = RowGroupSelection::from_row_ranges(boundary_ranges2, row_group_size);
867        let intersection = selection1.intersect(&selection2);
868        assert_eq!(intersection.row_count(), 60);
869        assert_eq!(intersection.row_group_count(), 2);
870
871        let row_selection = intersection.get(0).unwrap();
872        assert_eq!(row_selection.row_count(), 10); // 5..10 (5) + 95..100 (5)
873
874        let row_selection = intersection.get(1).unwrap();
875        assert_eq!(row_selection.row_count(), 50); // 50..100 (50)
876
877        // Test case 6: Multiple ranges with complex overlap
878        let complex_ranges1 = vec![
879            (0, vec![5..15, 25..35, 45..55]), // Within [0, 100)
880            (1, vec![10..20, 30..40]),        // Within [0, 100)
881        ];
882        let complex_ranges2 = vec![
883            (0, vec![10..20, 30..40, 50..60]), // Within [0, 100)
884            (1, vec![15..25, 35..45]),         // Within [0, 100)
885        ];
886        let selection1 = RowGroupSelection::from_row_ranges(complex_ranges1, row_group_size);
887        let selection2 = RowGroupSelection::from_row_ranges(complex_ranges2, row_group_size);
888        let intersection = selection1.intersect(&selection2);
889        assert_eq!(intersection.row_count(), 25);
890        assert_eq!(intersection.row_group_count(), 2);
891
892        let row_selection = intersection.get(0).unwrap();
893        assert_eq!(row_selection.row_count(), 15); // 10..15 (5) + 30..35 (5) + 50..55 (5)
894
895        let row_selection = intersection.get(1).unwrap();
896        assert_eq!(row_selection.row_count(), 10); // 15..20 (5) + 35..40 (5)
897
898        // Test case 7: Intersection with last row group (smaller size)
899        let last_rg_ranges1 = vec![
900            (2, vec![0..25, 30..40]), // Within [0, 50) for last row group
901        ];
902        let last_rg_ranges2 = vec![
903            (2, vec![20..30, 35..45]), // Within [0, 50) for last row group
904        ];
905        let selection1 = RowGroupSelection::from_row_ranges(last_rg_ranges1, row_group_size);
906        let selection2 = RowGroupSelection::from_row_ranges(last_rg_ranges2, row_group_size);
907        let intersection = selection1.intersect(&selection2);
908        assert_eq!(intersection.row_count(), 10);
909        assert_eq!(intersection.row_group_count(), 1);
910
911        let row_selection = intersection.get(2).unwrap();
912        assert_eq!(row_selection.row_count(), 10); // 20..25 (5) + 35..40 (5)
913
914        // Test case 8: Intersection with empty ranges in one selection
915        let empty_ranges = vec![
916            (0, vec![]),      // Empty ranges
917            (1, vec![5..15]), // Within [0, 100)
918        ];
919        let selection1 = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size);
920        let ranges2 = vec![
921            (0, vec![5..15, 25..35]), // Within [0, 100)
922            (1, vec![5..15]),         // Within [0, 100)
923        ];
924        let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size);
925        let intersection = selection1.intersect(&selection2);
926        assert_eq!(intersection.row_count(), 10);
927        assert_eq!(intersection.row_group_count(), 1);
928
929        let row_selection = intersection.get(1).unwrap();
930        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
931    }
932
933    #[test]
934    fn test_pop_first() {
935        let row_group_size = 100;
936        let ranges = vec![
937            (0, vec![5..15]), // Within [0, 100)
938            (1, vec![5..15]), // Within [0, 100)
939            (2, vec![0..5]),  // Within [0, 50) for last row group
940        ];
941        let mut selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
942
943        // Test popping first row group
944        let (rg_id, row_selection) = selection.pop_first().unwrap();
945        assert_eq!(rg_id, 0);
946        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
947        assert_eq!(selection.row_count(), 15);
948        assert_eq!(selection.row_group_count(), 2);
949
950        // Verify remaining row groups' content
951        let row_selection = selection.get(1).unwrap();
952        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
953
954        let row_selection = selection.get(2).unwrap();
955        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
956
957        // Test popping second row group
958        let (rg_id, row_selection) = selection.pop_first().unwrap();
959        assert_eq!(rg_id, 1);
960        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
961        assert_eq!(selection.row_count(), 5);
962        assert_eq!(selection.row_group_count(), 1);
963
964        // Verify remaining row group's content
965        let row_selection = selection.get(2).unwrap();
966        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
967
968        // Test popping last row group
969        let (rg_id, row_selection) = selection.pop_first().unwrap();
970        assert_eq!(rg_id, 2);
971        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
972        assert_eq!(selection.row_count(), 0);
973        assert_eq!(selection.row_group_count(), 0);
974        assert!(selection.is_empty());
975
976        // Test popping from empty selection
977        let mut empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
978        assert!(empty_selection.pop_first().is_none());
979        assert_eq!(empty_selection.row_count(), 0);
980        assert_eq!(empty_selection.row_group_count(), 0);
981        assert!(empty_selection.is_empty());
982    }
983
984    #[test]
985    fn test_remove_row_group() {
986        let row_group_size = 100;
987        let ranges = vec![
988            (0, vec![5..15]), // Within [0, 100)
989            (1, vec![5..15]), // Within [0, 100)
990            (2, vec![0..5]),  // Within [0, 50) for last row group
991        ];
992        let mut selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
993
994        // Test removing existing row group
995        selection.remove_row_group(1);
996        assert_eq!(selection.row_count(), 15);
997        assert_eq!(selection.row_group_count(), 2);
998        assert!(!selection.contains_row_group(1));
999
1000        // Verify remaining row groups' content
1001        let row_selection = selection.get(0).unwrap();
1002        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
1003
1004        let row_selection = selection.get(2).unwrap();
1005        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
1006
1007        // Test removing non-existent row group
1008        selection.remove_row_group(5);
1009        assert_eq!(selection.row_count(), 15);
1010        assert_eq!(selection.row_group_count(), 2);
1011
1012        // Test removing all row groups
1013        selection.remove_row_group(0);
1014        assert_eq!(selection.row_count(), 5);
1015        assert_eq!(selection.row_group_count(), 1);
1016
1017        let row_selection = selection.get(2).unwrap();
1018        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
1019
1020        selection.remove_row_group(2);
1021        assert_eq!(selection.row_count(), 0);
1022        assert_eq!(selection.row_group_count(), 0);
1023        assert!(selection.is_empty());
1024
1025        // Test removing from empty selection
1026        let mut empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
1027        empty_selection.remove_row_group(0);
1028        assert_eq!(empty_selection.row_count(), 0);
1029        assert_eq!(empty_selection.row_group_count(), 0);
1030        assert!(empty_selection.is_empty());
1031    }
1032
1033    #[test]
1034    fn test_contains_row_group() {
1035        let row_group_size = 100;
1036        let ranges = vec![
1037            (0, vec![5..15]), // Within [0, 100)
1038            (1, vec![5..15]), // Within [0, 100)
1039        ];
1040        let selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
1041
1042        assert!(selection.contains_row_group(0));
1043        assert!(selection.contains_row_group(1));
1044        assert!(!selection.contains_row_group(2));
1045
1046        // Test empty selection
1047        let empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
1048        assert!(!empty_selection.contains_row_group(0));
1049    }
1050}