index/bloom_filter/creator/
finalize_segment.rs1use std::pin::Pin;
16use std::sync::atomic::{AtomicUsize, Ordering};
17use std::sync::Arc;
18
19use asynchronous_codec::{FramedRead, FramedWrite};
20use fastbloom::BloomFilter;
21use futures::stream::StreamExt;
22use futures::{stream, AsyncWriteExt, Stream};
23use snafu::ResultExt;
24
25use crate::bloom_filter::creator::intermediate_codec::IntermediateBloomFilterCodecV1;
26use crate::bloom_filter::creator::{FALSE_POSITIVE_RATE, SEED};
27use crate::bloom_filter::error::{IntermediateSnafu, IoSnafu, Result};
28use crate::external_provider::ExternalTempFileProvider;
29use crate::Bytes;
30
31const MIN_MEMORY_USAGE_THRESHOLD: usize = 1024 * 1024; pub struct FinalizedBloomFilterStorage {
36 segment_indices: Vec<usize>,
38
39 in_memory: Vec<FinalizedBloomFilterSegment>,
41
42 intermediate_file_id_counter: usize,
44
45 intermediate_prefix: String,
47
48 intermediate_provider: Arc<dyn ExternalTempFileProvider>,
50
51 memory_usage: usize,
53
54 global_memory_usage: Arc<AtomicUsize>,
57
58 global_memory_usage_threshold: Option<usize>,
60
61 flushed_seg_count: usize,
63}
64
65impl FinalizedBloomFilterStorage {
66 pub fn new(
68 intermediate_provider: Arc<dyn ExternalTempFileProvider>,
69 global_memory_usage: Arc<AtomicUsize>,
70 global_memory_usage_threshold: Option<usize>,
71 ) -> Self {
72 let external_prefix = format!("intm-bloom-filters-{}", uuid::Uuid::new_v4());
73 Self {
74 segment_indices: Vec::new(),
75 in_memory: Vec::new(),
76 intermediate_file_id_counter: 0,
77 intermediate_prefix: external_prefix,
78 intermediate_provider,
79 memory_usage: 0,
80 global_memory_usage,
81 global_memory_usage_threshold,
82 flushed_seg_count: 0,
83 }
84 }
85
86 pub fn memory_usage(&self) -> usize {
88 self.memory_usage
89 }
90
91 pub async fn add(
95 &mut self,
96 elems: impl IntoIterator<Item = Bytes>,
97 element_count: usize,
98 ) -> Result<()> {
99 let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
100 .seed(&SEED)
101 .expected_items(element_count);
102 for elem in elems.into_iter() {
103 bf.insert(&elem);
104 }
105
106 let fbf = FinalizedBloomFilterSegment::from(bf, element_count);
107
108 if self.in_memory.last() == Some(&fbf) {
110 self.segment_indices
111 .push(self.flushed_seg_count + self.in_memory.len() - 1);
112 return Ok(());
113 }
114
115 let memory_diff = fbf.bloom_filter_bytes.len();
117 self.memory_usage += memory_diff;
118 self.global_memory_usage
119 .fetch_add(memory_diff, Ordering::Relaxed);
120
121 self.in_memory.push(fbf);
123 self.segment_indices
124 .push(self.flushed_seg_count + self.in_memory.len() - 1);
125
126 if self.memory_usage < MIN_MEMORY_USAGE_THRESHOLD {
130 return Ok(());
131 }
132
133 if let Some(threshold) = self.global_memory_usage_threshold {
135 let global = self.global_memory_usage.load(Ordering::Relaxed);
136
137 if global > threshold {
138 self.flush_in_memory_to_disk().await?;
139
140 self.global_memory_usage
141 .fetch_sub(self.memory_usage, Ordering::Relaxed);
142 self.memory_usage = 0;
143 }
144 }
145
146 Ok(())
147 }
148
149 pub async fn drain(
151 &mut self,
152 ) -> Result<(
153 Vec<usize>,
154 Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + Send + '_>>,
155 )> {
156 if self.intermediate_file_id_counter == 0 {
158 return Ok((
159 std::mem::take(&mut self.segment_indices),
160 Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))),
161 ));
162 }
163
164 let mut on_disk = self
166 .intermediate_provider
167 .read_all(&self.intermediate_prefix)
168 .await
169 .context(IntermediateSnafu)?;
170 on_disk.sort_unstable_by(|x, y| x.0.cmp(&y.0));
171
172 let streams = on_disk
173 .into_iter()
174 .map(|(_, reader)| FramedRead::new(reader, IntermediateBloomFilterCodecV1::default()));
175
176 let in_memory_stream = stream::iter(self.in_memory.drain(..)).map(Ok);
177 Ok((
178 std::mem::take(&mut self.segment_indices),
179 Box::pin(stream::iter(streams).flatten().chain(in_memory_stream)),
180 ))
181 }
182
183 async fn flush_in_memory_to_disk(&mut self) -> Result<()> {
185 let file_id = self.intermediate_file_id_counter;
186 self.intermediate_file_id_counter += 1;
187 self.flushed_seg_count += self.in_memory.len();
188
189 let file_id = format!("{:08}", file_id);
190 let mut writer = self
191 .intermediate_provider
192 .create(&self.intermediate_prefix, &file_id)
193 .await
194 .context(IntermediateSnafu)?;
195
196 let fw = FramedWrite::new(&mut writer, IntermediateBloomFilterCodecV1::default());
197 if let Err(e) = stream::iter(self.in_memory.drain(..).map(Ok))
199 .forward(fw)
200 .await
201 {
202 writer.close().await.context(IoSnafu)?;
203 writer.flush().await.context(IoSnafu)?;
204 return Err(e);
205 }
206
207 Ok(())
208 }
209}
210
211impl Drop for FinalizedBloomFilterStorage {
212 fn drop(&mut self) {
213 self.global_memory_usage
214 .fetch_sub(self.memory_usage, Ordering::Relaxed);
215 }
216}
217
/// A finalized bloom filter segment.
///
/// Holds the filter's words serialized as bytes, plus the element count the filter was
/// sized for.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FinalizedBloomFilterSegment {
    /// The filter's `u64` words, each encoded little-endian, concatenated in order.
    pub bloom_filter_bytes: Vec<u8>,

    /// Number of elements the filter was built for (used as its `expected_items`).
    pub element_count: usize,
}
227
228impl FinalizedBloomFilterSegment {
229 fn from(bf: BloomFilter, elem_count: usize) -> Self {
230 let bf_slice = bf.as_slice();
231 let mut bloom_filter_bytes = Vec::with_capacity(std::mem::size_of_val(bf_slice));
232 for &x in bf_slice {
233 bloom_filter_bytes.extend_from_slice(&x.to_le_bytes());
234 }
235
236 Self {
237 bloom_filter_bytes,
238 element_count: elem_count,
239 }
240 }
241}
242
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Mutex;

    use futures::AsyncRead;
    use tokio::io::duplex;
    use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

    use super::*;
    use crate::bloom_filter::creator::tests::u64_vec_from_bytes;
    use crate::external_provider::MockExternalTempFileProvider;

    #[tokio::test]
    async fn test_finalized_bloom_filter_storage() {
        let mut mock_provider = MockExternalTempFileProvider::new();

        // In-memory "files": `create` hands out the write half of a duplex pipe and keeps
        // the read half here, so that `read_all` can return it later.
        let mock_files: Arc<Mutex<HashMap<String, Box<dyn AsyncRead + Unpin + Send>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        mock_provider.expect_create().returning({
            let files = Arc::clone(&mock_files);
            move |file_group, file_id| {
                assert!(file_group.starts_with("intm-bloom-filters-"));
                let mut files = files.lock().unwrap();
                let (writer, reader) = duplex(2 * 1024 * 1024);
                files.insert(file_id.to_string(), Box::new(reader.compat()));
                Ok(Box::new(writer.compat_write()))
            }
        });

        mock_provider.expect_read_all().returning({
            let files = Arc::clone(&mock_files);
            move |file_group| {
                assert!(file_group.starts_with("intm-bloom-filters-"));
                let mut files = files.lock().unwrap();
                Ok(files.drain().collect::<Vec<_>>())
            }
        });

        let global_memory_usage = Arc::new(AtomicUsize::new(0));
        let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
        let provider = Arc::new(mock_provider);
        let mut storage = FinalizedBloomFilterStorage::new(
            provider,
            global_memory_usage.clone(),
            global_memory_usage_threshold,
        );

        let elem_count = 2000;
        let batch = 1000;
        let dup_batch = 200;

        for i in 0..(batch - dup_batch) {
            let elems = (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes());
            storage.add(elems, elem_count).await.unwrap();
        }
        for _ in 0..dup_batch {
            storage.add(Some(vec![]), 1).await.unwrap();
        }

        // The data exceeded the threshold, so at least one spill must have happened.
        assert!(storage.intermediate_file_id_counter > 0);

        let (indices, mut stream) = storage.drain().await.unwrap();
        assert_eq!(indices.len(), batch);

        for (i, &idx) in indices.iter().enumerate().take(batch - dup_batch) {
            let segment = stream.next().await.unwrap().unwrap();
            assert_eq!(segment.element_count, elem_count);

            let v = u64_vec_from_bytes(&segment.bloom_filter_bytes);

            // The segment read back must still contain every element it was built from.
            let bf = BloomFilter::from_vec(v)
                .seed(&SEED)
                .expected_items(segment.element_count);
            for elem in (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes()) {
                assert!(bf.contains(&elem));
            }
            // Distinct segments get their own sequential index.
            assert_eq!(idx, i);
        }

        // All duplicate segments collapse into a single stored segment.
        let dup_seg = stream.next().await.unwrap().unwrap();
        assert_eq!(dup_seg.element_count, 1);
        assert!(stream.next().await.is_none());
        assert!(indices[(batch - dup_batch)..batch]
            .iter()
            .all(|&x| x == batch - dup_batch));

        // Dropping the storage must release everything it accounted for.
        drop(stream);
        drop(storage);
        assert_eq!(global_memory_usage.load(Ordering::Relaxed), 0);
    }

    #[tokio::test]
    async fn test_finalized_bloom_filter_storage_all_dup() {
        // No expectations are set, so any call to the provider would make the mock fail.
        // All-duplicate input should never need to spill.
        let mock_provider = MockExternalTempFileProvider::new();
        let global_memory_usage = Arc::new(AtomicUsize::new(0));
        let global_memory_usage_threshold = Some(1024 * 1024); // 1MB
        let provider = Arc::new(mock_provider);
        let mut storage = FinalizedBloomFilterStorage::new(
            provider,
            global_memory_usage.clone(),
            global_memory_usage_threshold,
        );

        let batch = 1000;
        for _ in 0..batch {
            storage.add(Some(vec![]), 1).await.unwrap();
        }

        let (indices, mut stream) = storage.drain().await.unwrap();

        let bf = stream.next().await.unwrap().unwrap();
        assert_eq!(bf.element_count, 1);

        assert!(stream.next().await.is_none());

        assert_eq!(indices.len(), batch);
        assert!(indices.iter().all(|&x| x == 0));

        // Dropping the storage must release everything it accounted for.
        drop(stream);
        drop(storage);
        assert_eq!(global_memory_usage.load(Ordering::Relaxed), 0);
    }
}