index/bloom_filter/
applier.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashSet;
16use std::ops::Range;
17
18use fastbloom::BloomFilter;
19use greptime_proto::v1::index::BloomFilterMeta;
20use itertools::Itertools;
21
22use crate::bloom_filter::error::Result;
23use crate::bloom_filter::reader::BloomFilterReader;
24use crate::Bytes;
25
26/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
27/// one of the elements (logical OR semantic) for the predicate to be satisfied.
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct InListPredicate {
30    /// List of acceptable values.
31    pub list: HashSet<Bytes>,
32}
33
34pub struct BloomFilterApplier {
35    reader: Box<dyn BloomFilterReader + Send>,
36    meta: BloomFilterMeta,
37}
38
39impl BloomFilterApplier {
40    pub async fn new(reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> {
41        let meta = reader.metadata().await?;
42
43        Ok(Self { reader, meta })
44    }
45
46    /// Searches ranges of rows that match all the given predicates in the search ranges.
47    /// Each predicate represents an OR condition of probes, and all predicates must match (AND semantics).
48    /// The logic is: (probe1 OR probe2 OR ...) AND (probe3 OR probe4 OR ...)
49    pub async fn search(
50        &mut self,
51        predicates: &[InListPredicate],
52        search_ranges: &[Range<usize>],
53    ) -> Result<Vec<Range<usize>>> {
54        if predicates.is_empty() {
55            // If no predicates, return empty result
56            return Ok(Vec::new());
57        }
58
59        let segments = self.row_ranges_to_segments(search_ranges);
60        let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments).await?;
61        let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates);
62        Ok(intersect_ranges(search_ranges, &matching_row_ranges))
63    }
64
65    /// Converts row ranges to segment ranges and returns unique segments
66    fn row_ranges_to_segments(&self, row_ranges: &[Range<usize>]) -> Vec<usize> {
67        let rows_per_segment = self.meta.rows_per_segment as usize;
68
69        let mut segments = vec![];
70        for range in row_ranges {
71            let start_seg = range.start / rows_per_segment;
72            let mut end_seg = range.end.div_ceil(rows_per_segment);
73
74            if end_seg == self.meta.segment_loc_indices.len() + 1 {
75                // Handle legacy bug with missing last segment
76                //
77                // In a previous version, there was a bug where if the last segment was all null,
78                // this segment would not be written into the index. This caused the slice
79                // `self.meta.segment_loc_indices[start_seg..end_seg]` to go out of bounds due to
80                // the missing segment. Since the `search` function does not search for nulls,
81                // we can simply ignore the last segment in this buggy scenario.
82                end_seg -= 1;
83            }
84            segments.extend(start_seg..end_seg);
85        }
86
87        // Ensure segments are unique and sorted
88        segments.sort_unstable();
89        segments.dedup();
90
91        segments
92    }
93
94    /// Loads bloom filters for the given segments and returns the segment locations and bloom filters
95    async fn load_bloom_filters(
96        &mut self,
97        segments: &[usize],
98    ) -> Result<(Vec<(u64, usize)>, Vec<BloomFilter>)> {
99        let segment_locations = segments
100            .iter()
101            .map(|&seg| (self.meta.segment_loc_indices[seg], seg))
102            .collect::<Vec<_>>();
103
104        let bloom_filter_locs = segment_locations
105            .iter()
106            .map(|(loc, _)| *loc)
107            .dedup()
108            .map(|i| self.meta.bloom_filter_locs[i as usize])
109            .collect::<Vec<_>>();
110
111        let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs).await?;
112
113        Ok((segment_locations, bloom_filters))
114    }
115
116    /// Finds segments that match all predicates and converts them to row ranges
117    fn find_matching_rows(
118        &self,
119        segment_locations: Vec<(u64, usize)>,
120        bloom_filters: Vec<BloomFilter>,
121        predicates: &[InListPredicate],
122    ) -> Vec<Range<usize>> {
123        let rows_per_segment = self.meta.rows_per_segment as usize;
124        let mut matching_row_ranges = Vec::with_capacity(bloom_filters.len());
125
126        // Group segments by their location index (since they have the same bloom filter) and check if they match all predicates
127        for ((_loc_index, group), bloom_filter) in segment_locations
128            .into_iter()
129            .chunk_by(|(loc, _)| *loc)
130            .into_iter()
131            .zip(bloom_filters.iter())
132        {
133            // Check if this bloom filter matches each predicate (AND semantics)
134            let matches_all_predicates = predicates.iter().all(|predicate| {
135                // For each predicate, at least one probe must match (OR semantics)
136                predicate
137                    .list
138                    .iter()
139                    .any(|probe| bloom_filter.contains(probe))
140            });
141
142            if !matches_all_predicates {
143                continue;
144            }
145
146            // For each matching segment, convert to row range
147            for (_, segment) in group {
148                let start_row = segment * rows_per_segment;
149                let end_row = (segment + 1) * rows_per_segment;
150                matching_row_ranges.push(start_row..end_row);
151            }
152        }
153
154        self.merge_adjacent_ranges(matching_row_ranges)
155    }
156
157    /// Merges adjacent row ranges to reduce the number of ranges
158    fn merge_adjacent_ranges(&self, ranges: Vec<Range<usize>>) -> Vec<Range<usize>> {
159        ranges
160            .into_iter()
161            .coalesce(|prev, next| {
162                if prev.end == next.start {
163                    Ok(prev.start..next.end)
164                } else {
165                    Err((prev, next))
166                }
167            })
168            .collect::<Vec<_>>()
169    }
170}
171
/// Intersects two lists of ranges and returns the intersection.
///
/// The input lists are assumed to be sorted and non-overlapping. Ranges are half-open,
/// so two ranges that merely touch (e.g. `1..3` and `3..5`) do not intersect. The output
/// is sorted and non-overlapping as well.
fn intersect_ranges(lhs: &[Range<usize>], rhs: &[Range<usize>]) -> Vec<Range<usize>> {
    let mut i = 0;
    let mut j = 0;

    let mut output = Vec::new();
    while i < lhs.len() && j < rhs.len() {
        let r1 = &lhs[i];
        let r2 = &rhs[j];

        // Find intersection if exists
        let start = r1.start.max(r2.start);
        let end = r1.end.min(r2.end);

        if start < end {
            output.push(start..end);
        }

        // Move forward the range that ends first. On a tie only `rhs` advances; the
        // remaining `lhs` range is skipped on the next iteration because it can no longer
        // overlap anything that follows in `rhs`.
        if r1.end < r2.end {
            i += 1;
        } else {
            j += 1;
        }
    }

    output
}
202
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    use futures::io::Cursor;

    use super::*;
    use crate::bloom_filter::creator::BloomFilterCreator;
    use crate::bloom_filter::reader::BloomFilterReaderImpl;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Builds an index with 4 rows per segment and checks `BloomFilterApplier::search`
    /// against single-value, multi-value (OR) and multi-predicate (AND) cases.
    #[tokio::test]
    // Single-element range collections are intentional in the expectations below.
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_applier() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = BloomFilterCreator::new(
            4,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        let rows = vec![
            // seg 0
            vec![b"row00".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
            vec![b"row01".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
            vec![b"row02".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
            vec![b"row03".to_vec(), b"seg00".to_vec(), b"overl".to_vec()],
            // seg 1
            vec![b"row04".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
            vec![b"row05".to_vec(), b"seg01".to_vec(), b"overl".to_vec()],
            vec![b"row06".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
            vec![b"row07".to_vec(), b"seg01".to_vec(), b"overp".to_vec()],
            // seg 2
            vec![b"row08".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
            vec![b"row09".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
            vec![b"row10".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
            vec![b"row11".to_vec(), b"seg02".to_vec(), b"overp".to_vec()],
            // duplicate rows
            // seg 3
            vec![b"dup".to_vec()],
            vec![b"dup".to_vec()],
            vec![b"dup".to_vec()],
            vec![b"dup".to_vec()],
            // seg 4
            vec![b"dup".to_vec()],
            vec![b"dup".to_vec()],
            vec![b"dup".to_vec()],
            vec![b"dup".to_vec()],
            // seg 5
            vec![b"dup".to_vec()],
            vec![b"dup".to_vec()],
            vec![b"dup".to_vec()],
            vec![b"dup".to_vec()],
            // seg 6
            vec![b"dup".to_vec()],
            vec![b"dup".to_vec()],
            vec![b"dup".to_vec()],
            vec![b"dup".to_vec()],
        ];

        for row in rows {
            creator.push_row_elems(row).await.unwrap();
        }

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();
        let reader = BloomFilterReaderImpl::new(bytes);
        let mut applier = BloomFilterApplier::new(Box::new(reader)).await.unwrap();

        // Test cases for predicates: (predicates, search range, expected row ranges)
        let cases = vec![
            // Single value predicates
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"row00".to_vec()]),
                }],
                0..28,
                vec![0..4],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"row05".to_vec()]),
                }],
                4..8,
                vec![4..8],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"row03".to_vec()]),
                }],
                4..8,
                vec![],
            ),
            // Multiple values in a single predicate (OR logic)
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"overl".to_vec(), b"row06".to_vec()]),
                }],
                0..28,
                vec![0..8],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"seg01".to_vec(), b"overp".to_vec()]),
                }],
                0..28,
                vec![4..12],
            ),
            // Non-existent values
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"row99".to_vec()]),
                }],
                0..28,
                vec![],
            ),
            // Empty range
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"row00".to_vec()]),
                }],
                12..12,
                vec![],
            ),
            // Multiple values in a single predicate within specific ranges
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"row04".to_vec(), b"row05".to_vec()]),
                }],
                0..12,
                vec![4..8],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"seg01".to_vec()]),
                }],
                0..28,
                vec![4..8],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"seg01".to_vec()]),
                }],
                6..28,
                vec![6..8],
            ),
            // Values spanning multiple segments
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"overl".to_vec()]),
                }],
                0..28,
                vec![0..8],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"overl".to_vec()]),
                }],
                2..28,
                vec![2..8],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"overp".to_vec()]),
                }],
                0..10,
                vec![4..10],
            ),
            // Duplicate values
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"dup".to_vec()]),
                }],
                0..12,
                vec![],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"dup".to_vec()]),
                }],
                0..16,
                vec![12..16],
            ),
            (
                vec![InListPredicate {
                    list: HashSet::from_iter([b"dup".to_vec()]),
                }],
                0..28,
                vec![12..28],
            ),
            // Multiple predicates (AND logic)
            (
                vec![
                    InListPredicate {
                        list: HashSet::from_iter([b"row00".to_vec(), b"row01".to_vec()]),
                    },
                    InListPredicate {
                        list: HashSet::from_iter([b"seg00".to_vec()]),
                    },
                ],
                0..28,
                vec![0..4],
            ),
            (
                vec![
                    InListPredicate {
                        list: HashSet::from_iter([b"overl".to_vec()]),
                    },
                    InListPredicate {
                        list: HashSet::from_iter([b"seg01".to_vec()]),
                    },
                ],
                0..28,
                vec![4..8],
            ),
        ];

        for (predicates, search_range, expected) in cases {
            let search_ranges = [search_range];
            let result = applier.search(&predicates, &search_ranges).await.unwrap();
            // `assert_eq!` already prints both sides; the message identifies the failing case.
            assert_eq!(
                result, expected,
                "predicates: {:?}, search ranges: {:?}",
                predicates, search_ranges
            );
        }
    }

    /// Exercises `intersect_ranges` on empty, disjoint, touching and overlapping inputs.
    #[test]
    // Single-element range collections are intentional in the expectations below.
    #[allow(clippy::single_range_in_vec_init)]
    fn test_intersect_ranges() {
        // empty inputs
        assert_eq!(intersect_ranges(&[], &[]), Vec::<Range<usize>>::new());
        assert_eq!(intersect_ranges(&[1..5], &[]), Vec::<Range<usize>>::new());
        assert_eq!(intersect_ranges(&[], &[1..5]), Vec::<Range<usize>>::new());

        // no overlap
        assert_eq!(
            intersect_ranges(&[1..3, 5..7], &[3..5, 7..9]),
            Vec::<Range<usize>>::new()
        );

        // single overlap
        assert_eq!(intersect_ranges(&[1..5], &[3..7]), vec![3..5]);

        // multiple overlaps
        assert_eq!(
            intersect_ranges(&[1..5, 7..10, 12..15], &[2..6, 8..13]),
            vec![2..5, 8..10, 12..13]
        );

        // exact overlap
        assert_eq!(
            intersect_ranges(&[1..3, 5..7], &[1..3, 5..7]),
            vec![1..3, 5..7]
        );

        // contained ranges
        assert_eq!(
            intersect_ranges(&[1..10], &[2..4, 5..7, 8..9]),
            vec![2..4, 5..7, 8..9]
        );

        // partial overlaps
        assert_eq!(
            intersect_ranges(&[1..4, 6..9], &[2..7, 8..10]),
            vec![2..4, 6..7, 8..9]
        );

        // single point overlap
        assert_eq!(
            intersect_ranges(&[1..3], &[3..5]),
            Vec::<Range<usize>>::new()
        );

        // large ranges
        assert_eq!(intersect_ranges(&[0..100], &[50..150]), vec![50..100]);
    }
}