index/bloom_filter/creator/
finalize_segment.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::pin::Pin;
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::Arc;
18
19use asynchronous_codec::{FramedRead, FramedWrite};
20use fastbloom::BloomFilter;
21use futures::stream::StreamExt;
22use futures::{stream, AsyncWriteExt, Stream};
23use snafu::ResultExt;
24
25use crate::bloom_filter::creator::intermediate_codec::IntermediateBloomFilterCodecV1;
26use crate::bloom_filter::creator::{FALSE_POSITIVE_RATE, SEED};
27use crate::bloom_filter::error::{IntermediateSnafu, IoSnafu, Result};
28use crate::external_provider::ExternalTempFileProvider;
29use crate::Bytes;
30
/// Lower bound (in bytes) on the storage's own memory usage before it is
/// allowed to spill in-memory Bloom filters to disk: 1 MiB.
const MIN_MEMORY_USAGE_THRESHOLD: usize = 1 << 20;
33
34/// Storage for finalized Bloom filters.
35pub struct FinalizedBloomFilterStorage {
36    /// Indices of the segments in the sequence of finalized Bloom filters.
37    segment_indices: Vec<usize>,
38
39    /// Bloom filters that are stored in memory.
40    in_memory: Vec<FinalizedBloomFilterSegment>,
41
42    /// Used to generate unique file IDs for intermediate Bloom filters.
43    intermediate_file_id_counter: usize,
44
45    /// Prefix for intermediate Bloom filter files.
46    intermediate_prefix: String,
47
48    /// The provider for intermediate Bloom filter files.
49    intermediate_provider: Arc<dyn ExternalTempFileProvider>,
50
51    /// The memory usage of the in-memory Bloom filters.
52    memory_usage: usize,
53
54    /// The global memory usage provided by the user to track the
55    /// total memory usage of the creating Bloom filters.
56    global_memory_usage: Arc<AtomicUsize>,
57
58    /// The threshold of the global memory usage of the creating Bloom filters.
59    global_memory_usage_threshold: Option<usize>,
60
61    /// Records the number of flushed segments.
62    flushed_seg_count: usize,
63}
64
65impl FinalizedBloomFilterStorage {
66    /// Creates a new `FinalizedBloomFilterStorage`.
67    pub fn new(
68        intermediate_provider: Arc<dyn ExternalTempFileProvider>,
69        global_memory_usage: Arc<AtomicUsize>,
70        global_memory_usage_threshold: Option<usize>,
71    ) -> Self {
72        let external_prefix = format!("intm-bloom-filters-{}", uuid::Uuid::new_v4());
73        Self {
74            segment_indices: Vec::new(),
75            in_memory: Vec::new(),
76            intermediate_file_id_counter: 0,
77            intermediate_prefix: external_prefix,
78            intermediate_provider,
79            memory_usage: 0,
80            global_memory_usage,
81            global_memory_usage_threshold,
82            flushed_seg_count: 0,
83        }
84    }
85
86    /// Returns the memory usage of the storage.
87    pub fn memory_usage(&self) -> usize {
88        self.memory_usage
89    }
90
91    /// Adds a new finalized Bloom filter to the storage.
92    ///
93    /// If the memory usage exceeds the threshold, flushes the in-memory Bloom filters to disk.
94    pub async fn add(
95        &mut self,
96        elems: impl IntoIterator<Item = Bytes>,
97        element_count: usize,
98    ) -> Result<()> {
99        let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
100            .seed(&SEED)
101            .expected_items(element_count);
102        for elem in elems.into_iter() {
103            bf.insert(&elem);
104        }
105
106        let fbf = FinalizedBloomFilterSegment::from(bf, element_count);
107
108        // Reuse the last segment if it is the same as the current one.
109        if self.in_memory.last() == Some(&fbf) {
110            self.segment_indices
111                .push(self.flushed_seg_count + self.in_memory.len() - 1);
112            return Ok(());
113        }
114
115        // Update memory usage.
116        let memory_diff = fbf.bloom_filter_bytes.len();
117        self.memory_usage += memory_diff;
118        self.global_memory_usage
119            .fetch_add(memory_diff, Ordering::Relaxed);
120
121        // Add the finalized Bloom filter to the in-memory storage.
122        self.in_memory.push(fbf);
123        self.segment_indices
124            .push(self.flushed_seg_count + self.in_memory.len() - 1);
125
126        // Flush to disk if necessary.
127
128        // Do not flush if memory usage is too low.
129        if self.memory_usage < MIN_MEMORY_USAGE_THRESHOLD {
130            return Ok(());
131        }
132
133        // Check if the global memory usage exceeds the threshold and flush to disk if necessary.
134        if let Some(threshold) = self.global_memory_usage_threshold {
135            let global = self.global_memory_usage.load(Ordering::Relaxed);
136
137            if global > threshold {
138                self.flush_in_memory_to_disk().await?;
139
140                self.global_memory_usage
141                    .fetch_sub(self.memory_usage, Ordering::Relaxed);
142                self.memory_usage = 0;
143            }
144        }
145
146        Ok(())
147    }
148
149    /// Drains the storage and returns indieces of the segments and a stream of finalized Bloom filters.
150    pub async fn drain(
151        &mut self,
152    ) -> Result<(
153        Vec<usize>,
154        Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + Send + '_>>,
155    )> {
156        // FAST PATH: memory only
157        if self.intermediate_file_id_counter == 0 {
158            return Ok((
159                std::mem::take(&mut self.segment_indices),
160                Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))),
161            ));
162        }
163
164        // SLOW PATH: memory + disk
165        let mut on_disk = self
166            .intermediate_provider
167            .read_all(&self.intermediate_prefix)
168            .await
169            .context(IntermediateSnafu)?;
170        on_disk.sort_unstable_by(|x, y| x.0.cmp(&y.0));
171
172        let streams = on_disk
173            .into_iter()
174            .map(|(_, reader)| FramedRead::new(reader, IntermediateBloomFilterCodecV1::default()));
175
176        let in_memory_stream = stream::iter(self.in_memory.drain(..)).map(Ok);
177        Ok((
178            std::mem::take(&mut self.segment_indices),
179            Box::pin(stream::iter(streams).flatten().chain(in_memory_stream)),
180        ))
181    }
182
183    /// Flushes the in-memory Bloom filters to disk.
184    async fn flush_in_memory_to_disk(&mut self) -> Result<()> {
185        let file_id = self.intermediate_file_id_counter;
186        self.intermediate_file_id_counter += 1;
187        self.flushed_seg_count += self.in_memory.len();
188
189        let file_id = format!("{:08}", file_id);
190        let mut writer = self
191            .intermediate_provider
192            .create(&self.intermediate_prefix, &file_id)
193            .await
194            .context(IntermediateSnafu)?;
195
196        let fw = FramedWrite::new(&mut writer, IntermediateBloomFilterCodecV1::default());
197        // `forward()` will flush and close the writer when the stream ends
198        if let Err(e) = stream::iter(self.in_memory.drain(..).map(Ok))
199            .forward(fw)
200            .await
201        {
202            writer.close().await.context(IoSnafu)?;
203            writer.flush().await.context(IoSnafu)?;
204            return Err(e);
205        }
206
207        Ok(())
208    }
209}
210
211impl Drop for FinalizedBloomFilterStorage {
212    fn drop(&mut self) {
213        self.global_memory_usage
214            .fetch_sub(self.memory_usage, Ordering::Relaxed);
215    }
216}
217
/// One finalized Bloom filter together with its element count.
///
/// Two segments compare equal when both their bytes and their element counts
/// match.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FinalizedBloomFilterSegment {
    /// Serialized contents of the Bloom filter.
    pub bloom_filter_bytes: Vec<u8>,

    /// Element count recorded for this Bloom filter.
    pub element_count: usize,
}
227
228impl FinalizedBloomFilterSegment {
229    fn from(bf: BloomFilter, elem_count: usize) -> Self {
230        let bf_slice = bf.as_slice();
231        let mut bloom_filter_bytes = Vec::with_capacity(std::mem::size_of_val(bf_slice));
232        for &x in bf_slice {
233            bloom_filter_bytes.extend_from_slice(&x.to_le_bytes());
234        }
235
236        Self {
237            bloom_filter_bytes,
238            element_count: elem_count,
239        }
240    }
241}
242
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Mutex;

    use futures::AsyncRead;
    use tokio::io::duplex;
    use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

    use super::*;
    use crate::bloom_filter::creator::tests::u64_vec_from_bytes;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Adds enough distinct segments to force a flush, followed by a run of
    /// identical segments, then checks the drained segments and indices.
    #[tokio::test]
    async fn test_finalized_bloom_filter_storage() {
        let mut mock_provider = MockExternalTempFileProvider::new();

        // Intermediate "files" are in-memory duplex pipes keyed by file ID:
        // `create` hands out the write half and keeps the read half here for
        // `read_all` to return later.
        let mock_files: Arc<Mutex<HashMap<String, Box<dyn AsyncRead + Unpin + Send>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        mock_provider.expect_create().returning({
            let files = Arc::clone(&mock_files);
            move |file_group, file_id| {
                assert!(file_group.starts_with("intm-bloom-filters-"));
                let mut files = files.lock().unwrap();
                let (writer, reader) = duplex(2 * 1024 * 1024);
                files.insert(file_id.to_string(), Box::new(reader.compat()));
                Ok(Box::new(writer.compat_write()))
            }
        });

        mock_provider.expect_read_all().returning({
            let files = Arc::clone(&mock_files);
            move |file_group| {
                assert!(file_group.starts_with("intm-bloom-filters-"));
                let mut files = files.lock().unwrap();
                Ok(files.drain().collect::<Vec<_>>())
            }
        });

        let global_memory_usage = Arc::new(AtomicUsize::new(0));
        let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
        let provider = Arc::new(mock_provider);
        let mut storage = FinalizedBloomFilterStorage::new(
            provider,
            global_memory_usage.clone(),
            global_memory_usage_threshold,
        );

        let elem_count = 2000;
        let batch = 1000;
        let dup_batch = 200;

        // Distinct segments: each covers its own range of numbers.
        for i in 0..(batch - dup_batch) {
            let elems = (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes());
            storage.add(elems, elem_count).await.unwrap();
        }
        // Identical segments: only the first one should be stored.
        for _ in 0..dup_batch {
            storage.add(Some(vec![]), 1).await.unwrap();
        }

        // Flush happens.
        assert!(storage.intermediate_file_id_counter > 0);

        // Drain the storage.
        let (indices, mut stream) = storage.drain().await.unwrap();
        assert_eq!(indices.len(), batch);

        for (i, idx) in indices.iter().enumerate().take(batch - dup_batch) {
            let segment = stream.next().await.unwrap().unwrap();
            assert_eq!(segment.element_count, elem_count);

            let v = u64_vec_from_bytes(&segment.bloom_filter_bytes);

            // Check the correctness of the Bloom filter.
            let bf = BloomFilter::from_vec(v)
                .seed(&SEED)
                .expected_items(segment.element_count);
            for elem in (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes()) {
                assert!(bf.contains(&elem));
            }
            // The i-th distinct segment must be stored at position i.
            assert_eq!(*idx, i);
        }

        // Check the correctness of the duplicated segments.
        let dup_seg = stream.next().await.unwrap().unwrap();
        assert_eq!(dup_seg.element_count, 1);
        assert!(stream.next().await.is_none());
        assert!(indices[(batch - dup_batch)..batch]
            .iter()
            .all(|&x| x == batch - dup_batch));
    }

    /// Adds only identical segments: a single segment must be stored and
    /// every recorded index must point at it.
    #[tokio::test]
    async fn test_finalized_bloom_filter_storage_all_dup() {
        // No expectations are set on the mock provider.
        let mock_provider = MockExternalTempFileProvider::new();
        let global_memory_usage = Arc::new(AtomicUsize::new(0));
        let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
        let provider = Arc::new(mock_provider);
        let mut storage = FinalizedBloomFilterStorage::new(
            provider,
            global_memory_usage.clone(),
            global_memory_usage_threshold,
        );

        let batch = 1000;
        for _ in 0..batch {
            storage.add(Some(vec![]), 1).await.unwrap();
        }

        // Drain the storage.
        let (indices, mut stream) = storage.drain().await.unwrap();

        let bf = stream.next().await.unwrap().unwrap();
        assert_eq!(bf.element_count, 1);

        assert!(stream.next().await.is_none());

        assert_eq!(indices.len(), batch);
        assert!(indices.iter().all(|&x| x == 0));
    }
}