index/bloom_filter/creator/
intermediate_codec.rs1use asynchronous_codec::{BytesMut, Decoder, Encoder};
16use bytes::{Buf, BufMut};
17use snafu::{ensure, ResultExt};
18
19use crate::bloom_filter::creator::finalize_segment::FinalizedBloomFilterSegment;
20use crate::bloom_filter::error::{Error, InvalidIntermediateMagicSnafu, IoSnafu, Result};
21
/// Magic bytes identifying version 1 of the intermediate bloom filter stream.
///
/// The encoder writes these four bytes once, before the first segment, and the
/// decoder requires them at the very start of the stream.
22const CODEC_V1_MAGIC: &[u8; 4] = b"bi01";
24
/// Version 1 codec for a stream of finalized bloom filter segments.
///
/// A stream starts with `CODEC_V1_MAGIC`, followed by zero or more segments.
/// Each segment is laid out as:
///
/// - element count: `u64`, little-endian
/// - bloom filter byte length: `u64`, little-endian
/// - bloom filter bytes
///
/// One codec instance is meant to drive one stream in one direction, because
/// the header state below is flipped by both `encode` and `decode`.
25#[derive(Debug, Default)]
32pub struct IntermediateBloomFilterCodecV1 {
    /// `true` once the magic header has been written (encoding) or validated
    /// and consumed (decoding); starts as `false` via `Default`.
33 handled_header_magic: bool,
34}
35
36impl Encoder for IntermediateBloomFilterCodecV1 {
37 type Item<'a> = FinalizedBloomFilterSegment;
38 type Error = Error;
39
40 fn encode(&mut self, item: FinalizedBloomFilterSegment, dst: &mut BytesMut) -> Result<()> {
41 if !self.handled_header_magic {
42 dst.extend_from_slice(CODEC_V1_MAGIC);
43 self.handled_header_magic = true;
44 }
45
46 let segment_bytes = item.bloom_filter_bytes;
47 let elem_count = item.element_count;
48
49 dst.reserve(2 * std::mem::size_of::<u64>() + segment_bytes.len());
50 dst.put_u64_le(elem_count as u64);
51 dst.put_u64_le(segment_bytes.len() as u64);
52 dst.extend_from_slice(&segment_bytes);
53 Ok(())
54 }
55}
56
57impl Decoder for IntermediateBloomFilterCodecV1 {
58 type Item = FinalizedBloomFilterSegment;
59 type Error = Error;
60
61 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
62 if !self.handled_header_magic {
63 let m_len = CODEC_V1_MAGIC.len();
64 if src.remaining() < m_len {
65 return Ok(None);
66 }
67 let magic_bytes = &src[..m_len];
68 ensure!(
69 magic_bytes == CODEC_V1_MAGIC,
70 InvalidIntermediateMagicSnafu {
71 invalid: magic_bytes,
72 }
73 );
74 self.handled_header_magic = true;
75 src.advance(m_len);
76 }
77
78 let s = &src[..];
79
80 let u64_size = std::mem::size_of::<u64>();
81 let n_size = u64_size * 2;
82 if s.len() < n_size {
83 return Ok(None);
84 }
85
86 let element_count = u64::from_le_bytes(s[0..u64_size].try_into().unwrap()) as usize;
87 let segment_size = u64::from_le_bytes(s[u64_size..n_size].try_into().unwrap()) as usize;
88
89 if s.len() < n_size + segment_size {
90 return Ok(None);
91 }
92
93 let bloom_filter_bytes = s[n_size..n_size + segment_size].to_vec();
94 src.advance(n_size + segment_size);
95 Ok(Some(FinalizedBloomFilterSegment {
96 element_count,
97 bloom_filter_bytes,
98 }))
99 }
100}
101
102impl From<std::io::Error> for Error {
104 fn from(error: std::io::Error) -> Self {
105 Err::<(), std::io::Error>(error)
106 .context(IoSnafu)
107 .unwrap_err()
108 }
109}
110
#[cfg(test)]
mod tests {
    use asynchronous_codec::{FramedRead, FramedWrite};
    use futures::io::Cursor;
    use futures::{SinkExt, StreamExt};

    use super::*;
    use crate::bloom_filter::creator::finalize_segment::FinalizedBloomFilterSegment;

    /// Three small, distinct segments used by the round-trip tests.
    fn sample_segments() -> Vec<FinalizedBloomFilterSegment> {
        vec![
            FinalizedBloomFilterSegment {
                element_count: 2,
                bloom_filter_bytes: vec![1, 2, 3, 4],
            },
            FinalizedBloomFilterSegment {
                element_count: 3,
                bloom_filter_bytes: vec![5, 6, 7, 8],
            },
            FinalizedBloomFilterSegment {
                element_count: 4,
                bloom_filter_bytes: vec![9, 10, 11, 12],
            },
        ]
    }

    /// Hand-assembles a stream: `magic`, then a frame header announcing
    /// 2 elements and a 4-byte payload, then whatever `payload` is given.
    fn raw_stream(magic: &[u8], payload: &[u8]) -> Vec<u8> {
        let mut raw = vec![];
        raw.extend_from_slice(magic);
        raw.extend_from_slice(&2u64.to_le_bytes());
        raw.extend_from_slice(&4u64.to_le_bytes());
        raw.extend_from_slice(payload);
        raw
    }

    /// Feeds `raw` to a fresh framed reader and returns the first thing it yields.
    async fn first_frame(
        raw: &[u8],
    ) -> Option<std::result::Result<FinalizedBloomFilterSegment, Error>> {
        let mut reader = FramedRead::new(raw, IntermediateBloomFilterCodecV1::default());
        reader.next().await
    }

    #[test]
    fn test_intermediate_bloom_filter_codec_v1_basic() {
        let segments = sample_segments();

        let mut encoder = IntermediateBloomFilterCodecV1::default();
        let mut buf = BytesMut::new();
        for segment in &segments {
            encoder.encode(segment.clone(), &mut buf).unwrap();
        }

        let mut buf = buf.freeze().try_into_mut().unwrap();

        let mut decoder = IntermediateBloomFilterCodecV1::default();
        let mut decoded = Vec::with_capacity(segments.len());
        for _ in 0..segments.len() {
            decoded.push(decoder.decode(&mut buf).unwrap().unwrap());
        }

        assert_eq!(segments, decoded);
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write() {
        let segments = sample_segments();

        let mut sink = Cursor::new(vec![]);

        let mut writer = FramedWrite::new(&mut sink, IntermediateBloomFilterCodecV1::default());
        for segment in &segments {
            writer.send(segment.clone()).await.unwrap();
        }
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let encoded = sink.into_inner();
        let mut reader =
            FramedRead::new(encoded.as_slice(), IntermediateBloomFilterCodecV1::default());
        let mut decoded = Vec::with_capacity(segments.len());
        for _ in 0..segments.len() {
            decoded.push(reader.next().await.unwrap().unwrap());
        }
        // The stream must end cleanly right after the last segment.
        assert!(reader.next().await.is_none());

        assert_eq!(segments, decoded);
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_only_magic() {
        // A header with no segments is a valid, empty stream.
        let raw = CODEC_V1_MAGIC.to_vec();
        assert!(first_frame(&raw).await.is_none());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_partial_magic() {
        // Only three of the four magic bytes are present.
        let raw = CODEC_V1_MAGIC[..3].to_vec();
        let outcome = first_frame(&raw).await.unwrap();
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_partial_item() {
        // The frame header is complete but the payload is missing entirely.
        let raw = raw_stream(CODEC_V1_MAGIC, &[]);
        let outcome = first_frame(&raw).await.unwrap();
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_corrupted_magic() {
        // A well-formed frame behind the wrong magic must still be rejected.
        let raw = raw_stream(b"bi02", &[1, 2, 3, 4]);
        let outcome = first_frame(&raw).await.unwrap();
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_corrupted_length() {
        // The header announces four payload bytes but only three follow.
        let raw = raw_stream(CODEC_V1_MAGIC, &[1, 2, 3]);
        let outcome = first_frame(&raw).await.unwrap();
        assert!(outcome.is_err());
    }
}