mito2/sst/parquet/
row_selection.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::ops::Range;
17
18use index::inverted_index::search::index_apply::ApplyOutput;
19use itertools::Itertools;
20use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
21
22/// A selection of row groups.
23#[derive(Debug, Clone, Default)]
24pub struct RowGroupSelection {
25    /// Row group id to row selection.
26    selection_in_rg: BTreeMap<usize, RowSelectionWithCount>,
27    /// Total number of rows in the selection.
28    row_count: usize,
29    /// Total length of the selectors.
30    selector_len: usize,
31}
32
33/// A row selection with its count.
34#[derive(Debug, Clone, Default)]
35struct RowSelectionWithCount {
36    /// Row selection.
37    selection: RowSelection,
38    /// Number of rows in the selection.
39    row_count: usize,
40    /// Length of the selectors.
41    selector_len: usize,
42}
43
44impl RowGroupSelection {
45    /// Creates a new `RowGroupSelection` with all row groups selected.
46    ///
47    /// # Arguments
48    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
49    /// * `total_row_count` - Total number of rows
50    pub fn new(row_group_size: usize, total_row_count: usize) -> Self {
51        let mut selection_in_rg = BTreeMap::new();
52
53        let row_group_count = total_row_count.div_ceil(row_group_size);
54        for rg_id in 0..row_group_count {
55            // The last row group may have fewer rows than `row_group_size`
56            let row_group_size = if rg_id == row_group_count - 1 {
57                total_row_count - (row_group_count - 1) * row_group_size
58            } else {
59                row_group_size
60            };
61
62            let selection = RowSelection::from(vec![RowSelector::select(row_group_size)]);
63            selection_in_rg.insert(
64                rg_id,
65                RowSelectionWithCount {
66                    selection,
67                    row_count: row_group_size,
68                    selector_len: 1,
69                },
70            );
71        }
72
73        Self {
74            selection_in_rg,
75            row_count: total_row_count,
76            selector_len: row_group_count,
77        }
78    }
79
80    /// Returns the row selection for a given row group.
81    ///
82    /// `None` indicates not selected.
83    pub fn get(&self, rg_id: usize) -> Option<&RowSelection> {
84        self.selection_in_rg.get(&rg_id).map(|x| &x.selection)
85    }
86
87    /// Creates a new `RowGroupSelection` from the output of inverted index application.
88    ///
89    /// # Arguments
90    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
91    /// * `apply_output` - The output from applying the inverted index
92    ///
93    /// # Assumptions
94    /// * All row groups (except possibly the last one) have the same number of rows
95    /// * The last row group may have fewer rows than `row_group_size`
96    pub fn from_inverted_index_apply_output(
97        row_group_size: usize,
98        num_row_groups: usize,
99        apply_output: ApplyOutput,
100    ) -> Self {
101        // Step 1: Convert segment IDs to row ranges within row groups
102        // For each segment ID, calculate its corresponding row range in the row group
103        let segment_row_count = apply_output.segment_row_count;
104        let row_group_ranges = apply_output.matched_segment_ids.iter_ones().map(|seg_id| {
105            // Calculate the global row ID where this segment starts
106            let begin_row_id = seg_id * segment_row_count;
107            // Determine which row group this segment belongs to
108            let row_group_id = begin_row_id / row_group_size;
109            // Calculate the offset within the row group
110            let rg_begin_row_id = begin_row_id % row_group_size;
111            // Ensure the end row ID doesn't exceed the row group size
112            let rg_end_row_id = (rg_begin_row_id + segment_row_count).min(row_group_size);
113
114            (row_group_id, rg_begin_row_id..rg_end_row_id)
115        });
116
117        // Step 2: Group ranges by row group ID and create row selections
118        let mut total_row_count = 0;
119        let mut total_selector_len = 0;
120        let mut selection_in_rg = row_group_ranges
121            .chunk_by(|(row_group_id, _)| *row_group_id)
122            .into_iter()
123            .map(|(row_group_id, group)| {
124                // Extract just the ranges from the group
125                let ranges = group.map(|(_, ranges)| ranges);
126                // Create row selection from the ranges
127                // Note: We use `row_group_size` here, which is safe because:
128                // 1. For non-last row groups, it's the actual size
129                // 2. For the last row group, any ranges beyond the actual size will be clipped
130                //    by the min() operation above
131                let selection = row_selection_from_row_ranges(ranges, row_group_size);
132                let row_count = selection.row_count();
133                let selector_len = selector_len(&selection);
134                total_row_count += row_count;
135                total_selector_len += selector_len;
136                (
137                    row_group_id,
138                    RowSelectionWithCount {
139                        selection,
140                        row_count,
141                        selector_len,
142                    },
143                )
144            })
145            .collect::<BTreeMap<_, _>>();
146
147        Self::fill_missing_row_groups(&mut selection_in_rg, num_row_groups);
148
149        Self {
150            selection_in_rg,
151            row_count: total_row_count,
152            selector_len: total_selector_len,
153        }
154    }
155
156    /// Creates a new `RowGroupSelection` from a set of row IDs.
157    ///
158    /// # Arguments
159    /// * `row_ids` - Set of row IDs to select
160    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
161    /// * `num_row_groups` - Total number of row groups
162    ///
163    /// # Assumptions
164    /// * All row groups (except possibly the last one) have the same number of rows
165    /// * The last row group may have fewer rows than `row_group_size`
166    /// * All row IDs must within the range of [0, num_row_groups * row_group_size)
167    pub fn from_row_ids(
168        row_ids: BTreeSet<u32>,
169        row_group_size: usize,
170        num_row_groups: usize,
171    ) -> Self {
172        // Step 1: Group row IDs by their row group
173        let row_group_to_row_ids =
174            Self::group_row_ids_by_row_group(row_ids, row_group_size, num_row_groups);
175
176        // Step 2: Create row selections for each row group
177        let mut total_row_count = 0;
178        let mut total_selector_len = 0;
179        let mut selection_in_rg = row_group_to_row_ids
180            .into_iter()
181            .map(|(row_group_id, row_ids)| {
182                let selection =
183                    row_selection_from_sorted_row_ids(row_ids.into_iter(), row_group_size);
184                let row_count = selection.row_count();
185                let selector_len = selector_len(&selection);
186                total_row_count += row_count;
187                total_selector_len += selector_len;
188                (
189                    row_group_id,
190                    RowSelectionWithCount {
191                        selection,
192                        row_count,
193                        selector_len,
194                    },
195                )
196            })
197            .collect::<BTreeMap<_, _>>();
198
199        Self::fill_missing_row_groups(&mut selection_in_rg, num_row_groups);
200
201        Self {
202            selection_in_rg,
203            row_count: total_row_count,
204            selector_len: total_selector_len,
205        }
206    }
207
208    /// Creates a new `RowGroupSelection` from a set of row ranges.
209    ///
210    /// # Arguments
211    /// * `row_ranges` - A vector of (row_group_id, row_ranges) pairs
212    /// * `row_group_size` - The number of rows in each row group (except possibly the last one)
213    ///
214    /// # Assumptions
215    /// * All row groups (except possibly the last one) have the same number of rows
216    /// * The last row group may have fewer rows than `row_group_size`
217    /// * All ranges in `row_ranges` must be within the bounds of their respective row groups
218    ///   (i.e., for row group i, all ranges must be within [0, row_group_size) or [0, remaining_rows) for the last row group)
219    /// * Ranges within the same row group must not overlap. Overlapping ranges will result in undefined behavior.
220    pub fn from_row_ranges(
221        row_ranges: Vec<(usize, Vec<Range<usize>>)>,
222        row_group_size: usize,
223    ) -> Self {
224        let mut total_row_count = 0;
225        let mut total_selector_len = 0;
226        let selection_in_rg = row_ranges
227            .into_iter()
228            .map(|(row_group_id, ranges)| {
229                let selection = row_selection_from_row_ranges(ranges.into_iter(), row_group_size);
230                let row_count = selection.row_count();
231                let selector_len = selector_len(&selection);
232                total_row_count += row_count;
233                total_selector_len += selector_len;
234                (
235                    row_group_id,
236                    RowSelectionWithCount {
237                        selection,
238                        row_count,
239                        selector_len,
240                    },
241                )
242            })
243            .collect();
244
245        Self {
246            selection_in_rg,
247            row_count: total_row_count,
248            selector_len: total_selector_len,
249        }
250    }
251
252    /// Groups row IDs by their row group.
253    ///
254    /// # Arguments
255    /// * `row_ids` - Set of row IDs to group
256    /// * `row_group_size` - Size of each row group
257    /// * `num_row_groups` - Total number of row groups
258    ///
259    /// # Returns
260    /// A vector of (row_group_id, row_ids) pairs, where row_ids are the IDs within that row group.
261    fn group_row_ids_by_row_group(
262        row_ids: BTreeSet<u32>,
263        row_group_size: usize,
264        num_row_groups: usize,
265    ) -> Vec<(usize, Vec<usize>)> {
266        let est_rows_per_group = row_ids.len() / num_row_groups;
267        let mut row_group_to_row_ids: Vec<(usize, Vec<usize>)> = Vec::with_capacity(num_row_groups);
268
269        for row_id in row_ids {
270            let row_group_id = row_id as usize / row_group_size;
271            let row_id_in_group = row_id as usize % row_group_size;
272
273            if let Some((rg_id, row_ids)) = row_group_to_row_ids.last_mut()
274                && *rg_id == row_group_id
275            {
276                row_ids.push(row_id_in_group);
277            } else {
278                let mut row_ids = Vec::with_capacity(est_rows_per_group);
279                row_ids.push(row_id_in_group);
280                row_group_to_row_ids.push((row_group_id, row_ids));
281            }
282        }
283
284        row_group_to_row_ids
285    }
286
287    /// Intersects two `RowGroupSelection`s.
288    pub fn intersect(&self, other: &Self) -> Self {
289        let mut res = BTreeMap::new();
290        let mut total_row_count = 0;
291        let mut total_selector_len = 0;
292
293        for (rg_id, x) in other.selection_in_rg.iter() {
294            let Some(y) = self.selection_in_rg.get(rg_id) else {
295                continue;
296            };
297            let selection = intersect_row_selections(&x.selection, &y.selection);
298            let row_count = selection.row_count();
299            let selector_len = selector_len(&selection);
300            if row_count > 0 {
301                total_row_count += row_count;
302                total_selector_len += selector_len;
303                res.insert(
304                    *rg_id,
305                    RowSelectionWithCount {
306                        selection,
307                        row_count,
308                        selector_len,
309                    },
310                );
311            }
312        }
313
314        Self {
315            selection_in_rg: res,
316            row_count: total_row_count,
317            selector_len: total_selector_len,
318        }
319    }
320
321    /// Returns the number of row groups in the selection.
322    pub fn row_group_count(&self) -> usize {
323        self.selection_in_rg.len()
324    }
325
326    /// Returns the number of rows in the selection.
327    pub fn row_count(&self) -> usize {
328        self.row_count
329    }
330
331    /// Returns the first row group in the selection.
332    ///
333    /// Skip the row group if the row count is 0.
334    pub fn pop_first(&mut self) -> Option<(usize, RowSelection)> {
335        while let Some((
336            row_group_id,
337            RowSelectionWithCount {
338                selection,
339                row_count,
340                selector_len,
341            },
342        )) = self.selection_in_rg.pop_first()
343        {
344            if row_count > 0 {
345                self.row_count -= row_count;
346                self.selector_len -= selector_len;
347                return Some((row_group_id, selection));
348            }
349        }
350
351        None
352    }
353
354    /// Removes a row group from the selection.
355    pub fn remove_row_group(&mut self, row_group_id: usize) {
356        let Some(RowSelectionWithCount {
357            row_count,
358            selector_len,
359            ..
360        }) = self.selection_in_rg.remove(&row_group_id)
361        else {
362            return;
363        };
364        self.row_count -= row_count;
365        self.selector_len -= selector_len;
366    }
367
368    /// Returns true if the selection is empty.
369    pub fn is_empty(&self) -> bool {
370        self.selection_in_rg.is_empty()
371    }
372
373    /// Returns true if the selection contains a row group with the given ID.
374    pub fn contains_row_group(&self, row_group_id: usize) -> bool {
375        self.selection_in_rg.contains_key(&row_group_id)
376    }
377
378    /// Returns true if the selection contains a row group with the given ID and the row selection is not empty.
379    pub fn contains_non_empty_row_group(&self, row_group_id: usize) -> bool {
380        self.selection_in_rg
381            .get(&row_group_id)
382            .map(|r| r.row_count > 0)
383            .unwrap_or(false)
384    }
385
386    /// Returns an iterator over the row groups in the selection.
387    pub fn iter(&self) -> impl Iterator<Item = (&usize, &RowSelection)> {
388        self.selection_in_rg
389            .iter()
390            .map(|(row_group_id, x)| (row_group_id, &x.selection))
391    }
392
393    /// Returns the memory usage of the selection.
394    pub fn mem_usage(&self) -> usize {
395        self.selector_len * size_of::<RowSelector>()
396            + self.selection_in_rg.len() * size_of::<RowSelectionWithCount>()
397    }
398
399    /// Concatenates `other` into `self`. `other` must not contain row groups that `self` contains.
400    ///
401    /// Panics if `self` contains row groups that `other` contains.
402    pub fn concat(&mut self, other: &Self) {
403        for (rg_id, other_rs) in other.selection_in_rg.iter() {
404            if self.selection_in_rg.contains_key(rg_id) {
405                panic!("row group {} is already in `self`", rg_id);
406            }
407
408            self.selection_in_rg.insert(*rg_id, other_rs.clone());
409            self.row_count += other_rs.row_count;
410            self.selector_len += other_rs.selector_len;
411        }
412    }
413
414    /// Fills the missing row groups with empty selections.
415    /// This is to indicate that the row groups are searched even if no rows are found.
416    fn fill_missing_row_groups(
417        selection_in_rg: &mut BTreeMap<usize, RowSelectionWithCount>,
418        num_row_groups: usize,
419    ) {
420        for rg_id in 0..num_row_groups {
421            selection_in_rg.entry(rg_id).or_default();
422        }
423    }
424}
425
426/// Ported from `parquet` but trailing rows are removed.
427///
428/// Combine two lists of `RowSelection` return the intersection of them
429/// For example:
430/// self:      NNYYYYNNYYNYN
431/// other:     NYNNNNNNY
432///
433/// returned:  NNNNNNNNY     (modified)
434///            NNNNNNNNYYNYN (original)
435fn intersect_row_selections(left: &RowSelection, right: &RowSelection) -> RowSelection {
436    let mut l_iter = left.iter().copied().peekable();
437    let mut r_iter = right.iter().copied().peekable();
438
439    let iter = std::iter::from_fn(move || {
440        loop {
441            let l = l_iter.peek_mut();
442            let r = r_iter.peek_mut();
443
444            match (l, r) {
445                (Some(a), _) if a.row_count == 0 => {
446                    l_iter.next().unwrap();
447                }
448                (_, Some(b)) if b.row_count == 0 => {
449                    r_iter.next().unwrap();
450                }
451                (Some(l), Some(r)) => {
452                    return match (l.skip, r.skip) {
453                        // Keep both ranges
454                        (false, false) => {
455                            if l.row_count < r.row_count {
456                                r.row_count -= l.row_count;
457                                l_iter.next()
458                            } else {
459                                l.row_count -= r.row_count;
460                                r_iter.next()
461                            }
462                        }
463                        // skip at least one
464                        _ => {
465                            if l.row_count < r.row_count {
466                                let skip = l.row_count;
467                                r.row_count -= l.row_count;
468                                l_iter.next();
469                                Some(RowSelector::skip(skip))
470                            } else {
471                                let skip = r.row_count;
472                                l.row_count -= skip;
473                                r_iter.next();
474                                Some(RowSelector::skip(skip))
475                            }
476                        }
477                    };
478                }
479                (None, _) => return None,
480                (_, None) => return None,
481            }
482        }
483    });
484
485    iter.collect()
486}
487
488/// Converts an iterator of row ranges into a `RowSelection` by creating a sequence of `RowSelector`s.
489///
490/// This function processes each range in the input and either creates a new selector or merges
491/// with the existing one, depending on whether the current range is contiguous with the preceding one
492/// or if there's a gap that requires skipping rows. It handles both "select" and "skip" actions,
493/// optimizing the list of selectors by merging contiguous actions of the same type.
494///
495/// Note: overlapping ranges are not supported and will result in an incorrect selection.
496pub(crate) fn row_selection_from_row_ranges(
497    row_ranges: impl Iterator<Item = Range<usize>>,
498    total_row_count: usize,
499) -> RowSelection {
500    let mut selectors: Vec<RowSelector> = Vec::new();
501    let mut last_processed_end = 0;
502
503    for Range { start, end } in row_ranges {
504        let end = end.min(total_row_count);
505        if start > last_processed_end {
506            add_or_merge_selector(&mut selectors, start - last_processed_end, true);
507        }
508
509        add_or_merge_selector(&mut selectors, end - start, false);
510        last_processed_end = end;
511    }
512
513    RowSelection::from(selectors)
514}
515
516/// Converts an iterator of sorted row IDs into a `RowSelection`.
517///
518/// Note: the input iterator must be sorted in ascending order and
519///       contain unique row IDs in the range [0, total_row_count).
520pub(crate) fn row_selection_from_sorted_row_ids(
521    row_ids: impl Iterator<Item = usize>,
522    total_row_count: usize,
523) -> RowSelection {
524    let mut selectors: Vec<RowSelector> = Vec::new();
525    let mut last_processed_end = 0;
526
527    for row_id in row_ids {
528        let start = row_id;
529        let end = start + 1;
530
531        if start > last_processed_end {
532            add_or_merge_selector(&mut selectors, start - last_processed_end, true);
533        }
534
535        add_or_merge_selector(&mut selectors, end - start, false);
536        last_processed_end = end;
537    }
538
539    if last_processed_end < total_row_count {
540        add_or_merge_selector(&mut selectors, total_row_count - last_processed_end, true);
541    }
542
543    RowSelection::from(selectors)
544}
545
546/// Helper function to either add a new `RowSelector` to `selectors` or merge it with the last one
547/// if they are of the same type (both skip or both select).
548fn add_or_merge_selector(selectors: &mut Vec<RowSelector>, count: usize, is_skip: bool) {
549    if let Some(last) = selectors.last_mut() {
550        // Merge with last if both actions are same
551        if last.skip == is_skip {
552            last.row_count += count;
553            return;
554        }
555    }
556    // Add new selector otherwise
557    let new_selector = if is_skip {
558        RowSelector::skip(count)
559    } else {
560        RowSelector::select(count)
561    };
562    selectors.push(new_selector);
563}
564
565/// Returns the length of the selectors in the selection.
566fn selector_len(selection: &RowSelection) -> usize {
567    selection.iter().size_hint().0
568}
569
570#[cfg(test)]
571#[allow(clippy::single_range_in_vec_init)]
572mod tests {
573    use super::*;
574
575    #[test]
576    fn test_selector_len() {
577        let selection = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(5)]);
578        assert_eq!(selector_len(&selection), 2);
579
580        let selection = RowSelection::from(vec![
581            RowSelector::select(5),
582            RowSelector::skip(5),
583            RowSelector::select(5),
584        ]);
585        assert_eq!(selector_len(&selection), 3);
586
587        let selection = RowSelection::from(vec![]);
588        assert_eq!(selector_len(&selection), 0);
589    }
590
591    #[test]
592    fn test_single_contiguous_range() {
593        let selection = row_selection_from_row_ranges(Some(5..10).into_iter(), 10);
594        let expected = RowSelection::from(vec![RowSelector::skip(5), RowSelector::select(5)]);
595        assert_eq!(selection, expected);
596    }
597
598    #[test]
599    fn test_non_contiguous_ranges() {
600        let ranges = [1..3, 5..8];
601        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
602        let expected = RowSelection::from(vec![
603            RowSelector::skip(1),
604            RowSelector::select(2),
605            RowSelector::skip(2),
606            RowSelector::select(3),
607        ]);
608        assert_eq!(selection, expected);
609    }
610
611    #[test]
612    fn test_empty_range() {
613        let ranges = [];
614        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
615        let expected = RowSelection::from(vec![]);
616        assert_eq!(selection, expected);
617    }
618
619    #[test]
620    fn test_adjacent_ranges() {
621        let ranges = [1..2, 2..3];
622        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10);
623        let expected = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(2)]);
624        assert_eq!(selection, expected);
625    }
626
627    #[test]
628    fn test_large_gap_between_ranges() {
629        let ranges = [1..2, 100..101];
630        let selection = row_selection_from_row_ranges(ranges.iter().cloned(), 10240);
631        let expected = RowSelection::from(vec![
632            RowSelector::skip(1),
633            RowSelector::select(1),
634            RowSelector::skip(98),
635            RowSelector::select(1),
636        ]);
637        assert_eq!(selection, expected);
638    }
639
640    #[test]
641    fn test_range_end_over_total_row_count() {
642        let ranges = Some(1..10);
643        let selection = row_selection_from_row_ranges(ranges.into_iter(), 5);
644        let expected = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(4)]);
645        assert_eq!(selection, expected);
646    }
647
648    #[test]
649    fn test_row_ids_to_selection() {
650        let row_ids = [1, 3, 5, 7, 9].into_iter();
651        let selection = row_selection_from_sorted_row_ids(row_ids, 10);
652        let expected = RowSelection::from(vec![
653            RowSelector::skip(1),
654            RowSelector::select(1),
655            RowSelector::skip(1),
656            RowSelector::select(1),
657            RowSelector::skip(1),
658            RowSelector::select(1),
659            RowSelector::skip(1),
660            RowSelector::select(1),
661            RowSelector::skip(1),
662            RowSelector::select(1),
663        ]);
664        assert_eq!(selection, expected);
665    }
666
667    #[test]
668    fn test_row_ids_to_selection_full() {
669        let row_ids = 0..10;
670        let selection = row_selection_from_sorted_row_ids(row_ids, 10);
671        let expected = RowSelection::from(vec![RowSelector::select(10)]);
672        assert_eq!(selection, expected);
673    }
674
675    #[test]
676    fn test_row_ids_to_selection_empty() {
677        let selection = row_selection_from_sorted_row_ids(None.into_iter(), 10);
678        let expected = RowSelection::from(vec![RowSelector::skip(10)]);
679        assert_eq!(selection, expected);
680    }
681
682    #[test]
683    fn test_group_row_ids() {
684        let row_ids = [0, 1, 2, 5, 6, 7, 8, 12].into_iter().collect();
685        let row_group_size = 5;
686        let num_row_groups = 3;
687
688        let row_group_to_row_ids =
689            RowGroupSelection::group_row_ids_by_row_group(row_ids, row_group_size, num_row_groups);
690
691        assert_eq!(
692            row_group_to_row_ids,
693            vec![(0, vec![0, 1, 2]), (1, vec![0, 1, 2, 3]), (2, vec![2])]
694        );
695    }
696
697    #[test]
698    fn test_row_group_selection_new() {
699        // Test with regular case
700        let selection = RowGroupSelection::new(100, 250);
701        assert_eq!(selection.row_count(), 250);
702        assert_eq!(selection.row_group_count(), 3);
703
704        // Check content of each row group
705        let row_selection = selection.get(0).unwrap();
706        assert_eq!(row_selection.row_count(), 100);
707
708        let row_selection = selection.get(1).unwrap();
709        assert_eq!(row_selection.row_count(), 100);
710
711        let row_selection = selection.get(2).unwrap();
712        assert_eq!(row_selection.row_count(), 50);
713
714        // Test with empty selection
715        let selection = RowGroupSelection::new(100, 0);
716        assert_eq!(selection.row_count(), 0);
717        assert_eq!(selection.row_group_count(), 0);
718        assert!(selection.get(0).is_none());
719
720        // Test with single row group
721        let selection = RowGroupSelection::new(100, 50);
722        assert_eq!(selection.row_count(), 50);
723        assert_eq!(selection.row_group_count(), 1);
724
725        let row_selection = selection.get(0).unwrap();
726        assert_eq!(row_selection.row_count(), 50);
727
728        // Test with row count that doesn't divide evenly
729        let selection = RowGroupSelection::new(100, 150);
730        assert_eq!(selection.row_count(), 150);
731        assert_eq!(selection.row_group_count(), 2);
732
733        let row_selection = selection.get(0).unwrap();
734        assert_eq!(row_selection.row_count(), 100);
735
736        let row_selection = selection.get(1).unwrap();
737        assert_eq!(row_selection.row_count(), 50);
738
739        // Test with row count that's just over a multiple of row_group_size
740        let selection = RowGroupSelection::new(100, 101);
741        assert_eq!(selection.row_count(), 101);
742        assert_eq!(selection.row_group_count(), 2);
743
744        let row_selection = selection.get(0).unwrap();
745        assert_eq!(row_selection.row_count(), 100);
746
747        let row_selection = selection.get(1).unwrap();
748        assert_eq!(row_selection.row_count(), 1);
749    }
750
751    #[test]
752    fn test_from_row_ids() {
753        let row_group_size = 100;
754        let num_row_groups = 3;
755
756        // Test with regular case
757        let row_ids: BTreeSet<u32> = vec![5, 15, 25, 35, 105, 115, 205, 215]
758            .into_iter()
759            .collect();
760        let selection = RowGroupSelection::from_row_ids(row_ids, row_group_size, num_row_groups);
761        assert_eq!(selection.row_count(), 8);
762        assert_eq!(selection.row_group_count(), 3);
763
764        // Check content of each row group
765        let row_selection = selection.get(0).unwrap();
766        assert_eq!(row_selection.row_count(), 4); // 5, 15, 25, 35
767
768        let row_selection = selection.get(1).unwrap();
769        assert_eq!(row_selection.row_count(), 2); // 105, 115
770
771        let row_selection = selection.get(2).unwrap();
772        assert_eq!(row_selection.row_count(), 2); // 205, 215
773
774        // Test with empty row IDs
775        let empty_row_ids: BTreeSet<u32> = BTreeSet::new();
776        let selection =
777            RowGroupSelection::from_row_ids(empty_row_ids, row_group_size, num_row_groups);
778        assert_eq!(selection.row_count(), 0);
779        assert_eq!(selection.row_group_count(), 3);
780
781        // Test with consecutive row IDs
782        let consecutive_row_ids: BTreeSet<u32> = vec![5, 6, 7, 8, 9].into_iter().collect();
783        let selection =
784            RowGroupSelection::from_row_ids(consecutive_row_ids, row_group_size, num_row_groups);
785        assert_eq!(selection.row_count(), 5);
786        assert_eq!(selection.row_group_count(), 3);
787
788        let row_selection = selection.get(0).unwrap();
789        assert_eq!(row_selection.row_count(), 5); // 5, 6, 7, 8, 9
790
791        // Test with row IDs at row group boundaries
792        let boundary_row_ids: BTreeSet<u32> = vec![0, 99, 100, 199, 200, 249].into_iter().collect();
793        let selection =
794            RowGroupSelection::from_row_ids(boundary_row_ids, row_group_size, num_row_groups);
795        assert_eq!(selection.row_count(), 6);
796        assert_eq!(selection.row_group_count(), 3);
797
798        let row_selection = selection.get(0).unwrap();
799        assert_eq!(row_selection.row_count(), 2); // 0, 99
800
801        let row_selection = selection.get(1).unwrap();
802        assert_eq!(row_selection.row_count(), 2); // 100, 199
803
804        let row_selection = selection.get(2).unwrap();
805        assert_eq!(row_selection.row_count(), 2); // 200, 249
806
807        // Test with single row group
808        let single_group_row_ids: BTreeSet<u32> = vec![5, 10, 15].into_iter().collect();
809        let selection = RowGroupSelection::from_row_ids(single_group_row_ids, row_group_size, 1);
810        assert_eq!(selection.row_count(), 3);
811        assert_eq!(selection.row_group_count(), 1);
812
813        let row_selection = selection.get(0).unwrap();
814        assert_eq!(row_selection.row_count(), 3); // 5, 10, 15
815    }
816
817    #[test]
818    fn test_from_row_ranges() {
819        let row_group_size = 100;
820
821        // Test with regular case
822        let ranges = vec![
823            (0, vec![5..15, 25..35]), // Within [0, 100)
824            (1, vec![5..15]),         // Within [0, 100)
825            (2, vec![0..5, 10..15]),  // Within [0, 50) for last row group
826        ];
827        let selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
828        assert_eq!(selection.row_count(), 40);
829        assert_eq!(selection.row_group_count(), 3);
830
831        // Check content of each row group
832        let row_selection = selection.get(0).unwrap();
833        assert_eq!(row_selection.row_count(), 20); // 5..15 (10) + 25..35 (10)
834
835        let row_selection = selection.get(1).unwrap();
836        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
837
838        let row_selection = selection.get(2).unwrap();
839        assert_eq!(row_selection.row_count(), 10); // 0..5 (5) + 10..15 (5)
840
841        // Test with empty ranges
842        let empty_ranges: Vec<(usize, Vec<Range<usize>>)> = vec![];
843        let selection = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size);
844        assert_eq!(selection.row_count(), 0);
845        assert_eq!(selection.row_group_count(), 0);
846        assert!(selection.get(0).is_none());
847
848        // Test with adjacent ranges within same row group
849        let adjacent_ranges = vec![
850            (0, vec![5..15, 15..25]), // Adjacent ranges within [0, 100)
851        ];
852        let selection = RowGroupSelection::from_row_ranges(adjacent_ranges, row_group_size);
853        assert_eq!(selection.row_count(), 20);
854        assert_eq!(selection.row_group_count(), 1);
855
856        let row_selection = selection.get(0).unwrap();
857        assert_eq!(row_selection.row_count(), 20); // 5..15 (10) + 15..25 (10)
858
859        // Test with ranges at row group boundaries
860        let boundary_ranges = vec![
861            (0, vec![0..10, 90..100]), // Ranges at start and end of first row group
862            (1, vec![0..100]),         // Full range of second row group
863            (2, vec![0..50]),          // Full range of last row group
864        ];
865        let selection = RowGroupSelection::from_row_ranges(boundary_ranges, row_group_size);
866        assert_eq!(selection.row_count(), 170);
867        assert_eq!(selection.row_group_count(), 3);
868
869        let row_selection = selection.get(0).unwrap();
870        assert_eq!(row_selection.row_count(), 20); // 0..10 (10) + 90..100 (10)
871
872        let row_selection = selection.get(1).unwrap();
873        assert_eq!(row_selection.row_count(), 100); // 0..100 (100)
874
875        let row_selection = selection.get(2).unwrap();
876        assert_eq!(row_selection.row_count(), 50); // 0..50 (50)
877
878        // Test with single row group
879        let single_group_ranges = vec![
880            (0, vec![0..50]), // Half of first row group
881        ];
882        let selection = RowGroupSelection::from_row_ranges(single_group_ranges, row_group_size);
883        assert_eq!(selection.row_count(), 50);
884        assert_eq!(selection.row_group_count(), 1);
885
886        let row_selection = selection.get(0).unwrap();
887        assert_eq!(row_selection.row_count(), 50); // 0..50 (50)
888    }
889
890    #[test]
891    fn test_intersect() {
892        let row_group_size = 100;
893
894        // Test case 1: Regular intersection with partial overlap
895        let ranges1 = vec![
896            (0, vec![5..15, 25..35]), // Within [0, 100)
897            (1, vec![5..15]),         // Within [0, 100)
898        ];
899        let selection1 = RowGroupSelection::from_row_ranges(ranges1, row_group_size);
900
901        let ranges2 = vec![
902            (0, vec![10..20]), // Within [0, 100)
903            (1, vec![10..20]), // Within [0, 100)
904            (2, vec![0..10]),  // Within [0, 50) for last row group
905        ];
906        let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size);
907
908        let intersection = selection1.intersect(&selection2);
909        assert_eq!(intersection.row_count(), 10);
910        assert_eq!(intersection.row_group_count(), 2);
911
912        let row_selection = intersection.get(0).unwrap();
913        assert_eq!(row_selection.row_count(), 5); // 10..15 (5)
914
915        let row_selection = intersection.get(1).unwrap();
916        assert_eq!(row_selection.row_count(), 5); // 10..15 (5)
917
918        // Test case 2: Empty intersection with empty selection
919        let empty_ranges: Vec<(usize, Vec<Range<usize>>)> = vec![];
920        let empty_selection = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size);
921        let intersection = selection1.intersect(&empty_selection);
922        assert_eq!(intersection.row_count(), 0);
923        assert_eq!(intersection.row_group_count(), 0);
924        assert!(intersection.get(0).is_none());
925
926        // Test case 3: No overlapping row groups
927        let non_overlapping_ranges = vec![
928            (3, vec![0..10]), // Within [0, 50) for last row group
929        ];
930        let non_overlapping_selection =
931            RowGroupSelection::from_row_ranges(non_overlapping_ranges, row_group_size);
932        let intersection = selection1.intersect(&non_overlapping_selection);
933        assert_eq!(intersection.row_count(), 0);
934        assert_eq!(intersection.row_group_count(), 0);
935        assert!(intersection.get(0).is_none());
936
937        // Test case 4: Full overlap within same row group
938        let full_overlap_ranges1 = vec![
939            (0, vec![0..50]), // Within [0, 100)
940        ];
941        let full_overlap_ranges2 = vec![
942            (0, vec![0..50]), // Within [0, 100)
943        ];
944        let selection1 = RowGroupSelection::from_row_ranges(full_overlap_ranges1, row_group_size);
945        let selection2 = RowGroupSelection::from_row_ranges(full_overlap_ranges2, row_group_size);
946        let intersection = selection1.intersect(&selection2);
947        assert_eq!(intersection.row_count(), 50);
948        assert_eq!(intersection.row_group_count(), 1);
949
950        let row_selection = intersection.get(0).unwrap();
951        assert_eq!(row_selection.row_count(), 50); // 0..50 (50)
952
953        // Test case 5: Partial overlap at row group boundaries
954        let boundary_ranges1 = vec![
955            (0, vec![0..10, 90..100]), // Within [0, 100)
956            (1, vec![0..100]),         // Within [0, 100)
957        ];
958        let boundary_ranges2 = vec![
959            (0, vec![5..15, 95..100]), // Within [0, 100)
960            (1, vec![50..100]),        // Within [0, 100)
961        ];
962        let selection1 = RowGroupSelection::from_row_ranges(boundary_ranges1, row_group_size);
963        let selection2 = RowGroupSelection::from_row_ranges(boundary_ranges2, row_group_size);
964        let intersection = selection1.intersect(&selection2);
965        assert_eq!(intersection.row_count(), 60);
966        assert_eq!(intersection.row_group_count(), 2);
967
968        let row_selection = intersection.get(0).unwrap();
969        assert_eq!(row_selection.row_count(), 10); // 5..10 (5) + 95..100 (5)
970
971        let row_selection = intersection.get(1).unwrap();
972        assert_eq!(row_selection.row_count(), 50); // 50..100 (50)
973
974        // Test case 6: Multiple ranges with complex overlap
975        let complex_ranges1 = vec![
976            (0, vec![5..15, 25..35, 45..55]), // Within [0, 100)
977            (1, vec![10..20, 30..40]),        // Within [0, 100)
978        ];
979        let complex_ranges2 = vec![
980            (0, vec![10..20, 30..40, 50..60]), // Within [0, 100)
981            (1, vec![15..25, 35..45]),         // Within [0, 100)
982        ];
983        let selection1 = RowGroupSelection::from_row_ranges(complex_ranges1, row_group_size);
984        let selection2 = RowGroupSelection::from_row_ranges(complex_ranges2, row_group_size);
985        let intersection = selection1.intersect(&selection2);
986        assert_eq!(intersection.row_count(), 25);
987        assert_eq!(intersection.row_group_count(), 2);
988
989        let row_selection = intersection.get(0).unwrap();
990        assert_eq!(row_selection.row_count(), 15); // 10..15 (5) + 30..35 (5) + 50..55 (5)
991
992        let row_selection = intersection.get(1).unwrap();
993        assert_eq!(row_selection.row_count(), 10); // 15..20 (5) + 35..40 (5)
994
995        // Test case 7: Intersection with last row group (smaller size)
996        let last_rg_ranges1 = vec![
997            (2, vec![0..25, 30..40]), // Within [0, 50) for last row group
998        ];
999        let last_rg_ranges2 = vec![
1000            (2, vec![20..30, 35..45]), // Within [0, 50) for last row group
1001        ];
1002        let selection1 = RowGroupSelection::from_row_ranges(last_rg_ranges1, row_group_size);
1003        let selection2 = RowGroupSelection::from_row_ranges(last_rg_ranges2, row_group_size);
1004        let intersection = selection1.intersect(&selection2);
1005        assert_eq!(intersection.row_count(), 10);
1006        assert_eq!(intersection.row_group_count(), 1);
1007
1008        let row_selection = intersection.get(2).unwrap();
1009        assert_eq!(row_selection.row_count(), 10); // 20..25 (5) + 35..40 (5)
1010
1011        // Test case 8: Intersection with empty ranges in one selection
1012        let empty_ranges = vec![
1013            (0, vec![]),      // Empty ranges
1014            (1, vec![5..15]), // Within [0, 100)
1015        ];
1016        let selection1 = RowGroupSelection::from_row_ranges(empty_ranges, row_group_size);
1017        let ranges2 = vec![
1018            (0, vec![5..15, 25..35]), // Within [0, 100)
1019            (1, vec![5..15]),         // Within [0, 100)
1020        ];
1021        let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size);
1022        let intersection = selection1.intersect(&selection2);
1023        assert_eq!(intersection.row_count(), 10);
1024        assert_eq!(intersection.row_group_count(), 1);
1025
1026        let row_selection = intersection.get(1).unwrap();
1027        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
1028    }
1029
1030    #[test]
1031    fn test_pop_first() {
1032        let row_group_size = 100;
1033        let ranges = vec![
1034            (0, vec![5..15]), // Within [0, 100)
1035            (1, vec![5..15]), // Within [0, 100)
1036            (2, vec![0..5]),  // Within [0, 50) for last row group
1037        ];
1038        let mut selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
1039
1040        // Test popping first row group
1041        let (rg_id, row_selection) = selection.pop_first().unwrap();
1042        assert_eq!(rg_id, 0);
1043        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
1044        assert_eq!(selection.row_count(), 15);
1045        assert_eq!(selection.row_group_count(), 2);
1046
1047        // Verify remaining row groups' content
1048        let row_selection = selection.get(1).unwrap();
1049        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
1050
1051        let row_selection = selection.get(2).unwrap();
1052        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
1053
1054        // Test popping second row group
1055        let (rg_id, row_selection) = selection.pop_first().unwrap();
1056        assert_eq!(rg_id, 1);
1057        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
1058        assert_eq!(selection.row_count(), 5);
1059        assert_eq!(selection.row_group_count(), 1);
1060
1061        // Verify remaining row group's content
1062        let row_selection = selection.get(2).unwrap();
1063        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
1064
1065        // Test popping last row group
1066        let (rg_id, row_selection) = selection.pop_first().unwrap();
1067        assert_eq!(rg_id, 2);
1068        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
1069        assert_eq!(selection.row_count(), 0);
1070        assert_eq!(selection.row_group_count(), 0);
1071        assert!(selection.is_empty());
1072
1073        // Test popping from empty selection
1074        let mut empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
1075        assert!(empty_selection.pop_first().is_none());
1076        assert_eq!(empty_selection.row_count(), 0);
1077        assert_eq!(empty_selection.row_group_count(), 0);
1078        assert!(empty_selection.is_empty());
1079    }
1080
1081    #[test]
1082    fn test_remove_row_group() {
1083        let row_group_size = 100;
1084        let ranges = vec![
1085            (0, vec![5..15]), // Within [0, 100)
1086            (1, vec![5..15]), // Within [0, 100)
1087            (2, vec![0..5]),  // Within [0, 50) for last row group
1088        ];
1089        let mut selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
1090
1091        // Test removing existing row group
1092        selection.remove_row_group(1);
1093        assert_eq!(selection.row_count(), 15);
1094        assert_eq!(selection.row_group_count(), 2);
1095        assert!(!selection.contains_row_group(1));
1096
1097        // Verify remaining row groups' content
1098        let row_selection = selection.get(0).unwrap();
1099        assert_eq!(row_selection.row_count(), 10); // 5..15 (10)
1100
1101        let row_selection = selection.get(2).unwrap();
1102        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
1103
1104        // Test removing non-existent row group
1105        selection.remove_row_group(5);
1106        assert_eq!(selection.row_count(), 15);
1107        assert_eq!(selection.row_group_count(), 2);
1108
1109        // Test removing all row groups
1110        selection.remove_row_group(0);
1111        assert_eq!(selection.row_count(), 5);
1112        assert_eq!(selection.row_group_count(), 1);
1113
1114        let row_selection = selection.get(2).unwrap();
1115        assert_eq!(row_selection.row_count(), 5); // 0..5 (5)
1116
1117        selection.remove_row_group(2);
1118        assert_eq!(selection.row_count(), 0);
1119        assert_eq!(selection.row_group_count(), 0);
1120        assert!(selection.is_empty());
1121
1122        // Test removing from empty selection
1123        let mut empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
1124        empty_selection.remove_row_group(0);
1125        assert_eq!(empty_selection.row_count(), 0);
1126        assert_eq!(empty_selection.row_group_count(), 0);
1127        assert!(empty_selection.is_empty());
1128    }
1129
1130    #[test]
1131    fn test_contains_row_group() {
1132        let row_group_size = 100;
1133        let ranges = vec![
1134            (0, vec![5..15]), // Within [0, 100)
1135            (1, vec![5..15]), // Within [0, 100)
1136        ];
1137        let selection = RowGroupSelection::from_row_ranges(ranges, row_group_size);
1138
1139        assert!(selection.contains_row_group(0));
1140        assert!(selection.contains_row_group(1));
1141        assert!(!selection.contains_row_group(2));
1142
1143        // Test empty selection
1144        let empty_selection = RowGroupSelection::from_row_ranges(vec![], row_group_size);
1145        assert!(!empty_selection.contains_row_group(0));
1146    }
1147
1148    #[test]
1149    fn test_concat() {
1150        let row_group_size = 100;
1151        let ranges1 = vec![
1152            (0, vec![5..15]), // Within [0, 100)
1153            (1, vec![5..15]), // Within [0, 100)
1154        ];
1155
1156        let ranges2 = vec![
1157            (2, vec![5..15]), // Within [0, 100)
1158            (3, vec![5..15]), // Within [0, 100)
1159        ];
1160
1161        let mut selection1 = RowGroupSelection::from_row_ranges(ranges1, row_group_size);
1162        let selection2 = RowGroupSelection::from_row_ranges(ranges2, row_group_size);
1163
1164        selection1.concat(&selection2);
1165        assert_eq!(selection1.row_count(), 40);
1166        assert_eq!(selection1.row_group_count(), 4);
1167    }
1168}