1use std::collections::HashSet;
16use std::ops::Range;
17
18use fastbloom::BloomFilter;
19use greptime_proto::v1::index::BloomFilterMeta;
20use itertools::Itertools;
21
22use crate::bloom_filter::error::Result;
23use crate::bloom_filter::reader::BloomFilterReader;
24use crate::Bytes;
25
26#[derive(Debug, Clone, PartialEq, Eq)]
29pub struct InListPredicate {
30 pub list: HashSet<Bytes>,
32}
33
34pub struct BloomFilterApplier {
35 reader: Box<dyn BloomFilterReader + Send>,
36 meta: BloomFilterMeta,
37}
38
39impl BloomFilterApplier {
40 pub async fn new(reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> {
41 let meta = reader.metadata().await?;
42
43 Ok(Self { reader, meta })
44 }
45
46 pub async fn search(
50 &mut self,
51 predicates: &[InListPredicate],
52 search_ranges: &[Range<usize>],
53 ) -> Result<Vec<Range<usize>>> {
54 if predicates.is_empty() {
55 return Ok(Vec::new());
57 }
58
59 let segments = self.row_ranges_to_segments(search_ranges);
60 let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments).await?;
61 let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates);
62 Ok(intersect_ranges(search_ranges, &matching_row_ranges))
63 }
64
65 fn row_ranges_to_segments(&self, row_ranges: &[Range<usize>]) -> Vec<usize> {
67 let rows_per_segment = self.meta.rows_per_segment as usize;
68
69 let mut segments = vec![];
70 for range in row_ranges {
71 let start_seg = range.start / rows_per_segment;
72 let mut end_seg = range.end.div_ceil(rows_per_segment);
73
74 if end_seg == self.meta.segment_loc_indices.len() + 1 {
75 end_seg -= 1;
83 }
84 segments.extend(start_seg..end_seg);
85 }
86
87 segments.sort_unstable();
89 segments.dedup();
90
91 segments
92 }
93
94 async fn load_bloom_filters(
96 &mut self,
97 segments: &[usize],
98 ) -> Result<(Vec<(u64, usize)>, Vec<BloomFilter>)> {
99 let segment_locations = segments
100 .iter()
101 .map(|&seg| (self.meta.segment_loc_indices[seg], seg))
102 .collect::<Vec<_>>();
103
104 let bloom_filter_locs = segment_locations
105 .iter()
106 .map(|(loc, _)| *loc)
107 .dedup()
108 .map(|i| self.meta.bloom_filter_locs[i as usize])
109 .collect::<Vec<_>>();
110
111 let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs).await?;
112
113 Ok((segment_locations, bloom_filters))
114 }
115
116 fn find_matching_rows(
118 &self,
119 segment_locations: Vec<(u64, usize)>,
120 bloom_filters: Vec<BloomFilter>,
121 predicates: &[InListPredicate],
122 ) -> Vec<Range<usize>> {
123 let rows_per_segment = self.meta.rows_per_segment as usize;
124 let mut matching_row_ranges = Vec::with_capacity(bloom_filters.len());
125
126 for ((_loc_index, group), bloom_filter) in segment_locations
128 .into_iter()
129 .chunk_by(|(loc, _)| *loc)
130 .into_iter()
131 .zip(bloom_filters.iter())
132 {
133 let matches_all_predicates = predicates.iter().all(|predicate| {
135 predicate
137 .list
138 .iter()
139 .any(|probe| bloom_filter.contains(probe))
140 });
141
142 if !matches_all_predicates {
143 continue;
144 }
145
146 for (_, segment) in group {
148 let start_row = segment * rows_per_segment;
149 let end_row = (segment + 1) * rows_per_segment;
150 matching_row_ranges.push(start_row..end_row);
151 }
152 }
153
154 self.merge_adjacent_ranges(matching_row_ranges)
155 }
156
157 fn merge_adjacent_ranges(&self, ranges: Vec<Range<usize>>) -> Vec<Range<usize>> {
159 ranges
160 .into_iter()
161 .coalesce(|prev, next| {
162 if prev.end == next.start {
163 Ok(prev.start..next.end)
164 } else {
165 Err((prev, next))
166 }
167 })
168 .collect::<Vec<_>>()
169 }
170}
171
/// Intersects two lists of half-open ranges and returns every non-empty overlap.
///
/// Both inputs are expected to be sorted by start and internally non-overlapping. The
/// function walks them with two cursors, always advancing past whichever current range
/// ends first (the right-hand cursor on ties), so it runs in `O(lhs.len() + rhs.len())`.
fn intersect_ranges(lhs: &[Range<usize>], rhs: &[Range<usize>]) -> Vec<Range<usize>> {
    let (mut left_idx, mut right_idx) = (0, 0);
    let mut overlaps = Vec::new();

    while let (Some(left), Some(right)) = (lhs.get(left_idx), rhs.get(right_idx)) {
        let overlap = left.start.max(right.start)..left.end.min(right.end);
        if !overlap.is_empty() {
            overlaps.push(overlap);
        }

        // The range that finishes first cannot overlap anything further on the other side.
        if left.end < right.end {
            left_idx += 1;
        } else {
            right_idx += 1;
        }
    }

    overlaps
}
202
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    use futures::io::Cursor;

    use super::*;
    use crate::bloom_filter::creator::BloomFilterCreator;
    use crate::bloom_filter::reader::BloomFilterReaderImpl;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Builds an `InListPredicate` from raw byte-string probes.
    fn in_list(probes: &[&[u8]]) -> InListPredicate {
        InListPredicate {
            list: probes.iter().map(|probe| probe.to_vec()).collect(),
        }
    }

    /// Generates the 28-row fixture used with 4 rows per segment:
    /// - rows 0..12 each carry a unique `rowNN` value, a `segNN` value shared by the rows of
    ///   the same segment, and `overl` (rows 0..6) or `overp` (rows 6..12), so both of the
    ///   last two values straddle segment 1;
    /// - rows 12..28 all carry only `dup`, so segments 3..7 hold identical content.
    fn fixture_rows() -> Vec<Vec<Vec<u8>>> {
        let mut rows = Vec::with_capacity(28);
        for i in 0..12usize {
            let overlap: &[u8] = if i < 6 { b"overl" } else { b"overp" };
            rows.push(vec![
                format!("row{i:02}").into_bytes(),
                format!("seg{:02}", i / 4).into_bytes(),
                overlap.to_vec(),
            ]);
        }
        rows.extend((0..16).map(|_| vec![b"dup".to_vec()]));
        rows
    }

    #[tokio::test]
    // Expected results are written as `vec![a..b]`, which clippy flags as a likely typo.
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_appliter() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = BloomFilterCreator::new(
            4,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        for row in fixture_rows() {
            creator.push_row_elems(row).await.unwrap();
        }
        creator.finish(&mut writer).await.unwrap();

        let reader = BloomFilterReaderImpl::new(writer.into_inner());
        let mut applier = BloomFilterApplier::new(Box::new(reader)).await.unwrap();

        let cases: Vec<(Vec<InListPredicate>, Range<usize>, Vec<Range<usize>>)> = vec![
            // A single value confined to one segment.
            (vec![in_list(&[b"row00"])], 0..28, vec![0..4]),
            (vec![in_list(&[b"row05"])], 4..8, vec![4..8]),
            // The value exists, but only outside the searched rows.
            (vec![in_list(&[b"row03"])], 4..8, vec![]),
            // OR within a predicate covers every segment holding either probe.
            (vec![in_list(&[b"overl", b"row06"])], 0..28, vec![0..8]),
            (vec![in_list(&[b"seg01", b"overp"])], 0..28, vec![4..12]),
            // The value is absent everywhere.
            (vec![in_list(&[b"row99"])], 0..28, vec![]),
            // Empty search range.
            (vec![in_list(&[b"row00"])], 12..12, vec![]),
            (vec![in_list(&[b"row04", b"row05"])], 0..12, vec![4..8]),
            (vec![in_list(&[b"seg01"])], 0..28, vec![4..8]),
            // Matches are clipped to the search range.
            (vec![in_list(&[b"seg01"])], 6..28, vec![6..8]),
            // A value spanning adjacent segments is merged into one range.
            (vec![in_list(&[b"overl"])], 0..28, vec![0..8]),
            (vec![in_list(&[b"overl"])], 2..28, vec![2..8]),
            (vec![in_list(&[b"overp"])], 0..10, vec![4..10]),
            // Identical trailing segments.
            (vec![in_list(&[b"dup"])], 0..12, vec![]),
            (vec![in_list(&[b"dup"])], 0..16, vec![12..16]),
            (vec![in_list(&[b"dup"])], 0..28, vec![12..28]),
            // AND across predicates.
            (
                vec![in_list(&[b"row00", b"row01"]), in_list(&[b"seg00"])],
                0..28,
                vec![0..4],
            ),
            (
                vec![in_list(&[b"overl"]), in_list(&[b"seg01"])],
                0..28,
                vec![4..8],
            ),
        ];

        for (predicates, search_range, expected) in cases {
            let result = applier.search(&predicates, &[search_range]).await.unwrap();
            assert_eq!(
                result, expected,
                "Expected {:?}, got {:?}",
                expected, result
            );
        }
    }

    #[test]
    // Cases are written as `vec![a..b]`, which clippy flags as a likely typo.
    #[allow(clippy::single_range_in_vec_init)]
    fn test_intersect_ranges() {
        let cases: Vec<(Vec<Range<usize>>, Vec<Range<usize>>, Vec<Range<usize>>)> = vec![
            // Either side empty.
            (vec![], vec![], vec![]),
            (vec![1..5], vec![], vec![]),
            (vec![], vec![1..5], vec![]),
            // Ranges that only touch do not intersect.
            (vec![1..3, 5..7], vec![3..5, 7..9], vec![]),
            // Partial overlaps.
            (vec![1..5], vec![3..7], vec![3..5]),
            (
                vec![1..5, 7..10, 12..15],
                vec![2..6, 8..13],
                vec![2..5, 8..10, 12..13],
            ),
            // Identical inputs.
            (vec![1..3, 5..7], vec![1..3, 5..7], vec![1..3, 5..7]),
            // One range containing several on the other side.
            (vec![1..10], vec![2..4, 5..7, 8..9], vec![2..4, 5..7, 8..9]),
            (vec![1..4, 6..9], vec![2..7, 8..10], vec![2..4, 6..7, 8..9]),
            (vec![1..3], vec![3..5], vec![]),
            (vec![0..100], vec![50..150], vec![50..100]),
        ];

        for (lhs, rhs, expected) in cases {
            assert_eq!(intersect_ranges(&lhs, &rhs), expected);
        }
    }
}