index/bloom_filter/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod finalize_segment;
16mod intermediate_codec;
17
18use std::collections::HashSet;
19use std::sync::atomic::{AtomicUsize, Ordering};
20use std::sync::Arc;
21
22use finalize_segment::FinalizedBloomFilterStorage;
23use futures::{AsyncWrite, AsyncWriteExt, StreamExt};
24use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
25use prost::Message;
26use snafu::ResultExt;
27
28use crate::bloom_filter::error::{IoSnafu, Result};
29use crate::bloom_filter::SEED;
30use crate::external_provider::ExternalTempFileProvider;
31use crate::Bytes;
32
33/// `BloomFilterCreator` is responsible for creating and managing bloom filters
34/// for a set of elements. It divides the rows into segments and creates
35/// bloom filters for each segment.
36///
37/// # Format
38///
39/// The bloom filter creator writes the following format to the writer:
40///
41/// ```text
42/// +--------------------+--------------------+-----+----------------------+----------------------+
43/// | Bloom filter 0     | Bloom filter 1     | ... | BloomFilterMeta      | Meta size            |
44/// +--------------------+--------------------+-----+----------------------+----------------------+
45/// |<- bytes (size 0) ->|<- bytes (size 1) ->| ... |<- json (meta size) ->|<- u32 LE (4 bytes) ->|
46/// ```
47///
48pub struct BloomFilterCreator {
49    /// The number of rows per segment set by the user.
50    rows_per_segment: usize,
51
52    /// Row count that added to the bloom filter so far.
53    accumulated_row_count: usize,
54
55    /// A set of distinct elements in the current segment.
56    cur_seg_distinct_elems: HashSet<Bytes>,
57
58    /// The memory usage of the current segment's distinct elements.
59    cur_seg_distinct_elems_mem_usage: usize,
60
61    /// Storage for finalized Bloom filters.
62    finalized_bloom_filters: FinalizedBloomFilterStorage,
63
64    /// Row count that finalized so far.
65    finalized_row_count: usize,
66
67    /// Global memory usage of the bloom filter creator.
68    global_memory_usage: Arc<AtomicUsize>,
69}
70
71impl BloomFilterCreator {
72    /// Creates a new `BloomFilterCreator` with the specified number of rows per segment.
73    ///
74    /// # PANICS
75    ///
76    /// `rows_per_segment` <= 0
77    pub fn new(
78        rows_per_segment: usize,
79        false_positive_rate: f64,
80        intermediate_provider: Arc<dyn ExternalTempFileProvider>,
81        global_memory_usage: Arc<AtomicUsize>,
82        global_memory_usage_threshold: Option<usize>,
83    ) -> Self {
84        assert!(
85            rows_per_segment > 0,
86            "rows_per_segment must be greater than 0"
87        );
88
89        Self {
90            rows_per_segment,
91            accumulated_row_count: 0,
92            cur_seg_distinct_elems: HashSet::default(),
93            cur_seg_distinct_elems_mem_usage: 0,
94            global_memory_usage: global_memory_usage.clone(),
95            finalized_bloom_filters: FinalizedBloomFilterStorage::new(
96                false_positive_rate,
97                intermediate_provider,
98                global_memory_usage,
99                global_memory_usage_threshold,
100            ),
101            finalized_row_count: 0,
102        }
103    }
104
105    /// Adds multiple rows of elements to the bloom filter. If the number of accumulated rows
106    /// reaches `rows_per_segment`, it finalizes the current segment.
107    pub async fn push_n_row_elems(
108        &mut self,
109        mut nrows: usize,
110        elems: impl IntoIterator<Item = Bytes>,
111    ) -> Result<()> {
112        if nrows == 0 {
113            return Ok(());
114        }
115        if nrows == 1 {
116            return self.push_row_elems(elems).await;
117        }
118
119        let elems = elems.into_iter().collect::<Vec<_>>();
120        while nrows > 0 {
121            let rows_to_seg_end =
122                self.rows_per_segment - (self.accumulated_row_count % self.rows_per_segment);
123            let rows_to_push = nrows.min(rows_to_seg_end);
124            nrows -= rows_to_push;
125
126            self.accumulated_row_count += rows_to_push;
127
128            let mut mem_diff = 0;
129            for elem in &elems {
130                let len = elem.len();
131                let is_new = self.cur_seg_distinct_elems.insert(elem.clone());
132                if is_new {
133                    mem_diff += len;
134                }
135            }
136            self.cur_seg_distinct_elems_mem_usage += mem_diff;
137            self.global_memory_usage
138                .fetch_add(mem_diff, Ordering::Relaxed);
139
140            if self.accumulated_row_count % self.rows_per_segment == 0 {
141                self.finalize_segment().await?;
142                self.finalized_row_count = self.accumulated_row_count;
143            }
144        }
145
146        Ok(())
147    }
148
149    /// Adds a row of elements to the bloom filter. If the number of accumulated rows
150    /// reaches `rows_per_segment`, it finalizes the current segment.
151    pub async fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) -> Result<()> {
152        self.accumulated_row_count += 1;
153
154        let mut mem_diff = 0;
155        for elem in elems.into_iter() {
156            let len = elem.len();
157            let is_new = self.cur_seg_distinct_elems.insert(elem);
158            if is_new {
159                mem_diff += len;
160            }
161        }
162        self.cur_seg_distinct_elems_mem_usage += mem_diff;
163        self.global_memory_usage
164            .fetch_add(mem_diff, Ordering::Relaxed);
165
166        if self.accumulated_row_count % self.rows_per_segment == 0 {
167            self.finalize_segment().await?;
168            self.finalized_row_count = self.accumulated_row_count;
169        }
170
171        Ok(())
172    }
173
174    /// Finalizes any remaining segments and writes the bloom filters and metadata to the provided writer.
175    pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
176        if self.accumulated_row_count > self.finalized_row_count {
177            self.finalize_segment().await?;
178        }
179
180        let mut meta = BloomFilterMeta {
181            rows_per_segment: self.rows_per_segment as _,
182            row_count: self.accumulated_row_count as _,
183            ..Default::default()
184        };
185
186        let (indices, mut segs) = self.finalized_bloom_filters.drain().await?;
187        meta.segment_loc_indices = indices.into_iter().map(|i| i as u64).collect();
188        meta.segment_count = meta.segment_loc_indices.len() as _;
189
190        while let Some(segment) = segs.next().await {
191            let segment = segment?;
192            writer
193                .write_all(&segment.bloom_filter_bytes)
194                .await
195                .context(IoSnafu)?;
196
197            let size = segment.bloom_filter_bytes.len() as u64;
198            meta.bloom_filter_locs.push(BloomFilterLoc {
199                offset: meta.bloom_filter_size as _,
200                size,
201                element_count: segment.element_count as _,
202            });
203            meta.bloom_filter_size += size;
204        }
205
206        let meta_bytes = meta.encode_to_vec();
207        writer.write_all(&meta_bytes).await.context(IoSnafu)?;
208
209        let meta_size = meta_bytes.len() as u32;
210        writer
211            .write_all(&meta_size.to_le_bytes())
212            .await
213            .context(IoSnafu)?;
214        writer.flush().await.unwrap();
215
216        Ok(())
217    }
218
219    /// Returns the memory usage of the creating bloom filter.
220    pub fn memory_usage(&self) -> usize {
221        self.cur_seg_distinct_elems_mem_usage + self.finalized_bloom_filters.memory_usage()
222    }
223
224    async fn finalize_segment(&mut self) -> Result<()> {
225        let elem_count = self.cur_seg_distinct_elems.len();
226        self.finalized_bloom_filters
227            .add(self.cur_seg_distinct_elems.drain(), elem_count)
228            .await?;
229
230        self.global_memory_usage
231            .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
232        self.cur_seg_distinct_elems_mem_usage = 0;
233        Ok(())
234    }
235}
236
237impl Drop for BloomFilterCreator {
238    fn drop(&mut self) {
239        self.global_memory_usage
240            .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
241    }
242}
243
#[cfg(test)]
mod tests {
    use fastbloom::BloomFilter;
    use futures::io::Cursor;

    use super::*;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Decodes little-endian `u64` words from `bytes`, ignoring any trailing partial word.
    pub fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
        const WORD_SIZE: usize = std::mem::size_of::<u64>();
        let mut words = Vec::with_capacity(bytes.len() / WORD_SIZE);
        for chunk in bytes.chunks_exact(WORD_SIZE) {
            let mut word = [0u8; WORD_SIZE];
            word.copy_from_slice(chunk);
            words.push(u64::from_le_bytes(word));
        }
        words
    }

    /// Builds a creator with two rows per segment, a 1% false positive rate, a mock
    /// temp-file provider, and no global memory threshold.
    fn new_test_creator() -> BloomFilterCreator {
        BloomFilterCreator::new(
            2,
            0.01,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        )
    }

    /// Reads the trailing `u32` LE meta size, then decodes the meta section before it.
    /// Returns the decoded meta and the byte length of its encoded form.
    fn decode_meta(bytes: &[u8]) -> (BloomFilterMeta, usize) {
        let (rest, size_bytes) = bytes.split_at(bytes.len() - 4);
        let meta_size = u32::from_le_bytes(size_bytes.try_into().unwrap()) as usize;
        let meta_bytes = &rest[rest.len() - meta_size..];
        let meta = BloomFilterMeta::decode(meta_bytes).unwrap();
        (meta, meta_bytes.len())
    }

    #[tokio::test]
    async fn test_bloom_filter_creator() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = new_test_creator();

        // Row 0: segment 0 is still open.
        creator
            .push_row_elems(vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        // Row 1 completes segment 0, which gets finalized.
        creator
            .push_row_elems(vec![b"c".to_vec(), b"d".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        // Row 2 opens segment 1, which stays partial until `finish`.
        creator
            .push_row_elems(vec![b"e".to_vec(), b"f".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();
        let (meta, meta_size) = decode_meta(&bytes);

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 2);
        assert_eq!(meta.row_count, 3);
        assert_eq!(
            meta.bloom_filter_size as usize + meta_size + 4,
            bytes.len()
        );

        let bfs: Vec<_> = meta
            .bloom_filter_locs
            .iter()
            .map(|loc| {
                let start = loc.offset as usize;
                let end = (loc.offset + loc.size) as usize;
                BloomFilter::from_vec(u64_vec_from_bytes(&bytes[start..end]))
                    .seed(&SEED)
                    .expected_items(loc.element_count as usize)
            })
            .collect();

        assert_eq!(meta.segment_loc_indices.len(), 2);

        let bf0 = &bfs[meta.segment_loc_indices[0] as usize];
        for elem in [b"a", b"b", b"c", b"d"] {
            assert!(bf0.contains(&elem));
        }

        let bf1 = &bfs[meta.segment_loc_indices[1] as usize];
        for elem in [b"e", b"f"] {
            assert!(bf1.contains(&elem));
        }
    }

    #[tokio::test]
    async fn test_bloom_filter_creator_batch_push() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator: BloomFilterCreator = new_test_creator();

        // Rows 0..5: segments 0 and 1 are finalized, row 4 stays in open segment 2.
        creator
            .push_n_row_elems(5, vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
        assert!(creator.memory_usage() > 0);

        // Rows 5..10: segments 2, 3 and 4 are finalized.
        creator
            .push_n_row_elems(5, vec![b"c".to_vec(), b"d".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        // Rows 10..20: segments 5 through 9 are finalized.
        creator
            .push_n_row_elems(10, vec![b"e".to_vec(), b"f".to_vec()])
            .await
            .unwrap();
        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
        assert!(creator.memory_usage() > 0);

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();
        let (meta, meta_size) = decode_meta(&bytes);

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 10);
        assert_eq!(meta.row_count, 20);
        assert_eq!(
            meta.bloom_filter_size as usize + meta_size + 4,
            bytes.len()
        );

        let bfs: Vec<_> = meta
            .bloom_filter_locs
            .iter()
            .map(|loc| {
                let start = loc.offset as usize;
                let end = (loc.offset + loc.size) as usize;
                BloomFilter::from_vec(u64_vec_from_bytes(&bytes[start..end]))
                    .seed(&SEED)
                    .expected_items(loc.element_count as usize)
            })
            .collect();

        // 4 bloom filters to serve 10 segments
        assert_eq!(bfs.len(), 4);
        assert_eq!(meta.segment_loc_indices.len(), 10);

        for &idx in &meta.segment_loc_indices[..3] {
            let bf = &bfs[idx as usize];
            assert!(bf.contains(&b"a"));
            assert!(bf.contains(&b"b"));
        }
        for &idx in &meta.segment_loc_indices[2..5] {
            let bf = &bfs[idx as usize];
            assert!(bf.contains(&b"c"));
            assert!(bf.contains(&b"d"));
        }
        for &idx in &meta.segment_loc_indices[5..10] {
            let bf = &bfs[idx as usize];
            assert!(bf.contains(&b"e"));
            assert!(bf.contains(&b"f"));
        }
    }

    #[tokio::test]
    async fn test_final_seg_all_null() {
        let mut writer = Cursor::new(Vec::new());
        let mut creator = new_test_creator();

        // Two full segments of a/b, then a trailing row without any elements.
        creator
            .push_n_row_elems(4, vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        creator.push_row_elems(Vec::new()).await.unwrap();

        creator.finish(&mut writer).await.unwrap();

        let bytes = writer.into_inner();
        let (meta, _) = decode_meta(&bytes);

        assert_eq!(meta.rows_per_segment, 2);
        assert_eq!(meta.segment_count, 3);
        assert_eq!(meta.row_count, 5);
    }
}