1use std::collections::BTreeSet;
16use std::ops::Range;
17
18use fastbloom::BloomFilter;
19use greptime_proto::v1::index::BloomFilterMeta;
20use itertools::Itertools;
21
22use crate::Bytes;
23use crate::bloom_filter::error::Result;
24use crate::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash)]
29pub struct InListPredicate {
30 pub list: BTreeSet<Bytes>,
32}
33
34pub struct BloomFilterApplier {
35 reader: Box<dyn BloomFilterReader + Send>,
36 meta: BloomFilterMeta,
37}
38
39impl BloomFilterApplier {
40 pub async fn new(reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> {
41 let meta = reader.metadata(None).await?;
42
43 Ok(Self { reader, meta })
44 }
45
46 pub async fn search(
50 &mut self,
51 predicates: &[InListPredicate],
52 search_ranges: &[Range<usize>],
53 metrics: Option<&mut BloomFilterReadMetrics>,
54 ) -> Result<Vec<Range<usize>>> {
55 if predicates.is_empty() {
56 return Ok(Vec::new());
58 }
59
60 let segments = self.row_ranges_to_segments(search_ranges);
61 let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments, metrics).await?;
62 let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates);
63 Ok(intersect_ranges(search_ranges, &matching_row_ranges))
64 }
65
66 fn row_ranges_to_segments(&self, row_ranges: &[Range<usize>]) -> Vec<usize> {
68 let rows_per_segment = self.meta.rows_per_segment as usize;
69
70 let mut segments = vec![];
71 for range in row_ranges {
72 let start_seg = range.start / rows_per_segment;
73 let mut end_seg = range.end.div_ceil(rows_per_segment);
74
75 if end_seg == self.meta.segment_loc_indices.len() + 1 {
76 end_seg -= 1;
84 }
85 segments.extend(start_seg..end_seg);
86 }
87
88 segments.sort_unstable();
90 segments.dedup();
91
92 segments
93 }
94
95 async fn load_bloom_filters(
97 &mut self,
98 segments: &[usize],
99 metrics: Option<&mut BloomFilterReadMetrics>,
100 ) -> Result<(Vec<(u64, usize)>, Vec<BloomFilter>)> {
101 let segment_locations = segments
102 .iter()
103 .map(|&seg| (self.meta.segment_loc_indices[seg], seg))
104 .collect::<Vec<_>>();
105
106 let bloom_filter_locs = segment_locations
107 .iter()
108 .map(|(loc, _)| *loc)
109 .dedup()
110 .map(|i| self.meta.bloom_filter_locs[i as usize])
111 .collect::<Vec<_>>();
112
113 let bloom_filters = self
114 .reader
115 .bloom_filter_vec(&bloom_filter_locs, metrics)
116 .await?;
117
118 Ok((segment_locations, bloom_filters))
119 }
120
121 fn find_matching_rows(
123 &self,
124 segment_locations: Vec<(u64, usize)>,
125 bloom_filters: Vec<BloomFilter>,
126 predicates: &[InListPredicate],
127 ) -> Vec<Range<usize>> {
128 let rows_per_segment = self.meta.rows_per_segment as usize;
129 let mut matching_row_ranges = Vec::with_capacity(bloom_filters.len());
130
131 for ((_loc_index, group), bloom_filter) in segment_locations
133 .into_iter()
134 .chunk_by(|(loc, _)| *loc)
135 .into_iter()
136 .zip(bloom_filters.iter())
137 {
138 let matches_all_predicates = predicates.iter().all(|predicate| {
140 predicate
142 .list
143 .iter()
144 .any(|probe| bloom_filter.contains(probe))
145 });
146
147 if !matches_all_predicates {
148 continue;
149 }
150
151 for (_, segment) in group {
153 let start_row = segment * rows_per_segment;
154 let end_row = (segment + 1) * rows_per_segment;
155 matching_row_ranges.push(start_row..end_row);
156 }
157 }
158
159 self.merge_adjacent_ranges(matching_row_ranges)
160 }
161
162 fn merge_adjacent_ranges(&self, ranges: Vec<Range<usize>>) -> Vec<Range<usize>> {
164 ranges
165 .into_iter()
166 .coalesce(|prev, next| {
167 if prev.end == next.start {
168 Ok(prev.start..next.end)
169 } else {
170 Err((prev, next))
171 }
172 })
173 .collect::<Vec<_>>()
174 }
175}
176
/// Intersects two lists of ranges with a two-pointer sweep.
///
/// Each step intersects the current range of `lhs` with the current range of
/// `rhs`, records the overlap if it is non-empty, and then moves past
/// whichever range ends first (`rhs` on a tie).
///
/// The sweep does not validate its inputs; it yields the complete
/// intersection only when both lists are sorted in ascending order and do not
/// overlap internally.
fn intersect_ranges(lhs: &[Range<usize>], rhs: &[Range<usize>]) -> Vec<Range<usize>> {
    let mut overlaps = Vec::new();
    let (mut left_idx, mut right_idx) = (0, 0);

    while let (Some(left), Some(right)) = (lhs.get(left_idx), rhs.get(right_idx)) {
        let overlap = left.start.max(right.start)..left.end.min(right.end);
        if !overlap.is_empty() {
            overlaps.push(overlap);
        }

        // The range that finishes first cannot overlap anything further on
        // the other side, so it is the one to leave behind.
        if left.end < right.end {
            left_idx += 1;
        } else {
            right_idx += 1;
        }
    }

    overlaps
}
207
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;

    use futures::io::Cursor;

    use super::*;
    use crate::bloom_filter::creator::BloomFilterCreator;
    use crate::bloom_filter::reader::BloomFilterReaderImpl;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Builds a predicate whose probes are the bytes of the given strings.
    fn in_list(probes: &[&str]) -> InListPredicate {
        InListPredicate {
            list: probes
                .iter()
                .map(|probe| probe.as_bytes().to_vec())
                .collect(),
        }
    }

    /// Asserts that intersecting `lhs` with `rhs` produces exactly `expected`.
    #[track_caller]
    fn assert_intersection(
        lhs: &[Range<usize>],
        rhs: &[Range<usize>],
        expected: &[Range<usize>],
    ) {
        assert_eq!(intersect_ranges(lhs, rhs), expected.to_vec());
    }

    #[tokio::test]
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_appliter() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = BloomFilterCreator::new(
            4,
            0.01,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        // Rows 0..12: a unique row id, a per-segment tag, and a tag whose
        // value changes in the middle of the second segment.
        let tagged_rows = [
            ("row00", "seg00", "overl"),
            ("row01", "seg00", "overl"),
            ("row02", "seg00", "overl"),
            ("row03", "seg00", "overl"),
            ("row04", "seg01", "overl"),
            ("row05", "seg01", "overl"),
            ("row06", "seg01", "overp"),
            ("row07", "seg01", "overp"),
            ("row08", "seg02", "overp"),
            ("row09", "seg02", "overp"),
            ("row10", "seg02", "overp"),
            ("row11", "seg02", "overp"),
        ];
        for (row, segment, overlap) in tagged_rows {
            let elems = vec![
                row.as_bytes().to_vec(),
                segment.as_bytes().to_vec(),
                overlap.as_bytes().to_vec(),
            ];
            creator.push_row_elems(elems).await.unwrap();
        }

        // Rows 12..28: sixteen identical single-element rows.
        for _ in 0..16 {
            creator.push_row_elems(vec![b"dup".to_vec()]).await.unwrap();
        }

        creator.finish(&mut writer).await.unwrap();

        let reader = BloomFilterReaderImpl::new(writer.into_inner());
        let mut applier = BloomFilterApplier::new(Box::new(reader)).await.unwrap();

        // (predicates, search range, expected matching ranges)
        let cases = vec![
            // Row ids.
            (vec![in_list(&["row00"])], 0..28, vec![0..4]),
            (vec![in_list(&["row05"])], 4..8, vec![4..8]),
            (vec![in_list(&["row03"])], 4..8, vec![]),
            // Several probes in one predicate.
            (vec![in_list(&["overl", "row06"])], 0..28, vec![0..8]),
            (vec![in_list(&["seg01", "overp"])], 0..28, vec![4..12]),
            // A value that was never inserted.
            (vec![in_list(&["row99"])], 0..28, vec![]),
            // An empty search range.
            (vec![in_list(&["row00"])], 12..12, vec![]),
            // Probes located in the same segment.
            (vec![in_list(&["row04", "row05"])], 0..12, vec![4..8]),
            (vec![in_list(&["seg01"])], 0..28, vec![4..8]),
            (vec![in_list(&["seg01"])], 6..28, vec![6..8]),
            // Values spanning more than one segment.
            (vec![in_list(&["overl"])], 0..28, vec![0..8]),
            (vec![in_list(&["overl"])], 2..28, vec![2..8]),
            (vec![in_list(&["overp"])], 0..10, vec![4..10]),
            // The repeated rows at the tail.
            (vec![in_list(&["dup"])], 0..12, vec![]),
            (vec![in_list(&["dup"])], 0..16, vec![12..16]),
            (vec![in_list(&["dup"])], 0..28, vec![12..28]),
            // Multiple predicates, all of which must pass.
            (
                vec![in_list(&["row00", "row01"]), in_list(&["seg00"])],
                0..28,
                vec![0..4],
            ),
            (
                vec![in_list(&["overl"]), in_list(&["seg01"])],
                0..28,
                vec![4..8],
            ),
        ];

        for (predicates, search_range, expected) in cases {
            let result = applier
                .search(&predicates, &[search_range], None)
                .await
                .unwrap();
            assert_eq!(
                result, expected,
                "Expected {:?}, got {:?}",
                expected, result
            );
        }
    }

    #[test]
    #[allow(clippy::single_range_in_vec_init)]
    fn test_intersect_ranges() {
        // Empty inputs on either side.
        assert_intersection(&[], &[], &[]);
        assert_intersection(&[1..5], &[], &[]);
        assert_intersection(&[], &[1..5], &[]);

        // Interleaved ranges that only touch.
        assert_intersection(&[1..3, 5..7], &[3..5, 7..9], &[]);

        // A single partial overlap.
        assert_intersection(&[1..5], &[3..7], &[3..5]);

        // Several overlaps across both lists.
        assert_intersection(
            &[1..5, 7..10, 12..15],
            &[2..6, 8..13],
            &[2..5, 8..10, 12..13],
        );

        // Identical lists.
        assert_intersection(&[1..3, 5..7], &[1..3, 5..7], &[1..3, 5..7]);

        // One range containing many.
        assert_intersection(&[1..10], &[2..4, 5..7, 8..9], &[2..4, 5..7, 8..9]);

        // A range on one side bridging two on the other.
        assert_intersection(&[1..4, 6..9], &[2..7, 8..10], &[2..4, 6..7, 8..9]);

        // Ranges meeting at a boundary share no rows.
        assert_intersection(&[1..3], &[3..5], &[]);

        // Larger bounds.
        assert_intersection(&[0..100], &[50..150], &[50..100]);
    }
}