index/bloom_filter/creator/
finalize_segment.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::Arc;
18
19use asynchronous_codec::{FramedRead, FramedWrite};
20use fastbloom::BloomFilter;
21use futures::stream::StreamExt;
22use futures::{stream, AsyncWriteExt, Stream};
23use snafu::ResultExt;
24
25use crate::bloom_filter::creator::intermediate_codec::IntermediateBloomFilterCodecV1;
26use crate::bloom_filter::creator::SEED;
27use crate::bloom_filter::error::{IntermediateSnafu, IoSnafu, Result};
28use crate::external_provider::ExternalTempFileProvider;
29use crate::Bytes;
30
/// Lower bound, in bytes, on this storage's own memory usage before a flush of
/// the in-memory Bloom filters to disk is even considered.
const MIN_MEMORY_USAGE_THRESHOLD: usize = 1 << 20; // 1 MiB
33
/// Storage for finalized Bloom filters.
///
/// Segments are accumulated in memory by `add` and may be spilled to
/// intermediate files obtained from `intermediate_provider`. `drain` yields the
/// spilled segments first, followed by the ones still held in memory.
pub struct FinalizedBloomFilterStorage {
    /// The false positive rate of the Bloom filter.
    false_positive_rate: f64,

    /// Indices of the segments in the sequence of finalized Bloom filters.
    ///
    /// One entry is pushed per `add` call; an `add` whose segment equals the
    /// most recent in-memory segment records that segment's index again.
    segment_indices: Vec<usize>,

    /// Bloom filters that are stored in memory (not yet flushed to disk).
    in_memory: Vec<FinalizedBloomFilterSegment>,

    /// Used to generate unique file IDs for intermediate Bloom filters.
    /// A value of zero means nothing has been flushed to disk.
    intermediate_file_id_counter: usize,

    /// Prefix for intermediate Bloom filter files.
    intermediate_prefix: String,

    /// The provider for intermediate Bloom filter files.
    intermediate_provider: Arc<dyn ExternalTempFileProvider>,

    /// The memory usage of the in-memory Bloom filters, in bytes of
    /// `bloom_filter_bytes`. Reset to zero after a successful flush.
    memory_usage: usize,

    /// The global memory usage provided by the user to track the
    /// total memory usage of the creating Bloom filters.
    global_memory_usage: Arc<AtomicUsize>,

    /// The threshold of the global memory usage of the creating Bloom filters.
    /// `None` disables flushing to disk.
    global_memory_usage_threshold: Option<usize>,

    /// Records the number of flushed segments; used as the base offset when
    /// computing the index of a segment that is still in memory.
    flushed_seg_count: usize,
}
67
68impl FinalizedBloomFilterStorage {
69    /// Creates a new `FinalizedBloomFilterStorage`.
70    pub fn new(
71        false_positive_rate: f64,
72        intermediate_provider: Arc<dyn ExternalTempFileProvider>,
73        global_memory_usage: Arc<AtomicUsize>,
74        global_memory_usage_threshold: Option<usize>,
75    ) -> Self {
76        let external_prefix = format!("intm-bloom-filters-{}", uuid::Uuid::new_v4());
77        Self {
78            false_positive_rate,
79            segment_indices: Vec::new(),
80            in_memory: Vec::new(),
81            intermediate_file_id_counter: 0,
82            intermediate_prefix: external_prefix,
83            intermediate_provider,
84            memory_usage: 0,
85            global_memory_usage,
86            global_memory_usage_threshold,
87            flushed_seg_count: 0,
88        }
89    }
90
    /// Returns the memory usage of the storage.
    ///
    /// This is the total size in bytes of the Bloom filter segments currently
    /// held in memory; it drops back to zero after they are flushed to disk.
    pub fn memory_usage(&self) -> usize {
        self.memory_usage
    }
95
96    /// Adds a new finalized Bloom filter to the storage.
97    ///
98    /// If the memory usage exceeds the threshold, flushes the in-memory Bloom filters to disk.
99    pub async fn add(
100        &mut self,
101        elems: impl IntoIterator<Item = Bytes>,
102        element_count: usize,
103    ) -> Result<()> {
104        let mut bf = BloomFilter::with_false_pos(self.false_positive_rate)
105            .seed(&SEED)
106            .expected_items(element_count);
107        for elem in elems.into_iter() {
108            bf.insert(&elem);
109        }
110
111        let fbf = FinalizedBloomFilterSegment::from(bf, element_count);
112
113        // Reuse the last segment if it is the same as the current one.
114        if self.in_memory.last() == Some(&fbf) {
115            self.segment_indices
116                .push(self.flushed_seg_count + self.in_memory.len() - 1);
117            return Ok(());
118        }
119
120        // Update memory usage.
121        let memory_diff = fbf.bloom_filter_bytes.len();
122        self.memory_usage += memory_diff;
123        self.global_memory_usage
124            .fetch_add(memory_diff, Ordering::Relaxed);
125
126        // Add the finalized Bloom filter to the in-memory storage.
127        self.in_memory.push(fbf);
128        self.segment_indices
129            .push(self.flushed_seg_count + self.in_memory.len() - 1);
130
131        // Flush to disk if necessary.
132
133        // Do not flush if memory usage is too low.
134        if self.memory_usage < MIN_MEMORY_USAGE_THRESHOLD {
135            return Ok(());
136        }
137
138        // Check if the global memory usage exceeds the threshold and flush to disk if necessary.
139        if let Some(threshold) = self.global_memory_usage_threshold {
140            let global = self.global_memory_usage.load(Ordering::Relaxed);
141
142            if global > threshold {
143                self.flush_in_memory_to_disk().await?;
144
145                self.global_memory_usage
146                    .fetch_sub(self.memory_usage, Ordering::Relaxed);
147                self.memory_usage = 0;
148            }
149        }
150
151        Ok(())
152    }
153
154    /// Drains the storage and returns indieces of the segments and a stream of finalized Bloom filters.
155    pub async fn drain(
156        &mut self,
157    ) -> Result<(
158        Vec<usize>,
159        Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + Send + '_>>,
160    )> {
161        // FAST PATH: memory only
162        if self.intermediate_file_id_counter == 0 {
163            return Ok((
164                std::mem::take(&mut self.segment_indices),
165                Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))),
166            ));
167        }
168
169        // SLOW PATH: memory + disk
170        let mut on_disk = self
171            .intermediate_provider
172            .read_all(&self.intermediate_prefix)
173            .await
174            .context(IntermediateSnafu)?;
175        on_disk.sort_unstable_by(|x, y| x.0.cmp(&y.0));
176
177        let streams = on_disk
178            .into_iter()
179            .map(|(_, reader)| FramedRead::new(reader, IntermediateBloomFilterCodecV1::default()));
180
181        let in_memory_stream = stream::iter(self.in_memory.drain(..)).map(Ok);
182        Ok((
183            std::mem::take(&mut self.segment_indices),
184            Box::pin(stream::iter(streams).flatten().chain(in_memory_stream)),
185        ))
186    }
187
188    /// Flushes the in-memory Bloom filters to disk.
189    async fn flush_in_memory_to_disk(&mut self) -> Result<()> {
190        let file_id = self.intermediate_file_id_counter;
191        self.intermediate_file_id_counter += 1;
192        self.flushed_seg_count += self.in_memory.len();
193
194        let file_id = format!("{:08}", file_id);
195        let mut writer = self
196            .intermediate_provider
197            .create(&self.intermediate_prefix, &file_id)
198            .await
199            .context(IntermediateSnafu)?;
200
201        let fw = FramedWrite::new(&mut writer, IntermediateBloomFilterCodecV1::default());
202        // `forward()` will flush and close the writer when the stream ends
203        if let Err(e) = stream::iter(self.in_memory.drain(..).map(Ok))
204            .forward(fw)
205            .await
206        {
207            writer.close().await.context(IoSnafu)?;
208            writer.flush().await.context(IoSnafu)?;
209            return Err(e);
210        }
211
212        Ok(())
213    }
214}
215
216impl Drop for FinalizedBloomFilterStorage {
217    fn drop(&mut self) {
218        self.global_memory_usage
219            .fetch_sub(self.memory_usage, Ordering::Relaxed);
220    }
221}
222
/// A finalized Bloom filter segment.
///
/// Two segments compare equal when both their bytes and their element counts
/// are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FinalizedBloomFilterSegment {
    /// The underlying Bloom filter bytes: the filter's words, each encoded in
    /// little-endian byte order.
    pub bloom_filter_bytes: Vec<u8>,

    /// The number of elements in the Bloom filter, as supplied when the
    /// segment was built.
    pub element_count: usize,
}
232
233impl FinalizedBloomFilterSegment {
234    fn from(bf: BloomFilter, elem_count: usize) -> Self {
235        let bf_slice = bf.as_slice();
236        let mut bloom_filter_bytes = Vec::with_capacity(std::mem::size_of_val(bf_slice));
237        for &x in bf_slice {
238            bloom_filter_bytes.extend_from_slice(&x.to_le_bytes());
239        }
240
241        Self {
242            bloom_filter_bytes,
243            element_count: elem_count,
244        }
245    }
246}
247
248#[cfg(test)]
249mod tests {
250    use std::collections::HashMap;
251    use std::sync::Mutex;
252
253    use futures::AsyncRead;
254    use tokio::io::duplex;
255    use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
256
257    use super::*;
258    use crate::bloom_filter::creator::tests::u64_vec_from_bytes;
259    use crate::external_provider::MockExternalTempFileProvider;
260
261    #[tokio::test]
262    async fn test_finalized_bloom_filter_storage() {
263        let mut mock_provider = MockExternalTempFileProvider::new();
264
265        let mock_files: Arc<Mutex<HashMap<String, Box<dyn AsyncRead + Unpin + Send>>>> =
266            Arc::new(Mutex::new(HashMap::new()));
267
268        mock_provider.expect_create().returning({
269            let files = Arc::clone(&mock_files);
270            move |file_group, file_id| {
271                assert!(file_group.starts_with("intm-bloom-filters-"));
272                let mut files = files.lock().unwrap();
273                let (writer, reader) = duplex(2 * 1024 * 1024);
274                files.insert(file_id.to_string(), Box::new(reader.compat()));
275                Ok(Box::new(writer.compat_write()))
276            }
277        });
278
279        mock_provider.expect_read_all().returning({
280            let files = Arc::clone(&mock_files);
281            move |file_group| {
282                assert!(file_group.starts_with("intm-bloom-filters-"));
283                let mut files = files.lock().unwrap();
284                Ok(files.drain().collect::<Vec<_>>())
285            }
286        });
287
288        let global_memory_usage = Arc::new(AtomicUsize::new(0));
289        let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
290        let provider = Arc::new(mock_provider);
291        let mut storage = FinalizedBloomFilterStorage::new(
292            0.01,
293            provider,
294            global_memory_usage.clone(),
295            global_memory_usage_threshold,
296        );
297
298        let elem_count = 2000;
299        let batch = 1000;
300        let dup_batch = 200;
301
302        for i in 0..(batch - dup_batch) {
303            let elems = (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes());
304            storage.add(elems, elem_count).await.unwrap();
305        }
306        for _ in 0..dup_batch {
307            storage.add(Some(vec![]), 1).await.unwrap();
308        }
309
310        // Flush happens.
311        assert!(storage.intermediate_file_id_counter > 0);
312
313        // Drain the storage.
314        let (indices, mut stream) = storage.drain().await.unwrap();
315        assert_eq!(indices.len(), batch);
316
317        for (i, idx) in indices.iter().enumerate().take(batch - dup_batch) {
318            let segment = stream.next().await.unwrap().unwrap();
319            assert_eq!(segment.element_count, elem_count);
320
321            let v = u64_vec_from_bytes(&segment.bloom_filter_bytes);
322
323            // Check the correctness of the Bloom filter.
324            let bf = BloomFilter::from_vec(v)
325                .seed(&SEED)
326                .expected_items(segment.element_count);
327            for elem in (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes()) {
328                assert!(bf.contains(&elem));
329            }
330            assert_eq!(indices[i], *idx);
331        }
332
333        // Check the correctness of the duplicated segments.
334        let dup_seg = stream.next().await.unwrap().unwrap();
335        assert_eq!(dup_seg.element_count, 1);
336        assert!(stream.next().await.is_none());
337        assert!(indices[(batch - dup_batch)..batch]
338            .iter()
339            .all(|&x| x == batch - dup_batch));
340    }
341
342    #[tokio::test]
343    async fn test_finalized_bloom_filter_storage_all_dup() {
344        let mock_provider = MockExternalTempFileProvider::new();
345        let global_memory_usage = Arc::new(AtomicUsize::new(0));
346        let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
347        let provider = Arc::new(mock_provider);
348        let mut storage = FinalizedBloomFilterStorage::new(
349            0.01,
350            provider,
351            global_memory_usage.clone(),
352            global_memory_usage_threshold,
353        );
354
355        let batch = 1000;
356        for _ in 0..batch {
357            storage.add(Some(vec![]), 1).await.unwrap();
358        }
359
360        // Drain the storage.
361        let (indices, mut stream) = storage.drain().await.unwrap();
362
363        let bf = stream.next().await.unwrap().unwrap();
364        assert_eq!(bf.element_count, 1);
365
366        assert!(stream.next().await.is_none());
367
368        assert_eq!(indices.len(), batch);
369        assert!(indices.iter().all(|&x| x == 0));
370    }
371}