index/bloom_filter/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod finalize_segment;
16mod intermediate_codec;
17
18use std::collections::HashSet;
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21
22use finalize_segment::FinalizedBloomFilterStorage;
23use futures::{AsyncWrite, AsyncWriteExt, StreamExt};
24use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
25use prost::Message;
26use snafu::ResultExt;
27
28use crate::bloom_filter::error::{IoSnafu, Result};
29use crate::bloom_filter::SEED;
30use crate::external_provider::ExternalTempFileProvider;
31use crate::Bytes;
32
/// The false positive rate of the Bloom filter (`0.01`, i.e. 1%).
///
/// NOTE(review): this constant is not referenced in the visible part of this file;
/// presumably it is consumed where the filters are actually built — confirm.
pub const FALSE_POSITIVE_RATE: f64 = 0.01;
35
/// `BloomFilterCreator` is responsible for creating and managing bloom filters
/// for a set of elements. It divides the rows into segments of `rows_per_segment`
/// rows and records the distinct elements of each segment so that a bloom filter
/// can be produced per segment.
///
/// # Format
///
/// The bloom filter creator writes the following format to the writer:
///
/// ```text
/// +--------------------+--------------------+-----+--------------------------+----------------------+
/// | Bloom filter 0     | Bloom filter 1     | ... | BloomFilterMeta          | Meta size            |
/// +--------------------+--------------------+-----+--------------------------+----------------------+
/// |<- bytes (size 0) ->|<- bytes (size 1) ->| ... |<- protobuf (meta size) ->|<- u32 LE (4 bytes) ->|
/// ```
///
/// The `BloomFilterMeta` is encoded with `prost` (protobuf), not JSON.
pub struct BloomFilterCreator {
    /// The number of rows per segment set by the user. Always greater than 0.
    rows_per_segment: usize,

    /// Number of rows added to the bloom filter creator so far.
    accumulated_row_count: usize,

    /// The set of distinct elements in the current (not yet finalized) segment.
    cur_seg_distinct_elems: HashSet<Bytes>,

    /// The memory usage of the current segment's distinct elements, measured as
    /// the sum of the byte lengths of the elements held in `cur_seg_distinct_elems`.
    cur_seg_distinct_elems_mem_usage: usize,

    /// Storage for finalized Bloom filters.
    finalized_bloom_filters: FinalizedBloomFilterStorage,

    /// Number of rows covered by the segments finalized so far. It is compared with
    /// `accumulated_row_count` to detect a trailing, partially filled segment.
    finalized_row_count: usize,

    /// Global memory usage counter shared with the finalized storage. The creator
    /// adds the bytes of newly buffered distinct elements to it and subtracts them
    /// again when the segment is finalized or the creator is dropped.
    global_memory_usage: Arc<AtomicUsize>,
}
73
74impl BloomFilterCreator {
75    /// Creates a new `BloomFilterCreator` with the specified number of rows per segment.
76    ///
77    /// # PANICS
78    ///
79    /// `rows_per_segment` <= 0
80    pub fn new(
81        rows_per_segment: usize,
82        intermediate_provider: Arc<dyn ExternalTempFileProvider>,
83        global_memory_usage: Arc<AtomicUsize>,
84        global_memory_usage_threshold: Option<usize>,
85    ) -> Self {
86        assert!(
87            rows_per_segment > 0,
88            "rows_per_segment must be greater than 0"
89        );
90
91        Self {
92            rows_per_segment,
93            accumulated_row_count: 0,
94            cur_seg_distinct_elems: HashSet::default(),
95            cur_seg_distinct_elems_mem_usage: 0,
96            global_memory_usage: global_memory_usage.clone(),
97            finalized_bloom_filters: FinalizedBloomFilterStorage::new(
98                intermediate_provider,
99                global_memory_usage,
100                global_memory_usage_threshold,
101            ),
102            finalized_row_count: 0,
103        }
104    }
105
106    /// Adds multiple rows of elements to the bloom filter. If the number of accumulated rows
107    /// reaches `rows_per_segment`, it finalizes the current segment.
108    pub async fn push_n_row_elems(
109        &mut self,
110        mut nrows: usize,
111        elems: impl IntoIterator<Item = Bytes>,
112    ) -> Result<()> {
113        if nrows == 0 {
114            return Ok(());
115        }
116        if nrows == 1 {
117            return self.push_row_elems(elems).await;
118        }
119
120        let elems = elems.into_iter().collect::<Vec<_>>();
121        while nrows > 0 {
122            let rows_to_seg_end =
123                self.rows_per_segment - (self.accumulated_row_count % self.rows_per_segment);
124            let rows_to_push = nrows.min(rows_to_seg_end);
125            nrows -= rows_to_push;
126
127            self.accumulated_row_count += rows_to_push;
128
129            let mut mem_diff = 0;
130            for elem in &elems {
131                let len = elem.len();
132                let is_new = self.cur_seg_distinct_elems.insert(elem.clone());
133                if is_new {
134                    mem_diff += len;
135                }
136            }
137            self.cur_seg_distinct_elems_mem_usage += mem_diff;
138            self.global_memory_usage
139                .fetch_add(mem_diff, Ordering::Relaxed);
140
141            if self.accumulated_row_count % self.rows_per_segment == 0 {
142                self.finalize_segment().await?;
143                self.finalized_row_count = self.accumulated_row_count;
144            }
145        }
146
147        Ok(())
148    }
149
150    /// Adds a row of elements to the bloom filter. If the number of accumulated rows
151    /// reaches `rows_per_segment`, it finalizes the current segment.
152    pub async fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) -> Result<()> {
153        self.accumulated_row_count += 1;
154
155        let mut mem_diff = 0;
156        for elem in elems.into_iter() {
157            let len = elem.len();
158            let is_new = self.cur_seg_distinct_elems.insert(elem);
159            if is_new {
160                mem_diff += len;
161            }
162        }
163        self.cur_seg_distinct_elems_mem_usage += mem_diff;
164        self.global_memory_usage
165            .fetch_add(mem_diff, Ordering::Relaxed);
166
167        if self.accumulated_row_count % self.rows_per_segment == 0 {
168            self.finalize_segment().await?;
169            self.finalized_row_count = self.accumulated_row_count;
170        }
171
172        Ok(())
173    }
174
175    /// Finalizes any remaining segments and writes the bloom filters and metadata to the provided writer.
176    pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
177        if self.accumulated_row_count > self.finalized_row_count {
178            self.finalize_segment().await?;
179        }
180
181        let mut meta = BloomFilterMeta {
182            rows_per_segment: self.rows_per_segment as _,
183            row_count: self.accumulated_row_count as _,
184            ..Default::default()
185        };
186
187        let (indices, mut segs) = self.finalized_bloom_filters.drain().await?;
188        meta.segment_loc_indices = indices.into_iter().map(|i| i as u64).collect();
189        meta.segment_count = meta.segment_loc_indices.len() as _;
190
191        while let Some(segment) = segs.next().await {
192            let segment = segment?;
193            writer
194                .write_all(&segment.bloom_filter_bytes)
195                .await
196                .context(IoSnafu)?;
197
198            let size = segment.bloom_filter_bytes.len() as u64;
199            meta.bloom_filter_locs.push(BloomFilterLoc {
200                offset: meta.bloom_filter_size as _,
201                size,
202                element_count: segment.element_count as _,
203            });
204            meta.bloom_filter_size += size;
205        }
206
207        let meta_bytes = meta.encode_to_vec();
208        writer.write_all(&meta_bytes).await.context(IoSnafu)?;
209
210        let meta_size = meta_bytes.len() as u32;
211        writer
212            .write_all(&meta_size.to_le_bytes())
213            .await
214            .context(IoSnafu)?;
215        writer.flush().await.unwrap();
216
217        Ok(())
218    }
219
220    /// Returns the memory usage of the creating bloom filter.
221    pub fn memory_usage(&self) -> usize {
222        self.cur_seg_distinct_elems_mem_usage + self.finalized_bloom_filters.memory_usage()
223    }
224
225    async fn finalize_segment(&mut self) -> Result<()> {
226        let elem_count = self.cur_seg_distinct_elems.len();
227        self.finalized_bloom_filters
228            .add(self.cur_seg_distinct_elems.drain(), elem_count)
229            .await?;
230
231        self.global_memory_usage
232            .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
233        self.cur_seg_distinct_elems_mem_usage = 0;
234        Ok(())
235    }
236}
237
238impl Drop for BloomFilterCreator {
239    fn drop(&mut self) {
240        self.global_memory_usage
241            .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
242    }
243}
244
#[cfg(test)]
mod tests {
    use fastbloom::BloomFilter;
    use futures::io::Cursor;

    use super::*;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Converts a slice of bytes to a vector of `u64`, reading little-endian words.
    /// Trailing bytes that do not fill a whole word are ignored.
    pub fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
        const WORD: usize = std::mem::size_of::<u64>();

        let mut words = Vec::with_capacity(bytes.len() / WORD);
        for chunk in bytes.chunks_exact(WORD) {
            words.push(u64::from_le_bytes(chunk.try_into().unwrap()));
        }
        words
    }

    /// Builds a creator backed by a mock temp file provider, a fresh global memory
    /// counter and no memory threshold.
    fn new_creator(rows_per_segment: usize) -> BloomFilterCreator {
        BloomFilterCreator::new(
            rows_per_segment,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        )
    }

    /// Finishes the creator into an in-memory writer and returns the written bytes.
    async fn finish_to_bytes(creator: &mut BloomFilterCreator) -> Vec<u8> {
        let mut writer = Cursor::new(Vec::new());
        creator.finish(&mut writer).await.unwrap();
        writer.into_inner()
    }

    /// Decodes the `BloomFilterMeta` located in front of the 4-byte size trailer and
    /// returns it together with the length of its encoded form.
    fn decode_meta(bytes: &[u8]) -> (BloomFilterMeta, usize) {
        let (payload, size_trailer) = bytes.split_at(bytes.len() - 4);
        let meta_size = u32::from_le_bytes(size_trailer.try_into().unwrap()) as usize;
        let meta_bytes = &payload[payload.len() - meta_size..];
        (BloomFilterMeta::decode(meta_bytes).unwrap(), meta_bytes.len())
    }

    /// Rebuilds every bloom filter referenced by `meta.bloom_filter_locs` from `bytes`.
    fn load_bloom_filters(bytes: &[u8], meta: &BloomFilterMeta) -> Vec<BloomFilter> {
        meta.bloom_filter_locs
            .iter()
            .map(|loc| {
                let start = loc.offset as usize;
                let end = (loc.offset + loc.size) as usize;
                BloomFilter::from_vec(u64_vec_from_bytes(&bytes[start..end]))
                    .seed(&SEED)
                    .expected_items(loc.element_count as usize)
            })
            .collect()
    }

    #[tokio::test]
    async fn test_bloom_filter_creator() {
        let mut creator = new_creator(2);

        // First row: buffered in the current segment.
        creator
            .push_row_elems(vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        // Second row: fills and finalizes the first segment.
        creator
            .push_row_elems(vec![b"c".to_vec(), b"d".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        // Third row: starts the second segment.
        creator
            .push_row_elems(vec![b"e".to_vec(), b"f".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        let bytes = finish_to_bytes(&mut creator).await;
        let (meta, meta_len) = decode_meta(&bytes);

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 2);
        assert_eq!(meta.row_count, 3);
        assert_eq!(meta.bloom_filter_size as usize + meta_len + 4, bytes.len());

        let bfs = load_bloom_filters(&bytes, &meta);

        assert_eq!(meta.segment_loc_indices.len(), 2);

        let bf0 = &bfs[meta.segment_loc_indices[0] as usize];
        for key in [b"a", b"b", b"c", b"d"] {
            assert!(bf0.contains(&key));
        }

        let bf1 = &bfs[meta.segment_loc_indices[1] as usize];
        for key in [b"e", b"f"] {
            assert!(bf1.contains(&key));
        }
    }

    #[tokio::test]
    async fn test_bloom_filter_creator_batch_push() {
        let mut creator = new_creator(2);

        // 5 rows: two full segments plus one row in the third segment.
        creator
            .push_n_row_elems(5, vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        // 5 more rows: ends exactly on a segment boundary.
        creator
            .push_n_row_elems(5, vec![b"c".to_vec(), b"d".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        // 10 more rows: five further full segments.
        creator
            .push_n_row_elems(10, vec![b"e".to_vec(), b"f".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        let bytes = finish_to_bytes(&mut creator).await;
        let (meta, meta_len) = decode_meta(&bytes);

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 10);
        assert_eq!(meta.row_count, 20);
        assert_eq!(meta.bloom_filter_size as usize + meta_len + 4, bytes.len());

        let bfs = load_bloom_filters(&bytes, &meta);

        // 4 bloom filters to serve 10 segments
        assert_eq!(bfs.len(), 4);
        assert_eq!(meta.segment_loc_indices.len(), 10);

        let seg_locs = &meta.segment_loc_indices;
        for &loc in &seg_locs[..3] {
            let bf = &bfs[loc as usize];
            assert!(bf.contains(&b"a"));
            assert!(bf.contains(&b"b"));
        }
        for &loc in &seg_locs[2..5] {
            let bf = &bfs[loc as usize];
            assert!(bf.contains(&b"c"));
            assert!(bf.contains(&b"d"));
        }
        for &loc in &seg_locs[5..10] {
            let bf = &bfs[loc as usize];
            assert!(bf.contains(&b"e"));
            assert!(bf.contains(&b"f"));
        }
    }

    #[tokio::test]
    async fn test_final_seg_all_null() {
        let mut creator = new_creator(2);

        creator
            .push_n_row_elems(4, vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        // A trailing row without any element still has to produce its own segment.
        creator.push_row_elems(Vec::new()).await.unwrap();

        let bytes = finish_to_bytes(&mut creator).await;
        let (meta, _) = decode_meta(&bytes);

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 3);
        assert_eq!(meta.row_count, 5);
    }
}
444        assert_eq!(meta.row_count, 5);
445    }
446}