index/inverted_index/create/sort/
intermediate_rw.rs1mod codec_v1;
34
35use asynchronous_codec::{FramedRead, FramedWrite};
36use futures::{stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt};
37use snafu::ResultExt;
38
39use crate::bitmap::{Bitmap, BitmapType};
40use crate::inverted_index::create::sort::SortedStream;
41use crate::inverted_index::error::{
42 CloseSnafu, FlushSnafu, ReadSnafu, Result, UnknownIntermediateCodecMagicSnafu, WriteSnafu,
43};
44use crate::Bytes;
45
/// Writer side of the intermediate file format: owns the sink `W` that
/// [`IntermediateWriter::write_all`] emits the codec magic and encoded items into.
pub struct IntermediateWriter<W> {
    /// Destination that receives the serialized bytes.
    writer: W,
}
50
51impl<W: AsyncWrite + Unpin> IntermediateWriter<W> {
52 pub fn new(writer: W) -> IntermediateWriter<W> {
54 IntermediateWriter { writer }
55 }
56
57 pub async fn write_all(
59 mut self,
60 values: impl IntoIterator<Item = (Bytes, Bitmap)>,
61 ) -> Result<()> {
62 let (codec_magic, encoder) = (
63 codec_v1::CODEC_V1_MAGIC,
64 codec_v1::IntermediateItemEncoderV1 {
65 bitmap_type: BitmapType::Roaring,
66 },
67 );
68
69 self.writer
70 .write_all(codec_magic)
71 .await
72 .context(WriteSnafu)?;
73
74 let value_stream = stream::iter(values.into_iter().map(Ok));
75 let frame_write = FramedWrite::new(&mut self.writer, encoder);
76 if let Err(e) = value_stream.forward(frame_write).await {
78 self.writer.flush().await.context(FlushSnafu)?;
79 self.writer.close().await.context(CloseSnafu)?;
80 return Err(e);
81 }
82
83 Ok(())
84 }
85}
86
/// Reader side of the intermediate file format: owns the source `R` that
/// [`IntermediateReader::into_stream`] reads the codec magic and encoded items from.
pub struct IntermediateReader<R> {
    /// Source the serialized bytes are read from.
    reader: R,
}
91
92impl<R: AsyncRead + Unpin + Send + 'static> IntermediateReader<R> {
93 pub fn new(reader: R) -> IntermediateReader<R> {
94 IntermediateReader { reader }
95 }
96
97 pub async fn into_stream(mut self) -> Result<SortedStream> {
99 let mut magic = [0u8; 4];
100 self.reader
101 .read_exact(&mut magic)
102 .await
103 .context(ReadSnafu)?;
104
105 let decoder = match &magic {
106 codec_v1::CODEC_V1_MAGIC => codec_v1::IntermediateItemDecoderV1 {
107 bitmap_type: BitmapType::Roaring,
108 },
109 _ => return UnknownIntermediateCodecMagicSnafu { magic }.fail(),
110 };
111
112 Ok(Box::new(FramedRead::new(self.reader, decoder)))
113 }
114}
115
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::io::{Seek, SeekFrom};

    use futures::io::{AllowStdIo, Cursor};
    use tempfile::tempfile;

    use super::*;
    use crate::inverted_index::error::Error;

    /// Builds a roaring bitmap from LSB0-ordered bytes.
    fn bitmap(bytes: &[u8]) -> Bitmap {
        Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring)
    }

    /// Items written through a temp file are read back in the same order.
    #[tokio::test]
    async fn test_intermediate_read_write_basic() {
        // Two handles onto the same temp file: one to write, one to read.
        let read_file = tempfile().unwrap();
        let write_file = read_file.try_clone().unwrap();
        let mut read_io = AllowStdIo::new(read_file);
        let write_io = AllowStdIo::new(write_file);

        let entries = [
            (Bytes::from("a"), bitmap(&[0b10101010])),
            (Bytes::from("b"), bitmap(&[0b01010101])),
        ];
        let values = BTreeMap::from_iter(entries.clone());

        IntermediateWriter::new(write_io)
            .write_all(values)
            .await
            .unwrap();
        // Rewind before reading back what was just written.
        read_io.seek(SeekFrom::Start(0)).unwrap();

        let mut stream = IntermediateReader::new(read_io)
            .into_stream()
            .await
            .unwrap();

        for expected in entries {
            let actual = stream.next().await.unwrap().unwrap();
            assert_eq!(actual, expected);
        }
        assert!(stream.next().await.is_none());
    }

    /// Writing no items yields a stream that ends immediately.
    #[tokio::test]
    async fn test_intermediate_read_write_empty() {
        let mut sink = vec![];

        let no_values = BTreeMap::new();
        IntermediateWriter::new(&mut sink)
            .write_all(no_values)
            .await
            .unwrap();

        let mut stream = IntermediateReader::new(Cursor::new(sink))
            .into_stream()
            .await
            .unwrap();

        assert!(stream.next().await.is_none());
    }

    /// An unrecognized magic header is rejected with the dedicated error.
    #[tokio::test]
    async fn test_intermediate_read_with_invalid_magic() {
        let bogus = b"invalid".to_vec();

        let outcome = IntermediateReader::new(Cursor::new(bogus))
            .into_stream()
            .await;
        assert!(matches!(
            outcome,
            Err(Error::UnknownIntermediateCodecMagic { .. })
        ))
    }
}