index/inverted_index/create/sort/intermediate_rw/
codec_v1.rs1use std::io;
16
17use asynchronous_codec::{BytesMut, Decoder, Encoder};
18use bytes::{Buf, BufMut};
19use greptime_proto::v1::index::BitmapType;
20use snafu::ResultExt;
21
22use crate::bitmap::Bitmap;
23use crate::inverted_index::error::{CommonIoSnafu, Error, Result};
24use crate::Bytes;
25
26const U64_LENGTH: usize = std::mem::size_of::<u64>();
27
28pub const CODEC_V1_MAGIC: &[u8; 4] = b"im01";
30
31pub struct IntermediateItemEncoderV1 {
33 pub bitmap_type: BitmapType,
34}
35
36impl Encoder for IntermediateItemEncoderV1 {
38 type Item<'a> = (Bytes, Bitmap);
39 type Error = Error;
40
41 fn encode(&mut self, item: (Bytes, Bitmap), dst: &mut BytesMut) -> Result<()> {
42 let value_bytes = item.0;
43 let bitmap_size = item.1.serialized_size(self.bitmap_type);
44
45 dst.reserve(U64_LENGTH * 2 + value_bytes.len() + bitmap_size);
46 dst.put_u64_le(value_bytes.len() as u64);
47 dst.extend_from_slice(&value_bytes);
48 dst.put_u64_le(bitmap_size as u64);
49 item.1
50 .serialize_into(self.bitmap_type, &mut dst.writer())
51 .context(CommonIoSnafu)?;
52
53 Ok(())
54 }
55}
56
57pub struct IntermediateItemDecoderV1 {
59 pub bitmap_type: BitmapType,
60}
61
62impl Decoder for IntermediateItemDecoderV1 {
64 type Item = (Bytes, Bitmap);
65 type Error = Error;
66
67 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>> {
73 if src.len() < U64_LENGTH {
78 return Ok(None);
79 }
80 let (value_len, buf) = src.split_at(U64_LENGTH);
81 let value_len = u64::from_le_bytes(value_len.try_into().unwrap()) as usize;
82
83 if buf.len() < value_len {
85 return Ok(None);
86 }
87 let (value_bytes, buf) = buf.split_at(value_len);
88
89 if buf.len() < U64_LENGTH {
91 return Ok(None);
92 }
93 let (bitmap_len, buf) = buf.split_at(U64_LENGTH);
94 let bitmap_len = u64::from_le_bytes(bitmap_len.try_into().unwrap()) as usize;
95
96 if buf.len() < bitmap_len {
98 return Ok(None);
99 }
100
101 let bitmap = Bitmap::deserialize_from(&buf[..bitmap_len], self.bitmap_type)
102 .context(CommonIoSnafu)?;
103
104 let item = (value_bytes.to_vec(), bitmap);
105
106 src.advance(U64_LENGTH * 2 + value_len + bitmap_len);
107 Ok(Some(item))
108 }
109}
110
111impl From<io::Error> for Error {
113 fn from(error: io::Error) -> Self {
114 Err::<(), io::Error>(error)
115 .context(CommonIoSnafu)
116 .unwrap_err()
117 }
118}
119
120#[cfg(test)]
121mod tests {
122 use super::*;
123
124 fn bitmap(bytes: &[u8]) -> Bitmap {
125 Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring)
126 }
127
128 #[test]
129 fn test_intermediate_codec_basic() {
130 let mut encoder = IntermediateItemEncoderV1 {
131 bitmap_type: BitmapType::Roaring,
132 };
133 let mut buf = BytesMut::new();
134
135 let item = (b"hello".to_vec(), bitmap(&[0b10101010]));
136 encoder.encode(item.clone(), &mut buf).unwrap();
137
138 let mut decoder = IntermediateItemDecoderV1 {
139 bitmap_type: BitmapType::Roaring,
140 };
141 assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item);
142 assert_eq!(decoder.decode(&mut buf).unwrap(), None);
143
144 let item1 = (b"world".to_vec(), bitmap(&[0b01010101]));
145 encoder.encode(item.clone(), &mut buf).unwrap();
146 encoder.encode(item1.clone(), &mut buf).unwrap();
147 assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item);
148 assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item1);
149 assert_eq!(decoder.decode(&mut buf).unwrap(), None);
150 assert!(buf.is_empty());
151 }
152
153 #[test]
154 fn test_intermediate_codec_empty_item() {
155 let mut encoder = IntermediateItemEncoderV1 {
156 bitmap_type: BitmapType::Roaring,
157 };
158 let mut buf = BytesMut::new();
159
160 let item = (b"".to_vec(), bitmap(&[]));
161 encoder.encode(item.clone(), &mut buf).unwrap();
162
163 let mut decoder = IntermediateItemDecoderV1 {
164 bitmap_type: BitmapType::Roaring,
165 };
166 assert_eq!(decoder.decode(&mut buf).unwrap().unwrap(), item);
167 assert_eq!(decoder.decode(&mut buf).unwrap(), None);
168 assert!(buf.is_empty());
169 }
170
171 #[test]
172 fn test_intermediate_codec_partial() {
173 let mut encoder = IntermediateItemEncoderV1 {
174 bitmap_type: BitmapType::Roaring,
175 };
176 let mut buf = BytesMut::new();
177
178 let item = (b"hello".to_vec(), bitmap(&[0b10101010]));
179 encoder.encode(item.clone(), &mut buf).unwrap();
180
181 let partial_length = U64_LENGTH + 3;
182 let mut partial_bytes = buf.split_to(partial_length);
183
184 let mut decoder = IntermediateItemDecoderV1 {
185 bitmap_type: BitmapType::Roaring,
186 };
187 assert_eq!(decoder.decode(&mut partial_bytes).unwrap(), None); partial_bytes.extend_from_slice(&buf[..]);
189 assert_eq!(decoder.decode(&mut partial_bytes).unwrap().unwrap(), item);
190 assert_eq!(decoder.decode(&mut partial_bytes).unwrap(), None);
191 assert!(partial_bytes.is_empty());
192 }
193}