index/inverted_index/create/sort/intermediate_rw/
codec_v1.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::io;
16
17use asynchronous_codec::{BytesMut, Decoder, Encoder};
18use bytes::{Buf, BufMut};
19use greptime_proto::v1::index::BitmapType;
20use snafu::ResultExt;
21
22use crate::bitmap::Bitmap;
23use crate::inverted_index::error::{CommonIoSnafu, Error, Result};
24use crate::Bytes;
25
/// Number of bytes occupied by one `u64` length prefix in the item framing.
const U64_LENGTH: usize = (u64::BITS / u8::BITS) as usize;
27
/// Four-byte magic tag (ASCII `im01`) identifying version 1 of the
/// intermediate-file codec.
///
/// NOTE(review): the tag is not read or written by the encoder/decoder in this
/// file; presumably the intermediate file reader/writer handles it — confirm.
pub const CODEC_V1_MAGIC: &[u8; 4] = b"im01";
30
31/// Serializes items of external sorting intermediate files.
32pub struct IntermediateItemEncoderV1 {
33    pub bitmap_type: BitmapType,
34}
35
36/// [`FramedWrite`] requires the [`Encoder`] trait to be implemented.
37impl Encoder for IntermediateItemEncoderV1 {
38    type Item<'a> = (Bytes, Bitmap);
39    type Error = Error;
40
41    fn encode(&mut self, item: (Bytes, Bitmap), dst: &mut BytesMut) -> Result<()> {
42        let value_bytes = item.0;
43        let bitmap_size = item.1.serialized_size(self.bitmap_type);
44
45        dst.reserve(U64_LENGTH * 2 + value_bytes.len() + bitmap_size);
46        dst.put_u64_le(value_bytes.len() as u64);
47        dst.extend_from_slice(&value_bytes);
48        dst.put_u64_le(bitmap_size as u64);
49        item.1
50            .serialize_into(self.bitmap_type, &mut dst.writer())
51            .context(CommonIoSnafu)?;
52
53        Ok(())
54    }
55}
56
57/// Deserializes items of external sorting intermediate files.
58pub struct IntermediateItemDecoderV1 {
59    pub bitmap_type: BitmapType,
60}
61
62/// [`FramedRead`] requires the [`Decoder`] trait to be implemented.
63impl Decoder for IntermediateItemDecoderV1 {
64    type Item = (Bytes, Bitmap);
65    type Error = Error;
66
67    /// Decodes the `src` into `(Bytes, RoaringBitmap)`. Returns `None` if
68    /// the `src` does not contain enough data for a complete item.
69    ///
70    /// Only after successful decoding, the `src` is advanced. Otherwise,
71    /// it is left untouched to wait for filling more data and retrying.
72    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
73        // [value len][value][bitmap len][bitmap]
74        //     [8]     [?]       [8]       [?]
75
76        // decode value len
77        if src.len() < U64_LENGTH {
78            return Ok(None);
79        }
80        let (value_len, buf) = src.split_at(U64_LENGTH);
81        let value_len = u64::from_le_bytes(value_len.try_into().unwrap()) as usize;
82
83        // decode value
84        if buf.len() < value_len {
85            return Ok(None);
86        }
87        let (value_bytes, buf) = buf.split_at(value_len);
88
89        // decode bitmap len
90        if buf.len() < U64_LENGTH {
91            return Ok(None);
92        }
93        let (bitmap_len, buf) = buf.split_at(U64_LENGTH);
94        let bitmap_len = u64::from_le_bytes(bitmap_len.try_into().unwrap()) as usize;
95
96        // decode bitmap
97        if buf.len() < bitmap_len {
98            return Ok(None);
99        }
100
101        let bitmap = Bitmap::deserialize_from(&buf[..bitmap_len], self.bitmap_type)
102            .context(CommonIoSnafu)?;
103
104        let item = (value_bytes.to_vec(), bitmap);
105
106        src.advance(U64_LENGTH * 2 + value_len + bitmap_len);
107        Ok(Some(item))
108    }
109}
110
111/// Required for [`Encoder`] and [`Decoder`] implementations.
112impl From<io::Error> for Error {
113    fn from(error: io::Error) -> Self {
114        Err::<(), io::Error>(error)
115            .context(CommonIoSnafu)
116            .unwrap_err()
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a Roaring-typed bitmap from `bytes` via `Bitmap::from_lsb0_bytes`.
    fn bitmap(bytes: &[u8]) -> Bitmap {
        Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring)
    }

    /// Encoder configured for Roaring bitmaps.
    fn roaring_encoder() -> IntermediateItemEncoderV1 {
        IntermediateItemEncoderV1 {
            bitmap_type: BitmapType::Roaring,
        }
    }

    /// Decoder configured for Roaring bitmaps.
    fn roaring_decoder() -> IntermediateItemDecoderV1 {
        IntermediateItemDecoderV1 {
            bitmap_type: BitmapType::Roaring,
        }
    }

    #[test]
    fn test_intermediate_codec_basic() {
        let mut encoder = roaring_encoder();
        let mut decoder = roaring_decoder();
        let mut buf = BytesMut::new();

        // A single item round-trips and leaves nothing behind.
        let first = (b"hello".to_vec(), bitmap(&[0b10101010]));
        encoder.encode(first.clone(), &mut buf).unwrap();
        assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), first);
        assert_eq!(decoder.decode(&mut buf).unwrap(), None);

        // Two back-to-back items are decoded in order.
        let second = (b"world".to_vec(), bitmap(&[0b01010101]));
        encoder.encode(first.clone(), &mut buf).unwrap();
        encoder.encode(second.clone(), &mut buf).unwrap();
        assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), first);
        assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), second);
        assert_eq!(decoder.decode(&mut buf).unwrap(), None);
        assert!(buf.is_empty());
    }

    #[test]
    fn test_intermediate_codec_empty_item() {
        let mut encoder = roaring_encoder();
        let mut decoder = roaring_decoder();
        let mut buf = BytesMut::new();

        let empty = (b"".to_vec(), bitmap(&[]));
        encoder.encode(empty.clone(), &mut buf).unwrap();

        assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), empty);
        assert_eq!(decoder.decode(&mut buf).unwrap(), None);
        assert!(buf.is_empty());
    }

    #[test]
    fn test_intermediate_codec_partial() {
        let mut encoder = roaring_encoder();
        let mut decoder = roaring_decoder();
        let mut buf = BytesMut::new();

        let expected = (b"hello".to_vec(), bitmap(&[0b10101010]));
        encoder.encode(expected.clone(), &mut buf).unwrap();

        // Keep only the value-length prefix plus three bytes of the value.
        let mut partial_bytes = buf.split_to(U64_LENGTH + 3);
        assert_eq!(decoder.decode(&mut partial_bytes).unwrap(), None); // not enough data

        // Once the remainder arrives, the full item decodes.
        partial_bytes.extend_from_slice(&buf[..]);
        assert_eq!(
            decoder.decode(&mut partial_bytes).unwrap().unwrap(),
            expected
        );
        assert_eq!(decoder.decode(&mut partial_bytes).unwrap(), None);
        assert!(partial_bytes.is_empty());
    }
}