index/bloom_filter/creator/
finalize_segment.rs1use std::pin::Pin;
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::Arc;
18
19use asynchronous_codec::{FramedRead, FramedWrite};
20use fastbloom::BloomFilter;
21use futures::stream::StreamExt;
22use futures::{stream, AsyncWriteExt, Stream};
23use snafu::ResultExt;
24
25use crate::bloom_filter::creator::intermediate_codec::IntermediateBloomFilterCodecV1;
26use crate::bloom_filter::creator::SEED;
27use crate::bloom_filter::error::{IntermediateSnafu, IoSnafu, Result};
28use crate::external_provider::ExternalTempFileProvider;
29use crate::Bytes;
30
31const MIN_MEMORY_USAGE_THRESHOLD: usize = 1024 * 1024; pub struct FinalizedBloomFilterStorage {
36 false_positive_rate: f64,
38
39 segment_indices: Vec<usize>,
41
42 in_memory: Vec<FinalizedBloomFilterSegment>,
44
45 intermediate_file_id_counter: usize,
47
48 intermediate_prefix: String,
50
51 intermediate_provider: Arc<dyn ExternalTempFileProvider>,
53
54 memory_usage: usize,
56
57 global_memory_usage: Arc<AtomicUsize>,
60
61 global_memory_usage_threshold: Option<usize>,
63
64 flushed_seg_count: usize,
66}
67
68impl FinalizedBloomFilterStorage {
69 pub fn new(
71 false_positive_rate: f64,
72 intermediate_provider: Arc<dyn ExternalTempFileProvider>,
73 global_memory_usage: Arc<AtomicUsize>,
74 global_memory_usage_threshold: Option<usize>,
75 ) -> Self {
76 let external_prefix = format!("intm-bloom-filters-{}", uuid::Uuid::new_v4());
77 Self {
78 false_positive_rate,
79 segment_indices: Vec::new(),
80 in_memory: Vec::new(),
81 intermediate_file_id_counter: 0,
82 intermediate_prefix: external_prefix,
83 intermediate_provider,
84 memory_usage: 0,
85 global_memory_usage,
86 global_memory_usage_threshold,
87 flushed_seg_count: 0,
88 }
89 }
90
91 pub fn memory_usage(&self) -> usize {
93 self.memory_usage
94 }
95
96 pub async fn add(
100 &mut self,
101 elems: impl IntoIterator<Item = Bytes>,
102 element_count: usize,
103 ) -> Result<()> {
104 let mut bf = BloomFilter::with_false_pos(self.false_positive_rate)
105 .seed(&SEED)
106 .expected_items(element_count);
107 for elem in elems.into_iter() {
108 bf.insert(&elem);
109 }
110
111 let fbf = FinalizedBloomFilterSegment::from(bf, element_count);
112
113 if self.in_memory.last() == Some(&fbf) {
115 self.segment_indices
116 .push(self.flushed_seg_count + self.in_memory.len() - 1);
117 return Ok(());
118 }
119
120 let memory_diff = fbf.bloom_filter_bytes.len();
122 self.memory_usage += memory_diff;
123 self.global_memory_usage
124 .fetch_add(memory_diff, Ordering::Relaxed);
125
126 self.in_memory.push(fbf);
128 self.segment_indices
129 .push(self.flushed_seg_count + self.in_memory.len() - 1);
130
131 if self.memory_usage < MIN_MEMORY_USAGE_THRESHOLD {
135 return Ok(());
136 }
137
138 if let Some(threshold) = self.global_memory_usage_threshold {
140 let global = self.global_memory_usage.load(Ordering::Relaxed);
141
142 if global > threshold {
143 self.flush_in_memory_to_disk().await?;
144
145 self.global_memory_usage
146 .fetch_sub(self.memory_usage, Ordering::Relaxed);
147 self.memory_usage = 0;
148 }
149 }
150
151 Ok(())
152 }
153
154 pub async fn drain(
156 &mut self,
157 ) -> Result<(
158 Vec<usize>,
159 Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + Send + '_>>,
160 )> {
161 if self.intermediate_file_id_counter == 0 {
163 return Ok((
164 std::mem::take(&mut self.segment_indices),
165 Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))),
166 ));
167 }
168
169 let mut on_disk = self
171 .intermediate_provider
172 .read_all(&self.intermediate_prefix)
173 .await
174 .context(IntermediateSnafu)?;
175 on_disk.sort_unstable_by(|x, y| x.0.cmp(&y.0));
176
177 let streams = on_disk
178 .into_iter()
179 .map(|(_, reader)| FramedRead::new(reader, IntermediateBloomFilterCodecV1::default()));
180
181 let in_memory_stream = stream::iter(self.in_memory.drain(..)).map(Ok);
182 Ok((
183 std::mem::take(&mut self.segment_indices),
184 Box::pin(stream::iter(streams).flatten().chain(in_memory_stream)),
185 ))
186 }
187
188 async fn flush_in_memory_to_disk(&mut self) -> Result<()> {
190 let file_id = self.intermediate_file_id_counter;
191 self.intermediate_file_id_counter += 1;
192 self.flushed_seg_count += self.in_memory.len();
193
194 let file_id = format!("{:08}", file_id);
195 let mut writer = self
196 .intermediate_provider
197 .create(&self.intermediate_prefix, &file_id)
198 .await
199 .context(IntermediateSnafu)?;
200
201 let fw = FramedWrite::new(&mut writer, IntermediateBloomFilterCodecV1::default());
202 if let Err(e) = stream::iter(self.in_memory.drain(..).map(Ok))
204 .forward(fw)
205 .await
206 {
207 writer.close().await.context(IoSnafu)?;
208 writer.flush().await.context(IoSnafu)?;
209 return Err(e);
210 }
211
212 Ok(())
213 }
214}
215
216impl Drop for FinalizedBloomFilterStorage {
217 fn drop(&mut self) {
218 self.global_memory_usage
219 .fetch_sub(self.memory_usage, Ordering::Relaxed);
220 }
221}
222
/// One finalized Bloom filter: its serialized bits plus the element count it was built for.
///
/// Two segments are equal only when both the bytes and the element count match.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct FinalizedBloomFilterSegment {
    /// The filter's words, each serialized as little-endian bytes.
    pub bloom_filter_bytes: Vec<u8>,

    /// The element count the filter was sized for when it was built.
    pub element_count: usize,
}
232
233impl FinalizedBloomFilterSegment {
234 fn from(bf: BloomFilter, elem_count: usize) -> Self {
235 let bf_slice = bf.as_slice();
236 let mut bloom_filter_bytes = Vec::with_capacity(std::mem::size_of_val(bf_slice));
237 for &x in bf_slice {
238 bloom_filter_bytes.extend_from_slice(&x.to_le_bytes());
239 }
240
241 Self {
242 bloom_filter_bytes,
243 element_count: elem_count,
244 }
245 }
246}
247
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Mutex;

    use futures::AsyncRead;
    use tokio::io::duplex;
    use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

    use super::*;
    use crate::bloom_filter::creator::tests::u64_vec_from_bytes;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Spills to the mocked provider, then checks segment order, contents, indices of both
    /// distinct and deduplicated segments, and that memory accounting returns to zero.
    #[tokio::test]
    async fn test_finalized_bloom_filter_storage() {
        let mut mock_provider = MockExternalTempFileProvider::new();

        let mock_files: Arc<Mutex<HashMap<String, Box<dyn AsyncRead + Unpin + Send>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        mock_provider.expect_create().returning({
            let files = Arc::clone(&mock_files);
            move |file_group, file_id| {
                assert!(file_group.starts_with("intm-bloom-filters-"));
                let mut files = files.lock().unwrap();
                let (writer, reader) = duplex(2 * 1024 * 1024);
                files.insert(file_id.to_string(), Box::new(reader.compat()));
                Ok(Box::new(writer.compat_write()))
            }
        });

        mock_provider.expect_read_all().returning({
            let files = Arc::clone(&mock_files);
            move |file_group| {
                assert!(file_group.starts_with("intm-bloom-filters-"));
                let mut files = files.lock().unwrap();
                Ok(files.drain().collect::<Vec<_>>())
            }
        });

        let global_memory_usage = Arc::new(AtomicUsize::new(0));
        let global_memory_usage_threshold = Some(1024 * 1024);
        let provider = Arc::new(mock_provider);
        let mut storage = FinalizedBloomFilterStorage::new(
            0.01,
            provider,
            global_memory_usage.clone(),
            global_memory_usage_threshold,
        );

        let elem_count = 2000;
        let batch = 1000;
        let dup_batch = 200;

        for i in 0..(batch - dup_batch) {
            let elems = (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes());
            storage.add(elems, elem_count).await.unwrap();
        }
        for _ in 0..dup_batch {
            storage.add(Some(vec![]), 1).await.unwrap();
        }

        // The distinct segments exceed the threshold, so at least one spill must happen.
        assert!(storage.intermediate_file_id_counter > 0);

        let (indices, mut stream) = storage.drain().await.unwrap();
        assert_eq!(indices.len(), batch);

        for (i, idx) in indices.iter().enumerate().take(batch - dup_batch) {
            let segment = stream.next().await.unwrap().unwrap();
            assert_eq!(segment.element_count, elem_count);

            let v = u64_vec_from_bytes(&segment.bloom_filter_bytes);

            // The segment must still contain every element it was built from.
            let bf = BloomFilter::from_vec(v)
                .seed(&SEED)
                .expected_items(segment.element_count);
            for elem in (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes()) {
                assert!(bf.contains(&elem));
            }
            // Each distinct segment must be recorded at its own position.
            assert_eq!(*idx, i);
        }

        let dup_seg = stream.next().await.unwrap().unwrap();
        assert_eq!(dup_seg.element_count, 1);
        assert!(stream.next().await.is_none());
        assert!(indices[(batch - dup_batch)..batch]
            .iter()
            .all(|&x| x == batch - dup_batch));

        // Once the storage is gone, none of its bytes may remain charged globally.
        drop(stream);
        drop(storage);
        assert_eq!(global_memory_usage.load(Ordering::Relaxed), 0);
    }

    /// All-identical segments collapse into one stored segment referenced by every index.
    #[tokio::test]
    async fn test_finalized_bloom_filter_storage_all_dup() {
        let mock_provider = MockExternalTempFileProvider::new();
        let global_memory_usage = Arc::new(AtomicUsize::new(0));
        let global_memory_usage_threshold = Some(1024 * 1024);
        let provider = Arc::new(mock_provider);
        let mut storage = FinalizedBloomFilterStorage::new(
            0.01,
            provider,
            global_memory_usage.clone(),
            global_memory_usage_threshold,
        );

        let batch = 1000;
        for _ in 0..batch {
            storage.add(Some(vec![]), 1).await.unwrap();
        }

        let (indices, mut stream) = storage.drain().await.unwrap();

        let bf = stream.next().await.unwrap().unwrap();
        assert_eq!(bf.element_count, 1);

        assert!(stream.next().await.is_none());

        assert_eq!(indices.len(), batch);
        assert!(indices.iter().all(|&x| x == 0));

        drop(stream);
        drop(storage);
        assert_eq!(global_memory_usage.load(Ordering::Relaxed), 0);
    }
}