index/bloom_filter/creator/
intermediate_codec.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use asynchronous_codec::{BytesMut, Decoder, Encoder};
16use bytes::{Buf, BufMut};
17use snafu::{ensure, ResultExt};
18
19use crate::bloom_filter::creator::finalize_segment::FinalizedBloomFilterSegment;
20use crate::bloom_filter::error::{Error, InvalidIntermediateMagicSnafu, IoSnafu, Result};
21
/// The magic number for the codec version 1 of the intermediate bloom filter.
///
/// Written once at the very start of an encoded stream, and validated by the decoder
/// before any segment is read.
const CODEC_V1_MAGIC: &[u8; 4] = b"bi01";

/// Codec of the intermediate finalized bloom filter segment.
///
/// # Format
///
/// ```text
/// [ magic ][ elem count ][    size    ][ bloom filter ][ elem count ][    size    ][ bloom filter ]...
///    [4]       [8]            [8]           [size]         [8]            [8]           [size]
/// ```
///
/// The second line gives each field's width in bytes. The magic appears only once,
/// before the first segment. `elem count` and `size` are little-endian `u64`s, and
/// `size` is the byte length of the `bloom filter` payload that follows.
#[derive(Debug, Default)]
pub struct IntermediateBloomFilterCodecV1 {
    /// Whether this codec instance has already written the magic header (when
    /// encoding) or validated and consumed it (when decoding).
    handled_header_magic: bool,
}
35
36impl Encoder for IntermediateBloomFilterCodecV1 {
37    type Item<'a> = FinalizedBloomFilterSegment;
38    type Error = Error;
39
40    fn encode(&mut self, item: FinalizedBloomFilterSegment, dst: &mut BytesMut) -> Result<()> {
41        if !self.handled_header_magic {
42            dst.extend_from_slice(CODEC_V1_MAGIC);
43            self.handled_header_magic = true;
44        }
45
46        let segment_bytes = item.bloom_filter_bytes;
47        let elem_count = item.element_count;
48
49        dst.reserve(2 * std::mem::size_of::<u64>() + segment_bytes.len());
50        dst.put_u64_le(elem_count as u64);
51        dst.put_u64_le(segment_bytes.len() as u64);
52        dst.extend_from_slice(&segment_bytes);
53        Ok(())
54    }
55}
56
57impl Decoder for IntermediateBloomFilterCodecV1 {
58    type Item = FinalizedBloomFilterSegment;
59    type Error = Error;
60
61    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
62        if !self.handled_header_magic {
63            let m_len = CODEC_V1_MAGIC.len();
64            if src.remaining() < m_len {
65                return Ok(None);
66            }
67            let magic_bytes = &src[..m_len];
68            ensure!(
69                magic_bytes == CODEC_V1_MAGIC,
70                InvalidIntermediateMagicSnafu {
71                    invalid: magic_bytes,
72                }
73            );
74            self.handled_header_magic = true;
75            src.advance(m_len);
76        }
77
78        let s = &src[..];
79
80        let u64_size = std::mem::size_of::<u64>();
81        let n_size = u64_size * 2;
82        if s.len() < n_size {
83            return Ok(None);
84        }
85
86        let element_count = u64::from_le_bytes(s[0..u64_size].try_into().unwrap()) as usize;
87        let segment_size = u64::from_le_bytes(s[u64_size..n_size].try_into().unwrap()) as usize;
88
89        if s.len() < n_size + segment_size {
90            return Ok(None);
91        }
92
93        let bloom_filter_bytes = s[n_size..n_size + segment_size].to_vec();
94        src.advance(n_size + segment_size);
95        Ok(Some(FinalizedBloomFilterSegment {
96            element_count,
97            bloom_filter_bytes,
98        }))
99    }
100}
101
102/// Required for [`Encoder`] and [`Decoder`] implementations.
103impl From<std::io::Error> for Error {
104    fn from(error: std::io::Error) -> Self {
105        Err::<(), std::io::Error>(error)
106            .context(IoSnafu)
107            .unwrap_err()
108    }
109}
110
#[cfg(test)]
mod tests {
    use asynchronous_codec::{FramedRead, FramedWrite};
    use futures::io::Cursor;
    use futures::{SinkExt, StreamExt};

    use super::*;
    use crate::bloom_filter::creator::finalize_segment::FinalizedBloomFilterSegment;

    /// Builds a segment with the given element count and payload.
    fn segment(element_count: usize, payload: &[u8]) -> FinalizedBloomFilterSegment {
        FinalizedBloomFilterSegment {
            element_count,
            bloom_filter_bytes: payload.to_vec(),
        }
    }

    /// The three segments shared by the round-trip tests.
    fn sample_segments() -> [FinalizedBloomFilterSegment; 3] {
        [
            segment(2, &[1, 2, 3, 4]),
            segment(3, &[5, 6, 7, 8]),
            segment(4, &[9, 10, 11, 12]),
        ]
    }

    /// Hand-assembles a raw stream: `magic`, the two little-endian `u64` header
    /// fields, then `payload` verbatim (which need not agree with `segment_size`).
    fn raw_stream(
        magic: &[u8],
        element_count: u64,
        segment_size: u64,
        payload: &[u8],
    ) -> Vec<u8> {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(magic);
        bytes.extend_from_slice(&element_count.to_le_bytes());
        bytes.extend_from_slice(&segment_size.to_le_bytes());
        bytes.extend_from_slice(payload);
        bytes
    }

    /// Runs `bytes` through a fresh framed reader and returns its first stream item.
    async fn first_frame(bytes: &[u8]) -> Option<Result<FinalizedBloomFilterSegment>> {
        let mut reader = FramedRead::new(bytes, IntermediateBloomFilterCodecV1::default());
        reader.next().await
    }

    #[test]
    fn test_intermediate_bloom_filter_codec_v1_basic() {
        let segments = sample_segments();

        let mut encoder = IntermediateBloomFilterCodecV1::default();
        let mut encoded = BytesMut::new();
        for seg in &segments {
            encoder.encode(seg.clone(), &mut encoded).unwrap();
        }

        let mut encoded = encoded.freeze().try_into_mut().unwrap();
        let mut decoder = IntermediateBloomFilterCodecV1::default();
        for expected in &segments {
            let decoded = decoder.decode(&mut encoded).unwrap().unwrap();
            assert_eq!(expected, &decoded);
        }
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write() {
        let segments = sample_segments();

        let mut sink = Cursor::new(vec![]);
        let mut writer = FramedWrite::new(&mut sink, IntermediateBloomFilterCodecV1::default());
        for seg in &segments {
            writer.send(seg.clone()).await.unwrap();
        }
        writer.flush().await.unwrap();
        writer.close().await.unwrap();

        let written = sink.into_inner();
        let mut reader =
            FramedRead::new(written.as_slice(), IntermediateBloomFilterCodecV1::default());
        for expected in &segments {
            let decoded = reader.next().await.unwrap().unwrap();
            assert_eq!(expected, &decoded);
        }
        assert!(reader.next().await.is_none());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_only_magic() {
        assert!(first_frame(CODEC_V1_MAGIC).await.is_none());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_partial_magic() {
        let outcome = first_frame(&CODEC_V1_MAGIC[..3]).await.unwrap();
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_partial_item() {
        let outcome = first_frame(&raw_stream(CODEC_V1_MAGIC, 2, 4, &[]))
            .await
            .unwrap();
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_corrupted_magic() {
        let outcome = first_frame(&raw_stream(b"bi02", 2, 4, &[1, 2, 3, 4]))
            .await
            .unwrap();
        assert!(outcome.is_err());
    }

    #[tokio::test]
    async fn test_intermediate_bloom_filter_codec_v1_frame_read_write_corrupted_length() {
        let outcome = first_frame(&raw_stream(CODEC_V1_MAGIC, 2, 4, &[1, 2, 3]))
            .await
            .unwrap();
        assert!(outcome.is_err());
    }
}