1use std::collections::BTreeSet;
16use std::ops::Range;
17
18use fastbloom::BloomFilter;
19use greptime_proto::v1::index::BloomFilterMeta;
20use itertools::Itertools;
21
22use crate::bloom_filter::error::Result;
23use crate::bloom_filter::reader::BloomFilterReader;
24use crate::Bytes;
25
/// A set of acceptable values for one predicate.
///
/// During [`BloomFilterApplier::search`], a bloom filter satisfies this predicate when it
/// reports at least one element of `list` as present; multiple predicates must all be
/// satisfied by the same filter.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct InListPredicate {
    /// Values to probe for. A hit on any single value satisfies the predicate.
    pub list: BTreeSet<Bytes>,
}
33
/// Narrows row ranges by probing the bloom filters served by a [`BloomFilterReader`].
pub struct BloomFilterApplier {
    /// Source of the bloom filter data.
    reader: Box<dyn BloomFilterReader + Send>,
    /// Index metadata fetched from `reader` when the applier is constructed.
    meta: BloomFilterMeta,
}
38
39impl BloomFilterApplier {
40 pub async fn new(reader: Box<dyn BloomFilterReader + Send>) -> Result<Self> {
41 let meta = reader.metadata().await?;
42
43 Ok(Self { reader, meta })
44 }
45
46 pub async fn search(
50 &mut self,
51 predicates: &[InListPredicate],
52 search_ranges: &[Range<usize>],
53 ) -> Result<Vec<Range<usize>>> {
54 if predicates.is_empty() {
55 return Ok(Vec::new());
57 }
58
59 let segments = self.row_ranges_to_segments(search_ranges);
60 let (seg_locations, bloom_filters) = self.load_bloom_filters(&segments).await?;
61 let matching_row_ranges = self.find_matching_rows(seg_locations, bloom_filters, predicates);
62 Ok(intersect_ranges(search_ranges, &matching_row_ranges))
63 }
64
65 fn row_ranges_to_segments(&self, row_ranges: &[Range<usize>]) -> Vec<usize> {
67 let rows_per_segment = self.meta.rows_per_segment as usize;
68
69 let mut segments = vec![];
70 for range in row_ranges {
71 let start_seg = range.start / rows_per_segment;
72 let mut end_seg = range.end.div_ceil(rows_per_segment);
73
74 if end_seg == self.meta.segment_loc_indices.len() + 1 {
75 end_seg -= 1;
83 }
84 segments.extend(start_seg..end_seg);
85 }
86
87 segments.sort_unstable();
89 segments.dedup();
90
91 segments
92 }
93
94 async fn load_bloom_filters(
96 &mut self,
97 segments: &[usize],
98 ) -> Result<(Vec<(u64, usize)>, Vec<BloomFilter>)> {
99 let segment_locations = segments
100 .iter()
101 .map(|&seg| (self.meta.segment_loc_indices[seg], seg))
102 .collect::<Vec<_>>();
103
104 let bloom_filter_locs = segment_locations
105 .iter()
106 .map(|(loc, _)| *loc)
107 .dedup()
108 .map(|i| self.meta.bloom_filter_locs[i as usize])
109 .collect::<Vec<_>>();
110
111 let bloom_filters = self.reader.bloom_filter_vec(&bloom_filter_locs).await?;
112
113 Ok((segment_locations, bloom_filters))
114 }
115
116 fn find_matching_rows(
118 &self,
119 segment_locations: Vec<(u64, usize)>,
120 bloom_filters: Vec<BloomFilter>,
121 predicates: &[InListPredicate],
122 ) -> Vec<Range<usize>> {
123 let rows_per_segment = self.meta.rows_per_segment as usize;
124 let mut matching_row_ranges = Vec::with_capacity(bloom_filters.len());
125
126 for ((_loc_index, group), bloom_filter) in segment_locations
128 .into_iter()
129 .chunk_by(|(loc, _)| *loc)
130 .into_iter()
131 .zip(bloom_filters.iter())
132 {
133 let matches_all_predicates = predicates.iter().all(|predicate| {
135 predicate
137 .list
138 .iter()
139 .any(|probe| bloom_filter.contains(probe))
140 });
141
142 if !matches_all_predicates {
143 continue;
144 }
145
146 for (_, segment) in group {
148 let start_row = segment * rows_per_segment;
149 let end_row = (segment + 1) * rows_per_segment;
150 matching_row_ranges.push(start_row..end_row);
151 }
152 }
153
154 self.merge_adjacent_ranges(matching_row_ranges)
155 }
156
157 fn merge_adjacent_ranges(&self, ranges: Vec<Range<usize>>) -> Vec<Range<usize>> {
159 ranges
160 .into_iter()
161 .coalesce(|prev, next| {
162 if prev.end == next.start {
163 Ok(prev.start..next.end)
164 } else {
165 Err((prev, next))
166 }
167 })
168 .collect::<Vec<_>>()
169 }
170}
171
/// Computes the pairwise overlap of two range lists with a two-cursor sweep.
///
/// The sweep only moves forward, so it assumes each list is sorted and does not overlap
/// itself. Empty overlaps (including ranges that merely touch) are not emitted.
fn intersect_ranges(lhs: &[Range<usize>], rhs: &[Range<usize>]) -> Vec<Range<usize>> {
    let mut overlaps = Vec::new();
    let (mut li, mut ri) = (0, 0);

    while let (Some(left), Some(right)) = (lhs.get(li), rhs.get(ri)) {
        let lo = left.start.max(right.start);
        let hi = left.end.min(right.end);
        if lo < hi {
            overlaps.push(lo..hi);
        }

        // Step past whichever range finishes first; on a tie the right-hand cursor moves.
        if left.end < right.end {
            li += 1;
        } else {
            ri += 1;
        }
    }

    overlaps
}
202
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    use futures::io::Cursor;

    use super::*;
    use crate::bloom_filter::creator::BloomFilterCreator;
    use crate::bloom_filter::reader::BloomFilterReaderImpl;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Builds a predicate that accepts any of `values`.
    fn in_list(values: &[&[u8]]) -> InListPredicate {
        InListPredicate {
            list: values.iter().map(|value| value.to_vec()).collect(),
        }
    }

    #[tokio::test]
    // `vec![a..b]` below really is a one-element list of ranges.
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_appliter() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = BloomFilterCreator::new(
            4,
            0.01,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        // Rows 0..12 with 4 rows per segment:
        //   - a unique "rowNN" value per row,
        //   - a "segNN" value shared by the 4 rows of a segment,
        //   - "overl" for rows 0..6 and "overp" for rows 6..12, so both straddle segments.
        let mut rows: Vec<Vec<Vec<u8>>> = (0..12)
            .map(|i| {
                let overlap: &[u8] = if i < 6 { b"overl" } else { b"overp" };
                vec![
                    format!("row{i:02}").into_bytes(),
                    format!("seg{:02}", i / 4).into_bytes(),
                    overlap.to_vec(),
                ]
            })
            .collect();
        // Rows 12..28: four segments with identical content.
        rows.extend(std::iter::repeat(vec![b"dup".to_vec()]).take(16));

        for row in rows {
            creator.push_row_elems(row).await.unwrap();
        }

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();
        let reader = BloomFilterReaderImpl::new(bytes);
        let mut applier = BloomFilterApplier::new(Box::new(reader)).await.unwrap();

        // (predicates, search range, expected matching ranges)
        let cases: Vec<(Vec<InListPredicate>, Range<usize>, Vec<Range<usize>>)> = vec![
            // Single value, single segment.
            (vec![in_list(&[b"row00"])], 0..28, vec![0..4]),
            (vec![in_list(&[b"row05"])], 4..8, vec![4..8]),
            (vec![in_list(&[b"row03"])], 4..8, vec![]),
            // Several values spanning segments.
            (vec![in_list(&[b"overl", b"row06"])], 0..28, vec![0..8]),
            (vec![in_list(&[b"seg01", b"overp"])], 0..28, vec![4..12]),
            // Value that was never indexed.
            (vec![in_list(&[b"row99"])], 0..28, vec![]),
            // Empty search range.
            (vec![in_list(&[b"row00"])], 12..12, vec![]),
            // Several values inside one segment.
            (vec![in_list(&[b"row04", b"row05"])], 0..12, vec![4..8]),
            (vec![in_list(&[b"seg01"])], 0..28, vec![4..8]),
            (vec![in_list(&[b"seg01"])], 6..28, vec![6..8]),
            // Values that straddle segment boundaries.
            (vec![in_list(&[b"overl"])], 0..28, vec![0..8]),
            (vec![in_list(&[b"overl"])], 2..28, vec![2..8]),
            (vec![in_list(&[b"overp"])], 0..10, vec![4..10]),
            // Segments with identical content.
            (vec![in_list(&[b"dup"])], 0..12, vec![]),
            (vec![in_list(&[b"dup"])], 0..16, vec![12..16]),
            (vec![in_list(&[b"dup"])], 0..28, vec![12..28]),
            // Multiple predicates must all hold.
            (
                vec![in_list(&[b"row00", b"row01"]), in_list(&[b"seg00"])],
                0..28,
                vec![0..4],
            ),
            (
                vec![in_list(&[b"overl"]), in_list(&[b"seg01"])],
                0..28,
                vec![4..8],
            ),
        ];

        for (predicates, search_range, expected) in cases {
            let result = applier.search(&predicates, &[search_range]).await.unwrap();
            assert_eq!(
                result, expected,
                "Expected {:?}, got {:?}",
                expected, result
            );
        }
    }

    #[test]
    // `vec![a..b]` below really is a one-element list of ranges.
    #[allow(clippy::single_range_in_vec_init)]
    fn test_intersect_ranges() {
        // (lhs, rhs, expected intersection)
        let cases: Vec<(Vec<Range<usize>>, Vec<Range<usize>>, Vec<Range<usize>>)> = vec![
            // Empty inputs.
            (vec![], vec![], vec![]),
            (vec![1..5], vec![], vec![]),
            (vec![], vec![1..5], vec![]),
            // Interleaved ranges that never overlap.
            (vec![1..3, 5..7], vec![3..5, 7..9], vec![]),
            // One overlapping pair.
            (vec![1..5], vec![3..7], vec![3..5]),
            // Several overlapping pairs.
            (
                vec![1..5, 7..10, 12..15],
                vec![2..6, 8..13],
                vec![2..5, 8..10, 12..13],
            ),
            // Identical inputs.
            (vec![1..3, 5..7], vec![1..3, 5..7], vec![1..3, 5..7]),
            // One range containing several others.
            (vec![1..10], vec![2..4, 5..7, 8..9], vec![2..4, 5..7, 8..9]),
            // Partial overlaps on both sides.
            (vec![1..4, 6..9], vec![2..7, 8..10], vec![2..4, 6..7, 8..9]),
            // Ranges that only touch.
            (vec![1..3], vec![3..5], vec![]),
            // Large ranges.
            (vec![0..100], vec![50..150], vec![50..100]),
        ];

        for (lhs, rhs, expected) in cases {
            assert_eq!(intersect_ranges(&lhs, &rhs), expected);
        }
    }
}