// index/bloom_filter/creator/finalize_segment.rs
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use asynchronous_codec::{FramedRead, FramedWrite};
use fastbloom::BloomFilter;
use futures::stream::StreamExt;
use futures::{stream, AsyncWriteExt, Stream};
use snafu::ResultExt;
use super::intermediate_codec::IntermediateBloomFilterCodecV1;
use crate::bloom_filter::creator::{FALSE_POSITIVE_RATE, SEED};
use crate::bloom_filter::error::{IntermediateSnafu, IoSnafu, Result};
use crate::external_provider::ExternalTempFileProvider;
use crate::Bytes;
/// Minimum in-memory size (1 MiB) this storage must reach before the global
/// memory threshold is even consulted; prevents flushing tiny batches to disk.
const MIN_MEMORY_USAGE_THRESHOLD: usize = 1024 * 1024;

/// Accumulates finalized bloom filter segments in memory, spilling them to
/// intermediate external files when global memory pressure is too high.
pub struct FinalizedBloomFilterStorage {
    /// For each `add` call, the index of the segment that batch maps to
    /// (consecutive identical segments are deduplicated and share an index).
    segment_indices: Vec<usize>,
    /// Segments not yet flushed to an intermediate file.
    in_memory: Vec<FinalizedBloomFilterSegment>,
    /// Monotonic counter used to name intermediate files; a non-zero value
    /// also signals that at least one flush has happened.
    intermediate_file_id_counter: usize,
    /// Unique per-storage prefix ("file group") for intermediate files.
    intermediate_prefix: String,
    /// Provider used to create and read back the intermediate files.
    intermediate_provider: Arc<dyn ExternalTempFileProvider>,
    /// Bytes of bloom-filter data currently held in `in_memory`; this is
    /// exactly the amount this storage has added to `global_memory_usage`.
    memory_usage: usize,
    /// Memory usage counter shared across all storages.
    global_memory_usage: Arc<AtomicUsize>,
    /// Global limit that, once exceeded, triggers a flush to disk.
    global_memory_usage_threshold: Option<usize>,
    /// Number of segments already flushed to intermediate files.
    flushed_seg_count: usize,
}
impl FinalizedBloomFilterStorage {
    /// Creates an empty storage.
    ///
    /// Intermediate files, if any become necessary, are created under a
    /// freshly generated UUID-based prefix so concurrent storages never
    /// collide in the external provider.
    pub fn new(
        intermediate_provider: Arc<dyn ExternalTempFileProvider>,
        global_memory_usage: Arc<AtomicUsize>,
        global_memory_usage_threshold: Option<usize>,
    ) -> Self {
        let external_prefix = format!("intm-bloom-filters-{}", uuid::Uuid::new_v4());
        Self {
            segment_indices: Vec::new(),
            in_memory: Vec::new(),
            intermediate_file_id_counter: 0,
            intermediate_prefix: external_prefix,
            intermediate_provider,
            memory_usage: 0,
            global_memory_usage,
            global_memory_usage_threshold,
            flushed_seg_count: 0,
        }
    }

    /// Returns the number of bytes of bloom-filter data currently held in memory.
    pub fn memory_usage(&self) -> usize {
        self.memory_usage
    }

    /// Builds a bloom filter over `elems` and appends it as a finalized segment.
    ///
    /// If the resulting segment is identical to the most recent in-memory
    /// segment, it is deduplicated: only an index entry pointing at the
    /// existing segment is recorded. Once local usage reaches
    /// `MIN_MEMORY_USAGE_THRESHOLD` and the global usage exceeds the
    /// configured threshold, all in-memory segments are flushed to a new
    /// intermediate file.
    ///
    /// # Errors
    ///
    /// Returns an error if flushing to the intermediate provider fails.
    pub async fn add(
        &mut self,
        elems: impl IntoIterator<Item = Bytes>,
        element_count: usize,
    ) -> Result<()> {
        let mut bf = BloomFilter::with_false_pos(FALSE_POSITIVE_RATE)
            .seed(&SEED)
            .expected_items(element_count);
        for elem in elems.into_iter() {
            bf.insert(&elem);
        }
        let fbf = FinalizedBloomFilterSegment::from(bf, element_count);

        // Dedup: a repeat of the previous segment only records its index.
        if self.in_memory.last() == Some(&fbf) {
            self.segment_indices
                .push(self.flushed_seg_count + self.in_memory.len() - 1);
            return Ok(());
        }

        let memory_diff = fbf.bloom_filter_bytes.len();
        self.memory_usage += memory_diff;
        self.global_memory_usage
            .fetch_add(memory_diff, Ordering::Relaxed);
        self.in_memory.push(fbf);
        self.segment_indices
            .push(self.flushed_seg_count + self.in_memory.len() - 1);

        // Don't flush tiny amounts of data, regardless of global pressure.
        if self.memory_usage < MIN_MEMORY_USAGE_THRESHOLD {
            return Ok(());
        }
        if let Some(threshold) = self.global_memory_usage_threshold {
            let global = self.global_memory_usage.load(Ordering::Relaxed);
            if global > threshold {
                self.flush_in_memory_to_disk().await?;
                // The flushed bytes no longer live in memory; release this
                // storage's share of the global accounting.
                self.global_memory_usage
                    .fetch_sub(self.memory_usage, Ordering::Relaxed);
                self.memory_usage = 0;
            }
        }
        Ok(())
    }

    /// Drains the storage, returning the per-batch segment indices and a
    /// stream yielding every finalized segment in creation order (flushed
    /// segments first, then the remaining in-memory ones).
    ///
    /// # Errors
    ///
    /// Returns an error if the intermediate files cannot be read back.
    pub async fn drain(
        &mut self,
    ) -> Result<(
        Vec<usize>,
        Pin<Box<dyn Stream<Item = Result<FinalizedBloomFilterSegment>> + Send + '_>>,
    )> {
        // Fast path: nothing was ever flushed to disk.
        if self.intermediate_file_id_counter == 0 {
            return Ok((
                std::mem::take(&mut self.segment_indices),
                Box::pin(stream::iter(self.in_memory.drain(..).map(Ok))),
            ));
        }
        let mut on_disk = self
            .intermediate_provider
            .read_all(&self.intermediate_prefix)
            .await
            .context(IntermediateSnafu)?;
        // File ids are zero-padded (see `flush_in_memory_to_disk`), so
        // lexicographic order equals creation order.
        on_disk.sort_unstable_by(|x, y| x.0.cmp(&y.0));
        let streams = on_disk
            .into_iter()
            .map(|(_, reader)| FramedRead::new(reader, IntermediateBloomFilterCodecV1::default()));
        let in_memory_stream = stream::iter(self.in_memory.drain(..)).map(Ok);
        Ok((
            std::mem::take(&mut self.segment_indices),
            Box::pin(stream::iter(streams).flatten().chain(in_memory_stream)),
        ))
    }

    /// Encodes all in-memory segments into a newly created intermediate file.
    ///
    /// Does NOT adjust `memory_usage`; the caller (`add`) updates the
    /// accounting after a successful flush.
    async fn flush_in_memory_to_disk(&mut self) -> Result<()> {
        let file_id = self.intermediate_file_id_counter;
        self.intermediate_file_id_counter += 1;
        self.flushed_seg_count += self.in_memory.len();

        // Zero-pad so lexicographic file order matches creation order.
        let file_id = format!("{:08}", file_id);
        let mut writer = self
            .intermediate_provider
            .create(&self.intermediate_prefix, &file_id)
            .await
            .context(IntermediateSnafu)?;

        let fw = FramedWrite::new(&mut writer, IntermediateBloomFilterCodecV1::default());
        // On success, `forward` flushes and closes the sink for us.
        if let Err(e) = stream::iter(self.in_memory.drain(..).map(Ok))
            .forward(fw)
            .await
        {
            // Best-effort cleanup on failure. Flush BEFORE closing: the
            // previous close-then-flush order could fail on the already
            // closed writer and mask the original encode error `e`.
            writer.flush().await.context(IoSnafu)?;
            writer.close().await.context(IoSnafu)?;
            return Err(e);
        }
        Ok(())
    }
}
/// Releases this storage's share of the global memory accounting, so an
/// abandoned (never fully drained) storage does not leak reserved budget.
impl Drop for FinalizedBloomFilterStorage {
    fn drop(&mut self) {
        // `memory_usage` is exactly what was fetch_add'ed to the global
        // counter and not yet subtracted (`add` resets it to 0 after a flush),
        // so subtracting it here keeps the global counter balanced.
        self.global_memory_usage
            .fetch_sub(self.memory_usage, Ordering::Relaxed);
    }
}
/// A finalized bloom filter segment: the serialized filter plus the count of
/// elements inserted into it. `PartialEq`/`Eq` enable deduplication of
/// consecutive identical segments in `FinalizedBloomFilterStorage::add`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FinalizedBloomFilterSegment {
    /// The bloom filter's backing words serialized as little-endian bytes.
    pub bloom_filter_bytes: Vec<u8>,
    /// Number of elements inserted into this filter.
    pub element_count: usize,
}
impl FinalizedBloomFilterSegment {
    /// Serializes the bloom filter's backing words into little-endian bytes
    /// and pairs them with the number of elements that were inserted.
    fn from(bf: BloomFilter, elem_count: usize) -> Self {
        let bloom_filter_bytes: Vec<u8> = bf
            .as_slice()
            .iter()
            .flat_map(|word| word.to_le_bytes())
            .collect();
        Self {
            bloom_filter_bytes,
            element_count: elem_count,
        }
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Mutex;
    use futures::AsyncRead;
    use tokio::io::duplex;
    use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
    use super::*;
    use crate::bloom_filter::creator::tests::u64_vec_from_bytes;
    use crate::external_provider::MockExternalTempFileProvider;

    /// End-to-end test: enough distinct batches to force at least one flush to
    /// the (mocked, in-memory duplex) intermediate files, plus a run of
    /// identical empty batches that must all deduplicate to one segment.
    #[tokio::test]
    async fn test_finalized_bloom_filter_storage() {
        let mut mock_provider = MockExternalTempFileProvider::new();
        // Stands in for external temp files: file_id -> readable end of a duplex pipe.
        let mock_files: Arc<Mutex<HashMap<String, Box<dyn AsyncRead + Unpin + Send>>>> =
            Arc::new(Mutex::new(HashMap::new()));
        mock_provider.expect_create().returning({
            let files = Arc::clone(&mock_files);
            move |file_group, file_id| {
                assert!(file_group.starts_with("intm-bloom-filters-"));
                let mut files = files.lock().unwrap();
                // 2 MiB buffer so the writer side never blocks in these tests.
                let (writer, reader) = duplex(2 * 1024 * 1024);
                files.insert(file_id.to_string(), Box::new(reader.compat()));
                Ok(Box::new(writer.compat_write()))
            }
        });
        mock_provider.expect_read_all().returning({
            let files = Arc::clone(&mock_files);
            move |file_group| {
                assert!(file_group.starts_with("intm-bloom-filters-"));
                let mut files = files.lock().unwrap();
                Ok(files.drain().collect::<Vec<_>>())
            }
        });
        let global_memory_usage = Arc::new(AtomicUsize::new(0));
        // 1 MiB global limit: equal to MIN_MEMORY_USAGE_THRESHOLD, so flushing
        // kicks in as soon as both conditions are met.
        let global_memory_usage_threshold = Some(1024 * 1024);
        let provider = Arc::new(mock_provider);
        let mut storage = FinalizedBloomFilterStorage::new(
            provider,
            global_memory_usage.clone(),
            global_memory_usage_threshold,
        );
        let elem_count = 2000;
        let batch = 1000;
        let dup_batch = 200;
        // Distinct batches: disjoint element ranges, so no dedup here.
        for i in 0..(batch - dup_batch) {
            let elems = (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes());
            storage.add(elems, elem_count).await.unwrap();
        }
        // Identical single-empty-element batches: should dedup to ONE segment.
        for _ in 0..dup_batch {
            storage.add(Some(vec![]), 1).await.unwrap();
        }
        // The volume above must have forced at least one intermediate flush.
        assert!(storage.intermediate_file_id_counter > 0);
        let (indices, mut stream) = storage.drain().await.unwrap();
        // One index entry per add() call, dedup or not.
        assert_eq!(indices.len(), batch);
        for (i, idx) in indices.iter().enumerate().take(batch - dup_batch) {
            let segment = stream.next().await.unwrap().unwrap();
            assert_eq!(segment.element_count, elem_count);
            // Round-trip: rebuild the filter from its serialized words and
            // check it still reports every inserted element as present.
            let v = u64_vec_from_bytes(&segment.bloom_filter_bytes);
            let bf = BloomFilter::from_vec(v)
                .seed(&SEED)
                .expected_items(segment.element_count);
            for elem in (elem_count * i..elem_count * (i + 1)).map(|x| x.to_string().into_bytes()) {
                assert!(bf.contains(&elem));
            }
            // NOTE(review): this compares indices[i] with itself (idx comes
            // from enumerating indices), so it is trivially true — verify
            // whether a comparison against `i` was intended.
            assert_eq!(indices[i], *idx);
        }
        // The dup run collapsed into exactly one trailing segment.
        let dup_seg = stream.next().await.unwrap().unwrap();
        assert_eq!(dup_seg.element_count, 1);
        assert!(stream.next().await.is_none());
        // All dup batches share the index of that single segment.
        assert!(indices[(batch - dup_batch)..batch]
            .iter()
            .all(|&x| x == batch - dup_batch));
    }

    /// All batches identical: everything dedups to a single in-memory segment,
    /// so no intermediate files are ever created (provider has no expectations).
    #[tokio::test]
    async fn test_finalized_bloom_filter_storage_all_dup() {
        let mock_provider = MockExternalTempFileProvider::new();
        let global_memory_usage = Arc::new(AtomicUsize::new(0));
        let global_memory_usage_threshold = Some(1024 * 1024);
        let provider = Arc::new(mock_provider);
        let mut storage = FinalizedBloomFilterStorage::new(
            provider,
            global_memory_usage.clone(),
            global_memory_usage_threshold,
        );
        let batch = 1000;
        for _ in 0..batch {
            storage.add(Some(vec![]), 1).await.unwrap();
        }
        let (indices, mut stream) = storage.drain().await.unwrap();
        // Exactly one segment survives dedup...
        let bf = stream.next().await.unwrap().unwrap();
        assert_eq!(bf.element_count, 1);
        assert!(stream.next().await.is_none());
        // ...and every batch maps to it (index 0).
        assert_eq!(indices.len(), batch);
        assert!(indices.iter().all(|&x| x == 0));
    }
}