index/bloom_filter/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16use std::ops::Range;
17
18use fastbloom::BloomFilter;
19use greptime_proto::v1::index::BloomFilterMeta;
20use itertools::Itertools;
21
22use crate::Bytes;
23use crate::bloom_filter::error::Result;
24use crate::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
25
26/// `InListPredicate` contains a list of acceptable values. A value needs to match at least
27/// one of the elements (logical OR semantic) for the predicate to be satisfied.
28#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct InListPredicate {
30    /// List of acceptable values.
31    pub list: BTreeSet<Bytes>,
32}
33
34pub struct BloomFilterApplier {
35    reader: Box<dyn BloomFilterReader + Send>,
36    meta: BloomFilterMeta,
37}
38
39impl BloomFilterApplier {
40    pub async fn new(reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> {
41        let meta = reader.metadata(None).await?;
42
43        Ok(Self { reader, meta })
44    }
45
46    /// Searches ranges of rows that match all the given predicates in the search ranges.
47    /// Each predicate represents an OR condition of probes, and all predicates must match (AND semantics).
48    /// The logic is: (probe1 OR probe2 OR ...) AND (probe3 OR probe4 OR ...)
49    pub async fn search(
50        &mut self,
51        predicates: &[InListPredicate],
52        search_ranges: &[Range<usize>],
53        metrics: Option<&mut BloomFilterReadMetrics>,
54    ) -> Result<Vec<Range<usize>>> {
55        if predicates.is_empty() {
56            // If no predicates, return empty result
57            return Ok(Vec::new());
58        }
59
60        let segments = self.row_ranges_to_segments(search_ranges);
61        let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments, metrics).await?;
62        let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates);
63        Ok(intersect_ranges(search_ranges, &matching_row_ranges))
64    }
65
66    /// Converts row ranges to segment ranges and returns unique segments
67    fn row_ranges_to_segments(&self, row_ranges: &[Range<usize>]) -> Vec<usize> {
68        let rows_per_segment = self.meta.rows_per_segment as usize;
69
70        let mut segments = vec![];
71        for range in row_ranges {
72            let start_seg = range.start / rows_per_segment;
73            let mut end_seg = range.end.div_ceil(rows_per_segment);
74
75            if end_seg == self.meta.segment_loc_indices.len() + 1 {
76                // Handle legacy bug with missing last segment
77                //
78                // In a previous version, there was a bug where if the last segment was all null,
79                // this segment would not be written into the index. This caused the slice
80                // `self.meta.segment_loc_indices[start_seg..end_seg]` to go out of bounds due to
81                // the missing segment. Since the `search` function does not search for nulls,
82                // we can simply ignore the last segment in this buggy scenario.
83                end_seg -= 1;
84            }
85            segments.extend(start_seg..end_seg);
86        }
87
88        // Ensure segments are unique and sorted
89        segments.sort_unstable();
90        segments.dedup();
91
92        segments
93    }
94
95    /// Loads bloom filters for the given segments and returns the segment locations and bloom filters
96    async fn load_bloom_filters(
97        &mut self,
98        segments: &[usize],
99        metrics: Option<&mut BloomFilterReadMetrics>,
100    ) -> Result<(Vec<(u64, usize)>, Vec<BloomFilter>)> {
101        let segment_locations = segments
102            .iter()
103            .map(|&seg| (self.meta.segment_loc_indices[seg], seg))
104            .collect::<Vec<_>>();
105
106        let bloom_filter_locs = segment_locations
107            .iter()
108            .map(|(loc, _)| *loc)
109            .dedup()
110            .map(|i| self.meta.bloom_filter_locs[i as usize])
111            .collect::<Vec<_>>();
112
113        let bloom_filters = self
114            .reader
115            .bloom_filter_vec(&bloom_filter_locs, metrics)
116            .await?;
117
118        Ok((segment_locations, bloom_filters))
119    }
120
121    /// Finds segments that match all predicates and converts them to row ranges
122    fn find_matching_rows(
123        &self,
124        segment_locations: Vec<(u64, usize)>,
125        bloom_filters: Vec<BloomFilter>,
126        predicates: &[InListPredicate],
127    ) -> Vec<Range<usize>> {
128        let rows_per_segment = self.meta.rows_per_segment as usize;
129        let mut matching_row_ranges = Vec::with_capacity(bloom_filters.len());
130
131        // Group segments by their location index (since they have the same bloom filter) and check if they match all predicates
132        for ((_loc_index, group), bloom_filter) in segment_locations
133            .into_iter()
134            .chunk_by(|(loc, _)| *loc)
135            .into_iter()
136            .zip(bloom_filters.iter())
137        {
138            // Check if this bloom filter matches each predicate (AND semantics)
139            let matches_all_predicates = predicates.iter().all(|predicate| {
140                // For each predicate, at least one probe must match (OR semantics)
141                predicate
142                    .list
143                    .iter()
144                    .any(|probe| bloom_filter.contains(probe))
145            });
146
147            if !matches_all_predicates {
148                continue;
149            }
150
151            // For each matching segment, convert to row range
152            for (_, segment) in group {
153                let start_row = segment * rows_per_segment;
154                let end_row = (segment + 1) * rows_per_segment;
155                matching_row_ranges.push(start_row..end_row);
156            }
157        }
158
159        self.merge_adjacent_ranges(matching_row_ranges)
160    }
161
162    /// Merges adjacent row ranges to reduce the number of ranges
163    fn merge_adjacent_ranges(&self, ranges: Vec<Range<usize>>) -> Vec<Range<usize>> {
164        ranges
165            .into_iter()
166            .coalesce(|prev, next| {
167                if prev.end == next.start {
168                    Ok(prev.start..next.end)
169                } else {
170                    Err((prev, next))
171                }
172            })
173            .collect::<Vec<_>>()
174    }
175}
176
/// Computes the pairwise intersection of two range lists.
///
/// Both inputs are assumed to be sorted and non-overlapping. Empty intersections
/// (including ranges that merely touch) are not emitted.
fn intersect_ranges(lhs: &[Range<usize>], rhs: &[Range<usize>]) -> Vec<Range<usize>> {
    let mut result = Vec::new();
    let (mut left_idx, mut right_idx) = (0, 0);

    // Walk both lists in lockstep until either one is exhausted.
    while let (Some(left), Some(right)) = (lhs.get(left_idx), rhs.get(right_idx)) {
        let overlap_start = left.start.max(right.start);
        let overlap_end = left.end.min(right.end);

        if overlap_start < overlap_end {
            result.push(overlap_start..overlap_end);
        }

        // The range that finishes first cannot overlap anything further on the other side.
        if left.end < right.end {
            left_idx += 1;
        } else {
            right_idx += 1;
        }
    }

    result
}
207
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;

    use futures::io::Cursor;

    use super::*;
    use crate::bloom_filter::creator::BloomFilterCreator;
    use crate::bloom_filter::reader::BloomFilterReaderImpl;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Builds one input row from its textual elements.
    fn row(elems: &[&str]) -> Vec<Vec<u8>> {
        elems.iter().map(|elem| elem.as_bytes().to_vec()).collect()
    }

    /// Builds a predicate accepting any of `values`.
    fn in_list(values: &[&str]) -> InListPredicate {
        InListPredicate {
            list: values
                .iter()
                .map(|value| value.as_bytes().to_vec())
                .collect(),
        }
    }

    #[tokio::test]
    // The expected results are deliberately written as `vec![a..b]` (a list with one range).
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_applier() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = BloomFilterCreator::new(
            4,
            0.01,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        let mut rows = vec![
            // seg 0
            row(&["row00", "seg00", "overl"]),
            row(&["row01", "seg00", "overl"]),
            row(&["row02", "seg00", "overl"]),
            row(&["row03", "seg00", "overl"]),
            // seg 1
            row(&["row04", "seg01", "overl"]),
            row(&["row05", "seg01", "overl"]),
            row(&["row06", "seg01", "overp"]),
            row(&["row07", "seg01", "overp"]),
            // seg 2
            row(&["row08", "seg02", "overp"]),
            row(&["row09", "seg02", "overp"]),
            row(&["row10", "seg02", "overp"]),
            row(&["row11", "seg02", "overp"]),
        ];
        // Duplicate rows: seg 3 through seg 6, four identical rows each.
        rows.extend(vec![row(&["dup"]); 16]);

        for elems in rows {
            creator.push_row_elems(elems).await.unwrap();
        }

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();
        let reader = BloomFilterReaderImpl::new(bytes);
        let mut applier = BloomFilterApplier::new(Box::new(reader)).await.unwrap();

        // (predicates, search range, expected matching ranges)
        let cases: Vec<(Vec<InListPredicate>, Range<usize>, Vec<Range<usize>>)> = vec![
            // Single value predicates
            (vec![in_list(&["row00"])], 0..28, vec![0..4]),
            (vec![in_list(&["row05"])], 4..8, vec![4..8]),
            (vec![in_list(&["row03"])], 4..8, vec![]),
            // Multiple values in a single predicate (OR logic)
            (vec![in_list(&["overl", "row06"])], 0..28, vec![0..8]),
            (vec![in_list(&["seg01", "overp"])], 0..28, vec![4..12]),
            // Non-existent values
            (vec![in_list(&["row99"])], 0..28, vec![]),
            // Empty range
            (vec![in_list(&["row00"])], 12..12, vec![]),
            // Multiple values in a single predicate within specific ranges
            (vec![in_list(&["row04", "row05"])], 0..12, vec![4..8]),
            (vec![in_list(&["seg01"])], 0..28, vec![4..8]),
            (vec![in_list(&["seg01"])], 6..28, vec![6..8]),
            // Values spanning multiple segments
            (vec![in_list(&["overl"])], 0..28, vec![0..8]),
            (vec![in_list(&["overl"])], 2..28, vec![2..8]),
            (vec![in_list(&["overp"])], 0..10, vec![4..10]),
            // Duplicate values
            (vec![in_list(&["dup"])], 0..12, vec![]),
            (vec![in_list(&["dup"])], 0..16, vec![12..16]),
            (vec![in_list(&["dup"])], 0..28, vec![12..28]),
            // Multiple predicates (AND logic)
            (
                vec![in_list(&["row00", "row01"]), in_list(&["seg00"])],
                0..28,
                vec![0..4],
            ),
            (
                vec![in_list(&["overl"]), in_list(&["seg01"])],
                0..28,
                vec![4..8],
            ),
        ];

        for (predicates, search_range, expected) in cases {
            let result = applier
                .search(&predicates, std::slice::from_ref(&search_range), None)
                .await
                .unwrap();
            // `assert_eq!` already prints both sides; the message pinpoints the failing case.
            assert_eq!(
                result, expected,
                "predicates {predicates:?} over rows {search_range:?}"
            );
        }
    }

    #[test]
    // The inputs are deliberately written as single-range lists.
    #[allow(clippy::single_range_in_vec_init)]
    fn test_intersect_ranges() {
        let empty = Vec::<Range<usize>>::new();

        // empty inputs
        assert_eq!(intersect_ranges(&[], &[]), empty);
        assert_eq!(intersect_ranges(&[1..5], &[]), empty);
        assert_eq!(intersect_ranges(&[], &[1..5]), empty);

        // no overlap
        assert_eq!(intersect_ranges(&[1..3, 5..7], &[3..5, 7..9]), empty);

        // single overlap
        assert_eq!(intersect_ranges(&[1..5], &[3..7]), vec![3..5]);

        // multiple overlaps
        assert_eq!(
            intersect_ranges(&[1..5, 7..10, 12..15], &[2..6, 8..13]),
            vec![2..5, 8..10, 12..13]
        );

        // exact overlap
        assert_eq!(
            intersect_ranges(&[1..3, 5..7], &[1..3, 5..7]),
            vec![1..3, 5..7]
        );

        // contained ranges
        assert_eq!(
            intersect_ranges(&[1..10], &[2..4, 5..7, 8..9]),
            vec![2..4, 5..7, 8..9]
        );

        // partial overlaps
        assert_eq!(
            intersect_ranges(&[1..4, 6..9], &[2..7, 8..10]),
            vec![2..4, 6..7, 8..9]
        );

        // ranges that only touch at a boundary do not overlap
        assert_eq!(intersect_ranges(&[1..3], &[3..5]), empty);

        // large ranges
        assert_eq!(intersect_ranges(&[0..100], &[50..150]), vec![50..100]);
    }
}