index/bloom_filter/
creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod finalize_segment;
16mod intermediate_codec;
17
18use std::collections::HashSet;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21
22use finalize_segment::FinalizedBloomFilterStorage;
23use futures::{AsyncWrite, AsyncWriteExt, StreamExt};
24use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
25use prost::Message;
26use snafu::ResultExt;
27
28use crate::Bytes;
29use crate::bloom_filter::SEED;
30use crate::bloom_filter::error::{IoSnafu, Result};
31use crate::external_provider::ExternalTempFileProvider;
32
/// `BloomFilterCreator` is responsible for creating and managing bloom filters
/// for a set of elements. It divides the rows into segments of
/// `rows_per_segment` rows and creates a bloom filter for each segment.
///
/// # Format
///
/// The bloom filter creator writes the following format to the writer:
///
/// ```text
/// +--------------------+--------------------+-----+--------------------------+----------------------+
/// | Bloom filter 0     | Bloom filter 1     | ... | BloomFilterMeta          | Meta size            |
/// +--------------------+--------------------+-----+--------------------------+----------------------+
/// |<- bytes (size 0) ->|<- bytes (size 1) ->| ... |<- protobuf (meta size) ->|<- u32 LE (4 bytes) ->|
/// ```
///
/// The `BloomFilterMeta` footer is protobuf-encoded (via `prost`), and the
/// trailing `u32` holds the encoded length of that footer in little-endian order.
pub struct BloomFilterCreator {
    /// The number of rows per segment set by the user.
    rows_per_segment: usize,

    /// Number of rows that have been added to the creator so far.
    accumulated_row_count: usize,

    /// The set of distinct elements in the current (not yet finalized) segment.
    cur_seg_distinct_elems: HashSet<Bytes>,

    /// The memory usage of the current segment's distinct elements, counted as
    /// the sum of the byte lengths of the elements in `cur_seg_distinct_elems`.
    cur_seg_distinct_elems_mem_usage: usize,

    /// Storage for finalized Bloom filters.
    finalized_bloom_filters: FinalizedBloomFilterStorage,

    /// Number of rows that belong to segments finalized so far.
    finalized_row_count: usize,

    /// Global memory usage counter shared with other components; this creator
    /// adds and subtracts the bytes held by its current segment.
    global_memory_usage: Arc<AtomicUsize>,
}
70
71impl BloomFilterCreator {
72    /// Creates a new `BloomFilterCreator` with the specified number of rows per segment.
73    ///
74    /// # PANICS
75    ///
76    /// `rows_per_segment` <= 0
77    pub fn new(
78        rows_per_segment: usize,
79        false_positive_rate: f64,
80        intermediate_provider: Arc<dyn ExternalTempFileProvider>,
81        global_memory_usage: Arc<AtomicUsize>,
82        global_memory_usage_threshold: Option<usize>,
83    ) -> Self {
84        assert!(
85            rows_per_segment > 0,
86            "rows_per_segment must be greater than 0"
87        );
88
89        Self {
90            rows_per_segment,
91            accumulated_row_count: 0,
92            cur_seg_distinct_elems: HashSet::default(),
93            cur_seg_distinct_elems_mem_usage: 0,
94            global_memory_usage: global_memory_usage.clone(),
95            finalized_bloom_filters: FinalizedBloomFilterStorage::new(
96                false_positive_rate,
97                intermediate_provider,
98                global_memory_usage,
99                global_memory_usage_threshold,
100            ),
101            finalized_row_count: 0,
102        }
103    }
104
105    /// Adds multiple rows of elements to the bloom filter. If the number of accumulated rows
106    /// reaches `rows_per_segment`, it finalizes the current segment.
107    pub async fn push_n_row_elems(
108        &mut self,
109        mut nrows: usize,
110        elems: impl IntoIterator<Item = Bytes>,
111    ) -> Result<()> {
112        if nrows == 0 {
113            return Ok(());
114        }
115        if nrows == 1 {
116            return self.push_row_elems(elems).await;
117        }
118
119        let elems = elems.into_iter().collect::<Vec<_>>();
120        while nrows > 0 {
121            let rows_to_seg_end =
122                self.rows_per_segment - (self.accumulated_row_count % self.rows_per_segment);
123            let rows_to_push = nrows.min(rows_to_seg_end);
124            nrows -= rows_to_push;
125
126            self.accumulated_row_count += rows_to_push;
127
128            let mut mem_diff = 0;
129            for elem in &elems {
130                let len = elem.len();
131                let is_new = self.cur_seg_distinct_elems.insert(elem.clone());
132                if is_new {
133                    mem_diff += len;
134                }
135            }
136            self.cur_seg_distinct_elems_mem_usage += mem_diff;
137            self.global_memory_usage
138                .fetch_add(mem_diff, Ordering::Relaxed);
139
140            if self
141                .accumulated_row_count
142                .is_multiple_of(self.rows_per_segment)
143            {
144                self.finalize_segment().await?;
145                self.finalized_row_count = self.accumulated_row_count;
146            }
147        }
148
149        Ok(())
150    }
151
152    /// Adds a row of elements to the bloom filter. If the number of accumulated rows
153    /// reaches `rows_per_segment`, it finalizes the current segment.
154    pub async fn push_row_elems(&mut self, elems: impl IntoIterator<Item = Bytes>) -> Result<()> {
155        self.accumulated_row_count += 1;
156
157        let mut mem_diff = 0;
158        for elem in elems.into_iter() {
159            let len = elem.len();
160            let is_new = self.cur_seg_distinct_elems.insert(elem);
161            if is_new {
162                mem_diff += len;
163            }
164        }
165        self.cur_seg_distinct_elems_mem_usage += mem_diff;
166        self.global_memory_usage
167            .fetch_add(mem_diff, Ordering::Relaxed);
168
169        if self
170            .accumulated_row_count
171            .is_multiple_of(self.rows_per_segment)
172        {
173            self.finalize_segment().await?;
174            self.finalized_row_count = self.accumulated_row_count;
175        }
176
177        Ok(())
178    }
179
180    /// Finalizes any remaining segments and writes the bloom filters and metadata to the provided writer.
181    pub async fn finish(&mut self, mut writer: impl AsyncWrite + Unpin) -> Result<()> {
182        if self.accumulated_row_count > self.finalized_row_count {
183            self.finalize_segment().await?;
184        }
185
186        let mut meta = BloomFilterMeta {
187            rows_per_segment: self.rows_per_segment as _,
188            row_count: self.accumulated_row_count as _,
189            ..Default::default()
190        };
191
192        let (indices, mut segs) = self.finalized_bloom_filters.drain().await?;
193        meta.segment_loc_indices = indices.into_iter().map(|i| i as u64).collect();
194        meta.segment_count = meta.segment_loc_indices.len() as _;
195
196        while let Some(segment) = segs.next().await {
197            let segment = segment?;
198            writer
199                .write_all(&segment.bloom_filter_bytes)
200                .await
201                .context(IoSnafu)?;
202
203            let size = segment.bloom_filter_bytes.len() as u64;
204            meta.bloom_filter_locs.push(BloomFilterLoc {
205                offset: meta.bloom_filter_size as _,
206                size,
207                element_count: segment.element_count as _,
208            });
209            meta.bloom_filter_size += size;
210        }
211
212        let meta_bytes = meta.encode_to_vec();
213        writer.write_all(&meta_bytes).await.context(IoSnafu)?;
214
215        let meta_size = meta_bytes.len() as u32;
216        writer
217            .write_all(&meta_size.to_le_bytes())
218            .await
219            .context(IoSnafu)?;
220        writer.flush().await.unwrap();
221
222        Ok(())
223    }
224
225    /// Returns the memory usage of the creating bloom filter.
226    pub fn memory_usage(&self) -> usize {
227        self.cur_seg_distinct_elems_mem_usage + self.finalized_bloom_filters.memory_usage()
228    }
229
230    async fn finalize_segment(&mut self) -> Result<()> {
231        let elem_count = self.cur_seg_distinct_elems.len();
232        self.finalized_bloom_filters
233            .add(self.cur_seg_distinct_elems.drain(), elem_count)
234            .await?;
235
236        self.global_memory_usage
237            .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
238        self.cur_seg_distinct_elems_mem_usage = 0;
239        Ok(())
240    }
241}
242
243impl Drop for BloomFilterCreator {
244    fn drop(&mut self) {
245        self.global_memory_usage
246            .fetch_sub(self.cur_seg_distinct_elems_mem_usage, Ordering::Relaxed);
247    }
248}
249
250#[cfg(test)]
251mod tests {
252    use fastbloom::BloomFilter;
253    use futures::io::Cursor;
254
255    use super::*;
256    use crate::external_provider::MockExternalTempFileProvider;
257
258    /// Converts a slice of bytes to a vector of `u64`.
259    pub fn u64_vec_from_bytes(bytes: &[u8]) -> Vec<u64> {
260        bytes
261            .chunks_exact(std::mem::size_of::<u64>())
262            .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
263            .collect()
264    }
265
266    #[tokio::test]
267    async fn test_bloom_filter_creator() {
268        let mut writer = Cursor::new(Vec::new());
269        let mut creator = BloomFilterCreator::new(
270            2,
271            0.01,
272            Arc::new(MockExternalTempFileProvider::new()),
273            Arc::new(AtomicUsize::new(0)),
274            None,
275        );
276
277        creator
278            .push_row_elems(vec![b"a".to_vec(), b"b".to_vec()])
279            .await
280            .unwrap();
281        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
282        assert!(creator.memory_usage() > 0);
283
284        creator
285            .push_row_elems(vec![b"c".to_vec(), b"d".to_vec()])
286            .await
287            .unwrap();
288        // Finalize the first segment
289        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
290        assert!(creator.memory_usage() > 0);
291
292        creator
293            .push_row_elems(vec![b"e".to_vec(), b"f".to_vec()])
294            .await
295            .unwrap();
296        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
297        assert!(creator.memory_usage() > 0);
298
299        creator.finish(&mut writer).await.unwrap();
300
301        let bytes = writer.into_inner();
302        let total_size = bytes.len();
303        let meta_size_offset = total_size - 4;
304        let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());
305
306        let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
307        let meta = BloomFilterMeta::decode(meta_bytes).unwrap();
308
309        assert_eq!(meta.rows_per_segment, 2);
310        assert_eq!(meta.segment_count, 2);
311        assert_eq!(meta.row_count, 3);
312        assert_eq!(
313            meta.bloom_filter_size as usize + meta_bytes.len() + 4,
314            total_size
315        );
316
317        let mut bfs = Vec::new();
318        for segment in meta.bloom_filter_locs {
319            let bloom_filter_bytes =
320                &bytes[segment.offset as usize..(segment.offset + segment.size) as usize];
321            let v = u64_vec_from_bytes(bloom_filter_bytes);
322            let bloom_filter = BloomFilter::from_vec(v)
323                .seed(&SEED)
324                .expected_items(segment.element_count as usize);
325            bfs.push(bloom_filter);
326        }
327
328        assert_eq!(meta.segment_loc_indices.len(), 2);
329
330        let bf0 = &bfs[meta.segment_loc_indices[0] as usize];
331        assert!(bf0.contains(&b"a"));
332        assert!(bf0.contains(&b"b"));
333        assert!(bf0.contains(&b"c"));
334        assert!(bf0.contains(&b"d"));
335
336        let bf1 = &bfs[meta.segment_loc_indices[1] as usize];
337        assert!(bf1.contains(&b"e"));
338        assert!(bf1.contains(&b"f"));
339    }
340
341    #[tokio::test]
342    async fn test_bloom_filter_creator_batch_push() {
343        let mut writer = Cursor::new(Vec::new());
344        let mut creator: BloomFilterCreator = BloomFilterCreator::new(
345            2,
346            0.01,
347            Arc::new(MockExternalTempFileProvider::new()),
348            Arc::new(AtomicUsize::new(0)),
349            None,
350        );
351
352        creator
353            .push_n_row_elems(5, vec![b"a".to_vec(), b"b".to_vec()])
354            .await
355            .unwrap();
356        assert!(creator.cur_seg_distinct_elems_mem_usage > 0);
357        assert!(creator.memory_usage() > 0);
358
359        creator
360            .push_n_row_elems(5, vec![b"c".to_vec(), b"d".to_vec()])
361            .await
362            .unwrap();
363        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
364        assert!(creator.memory_usage() > 0);
365
366        creator
367            .push_n_row_elems(10, vec![b"e".to_vec(), b"f".to_vec()])
368            .await
369            .unwrap();
370        assert_eq!(creator.cur_seg_distinct_elems_mem_usage, 0);
371        assert!(creator.memory_usage() > 0);
372
373        creator.finish(&mut writer).await.unwrap();
374
375        let bytes = writer.into_inner();
376        let total_size = bytes.len();
377        let meta_size_offset = total_size - 4;
378        let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());
379
380        let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
381        let meta = BloomFilterMeta::decode(meta_bytes).unwrap();
382
383        assert_eq!(meta.rows_per_segment, 2);
384        assert_eq!(meta.segment_count, 10);
385        assert_eq!(meta.row_count, 20);
386        assert_eq!(
387            meta.bloom_filter_size as usize + meta_bytes.len() + 4,
388            total_size
389        );
390
391        let mut bfs = Vec::new();
392        for segment in meta.bloom_filter_locs {
393            let bloom_filter_bytes =
394                &bytes[segment.offset as usize..(segment.offset + segment.size) as usize];
395            let v = u64_vec_from_bytes(bloom_filter_bytes);
396            let bloom_filter = BloomFilter::from_vec(v)
397                .seed(&SEED)
398                .expected_items(segment.element_count as _);
399            bfs.push(bloom_filter);
400        }
401
402        // 4 bloom filters to serve 10 segments
403        assert_eq!(bfs.len(), 4);
404        assert_eq!(meta.segment_loc_indices.len(), 10);
405
406        for idx in meta.segment_loc_indices.iter().take(3) {
407            let bf = &bfs[*idx as usize];
408            assert!(bf.contains(&b"a"));
409            assert!(bf.contains(&b"b"));
410        }
411        for idx in meta.segment_loc_indices.iter().take(5).skip(2) {
412            let bf = &bfs[*idx as usize];
413            assert!(bf.contains(&b"c"));
414            assert!(bf.contains(&b"d"));
415        }
416        for idx in meta.segment_loc_indices.iter().take(10).skip(5) {
417            let bf = &bfs[*idx as usize];
418            assert!(bf.contains(&b"e"));
419            assert!(bf.contains(&b"f"));
420        }
421    }
422
423    #[tokio::test]
424    async fn test_final_seg_all_null() {
425        let mut writer = Cursor::new(Vec::new());
426        let mut creator = BloomFilterCreator::new(
427            2,
428            0.01,
429            Arc::new(MockExternalTempFileProvider::new()),
430            Arc::new(AtomicUsize::new(0)),
431            None,
432        );
433
434        creator
435            .push_n_row_elems(4, vec![b"a".to_vec(), b"b".to_vec()])
436            .await
437            .unwrap();
438        creator.push_row_elems(Vec::new()).await.unwrap();
439
440        creator.finish(&mut writer).await.unwrap();
441
442        let bytes = writer.into_inner();
443        let total_size = bytes.len();
444        let meta_size_offset = total_size - 4;
445        let meta_size = u32::from_le_bytes((&bytes[meta_size_offset..]).try_into().unwrap());
446
447        let meta_bytes = &bytes[total_size - meta_size as usize - 4..total_size - 4];
448        let meta = BloomFilterMeta::decode(meta_bytes).unwrap();
449
450        assert_eq!(meta.rows_per_segment, 2);
451        assert_eq!(meta.segment_count, 3);
452        assert_eq!(meta.row_count, 5);
453    }
454}