index/inverted_index/create/sort/
intermediate_rw.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Intermediate codec for external sorting.
//!
//! This module provides serialization and deserialization logic for
//! handling intermediate data during the sorting process.
//! The serialization format is as follows (sizes in bytes, `?` = variable length):
//!
//! ```text
//! [magic][item][item]...[item]
//!    [4]       [?]
//!
//! Each [item] is structured as:
//! [value len][value][bitmap len][bitmap]
//!     [8]       [?]       [8]        [?]
//! ```
//!
//! Each item represents a value and its associated bitmap, serialized with their lengths for
//! easier deserialization.
32
33mod codec_v1;
34
35use asynchronous_codec::{FramedRead, FramedWrite};
36use futures::{stream, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt};
37use snafu::ResultExt;
38
39use crate::bitmap::{Bitmap, BitmapType};
40use crate::inverted_index::create::sort::SortedStream;
41use crate::inverted_index::error::{
42    CloseSnafu, FlushSnafu, ReadSnafu, Result, UnknownIntermediateCodecMagicSnafu, WriteSnafu,
43};
44use crate::Bytes;
45
46/// `IntermediateWriter` serializes and writes intermediate data to the wrapped `writer`
47pub struct IntermediateWriter<W> {
48    writer: W,
49}
50
51impl<W: AsyncWrite + Unpin> IntermediateWriter<W> {
52    /// Creates a new `IntermediateWriter` wrapping an `AsyncWrite`
53    pub fn new(writer: W) -> IntermediateWriter<W> {
54        IntermediateWriter { writer }
55    }
56
57    /// Serializes and writes all provided values to the wrapped writer
58    pub async fn write_all(
59        mut self,
60        values: impl IntoIterator<Item = (Bytes, Bitmap)>,
61    ) -> Result<()> {
62        let (codec_magic, encoder) = (
63            codec_v1::CODEC_V1_MAGIC,
64            codec_v1::IntermediateItemEncoderV1 {
65                bitmap_type: BitmapType::Roaring,
66            },
67        );
68
69        self.writer
70            .write_all(codec_magic)
71            .await
72            .context(WriteSnafu)?;
73
74        let value_stream = stream::iter(values.into_iter().map(Ok));
75        let frame_write = FramedWrite::new(&mut self.writer, encoder);
76        // `forward()` will flush and close the writer when the stream ends
77        if let Err(e) = value_stream.forward(frame_write).await {
78            self.writer.flush().await.context(FlushSnafu)?;
79            self.writer.close().await.context(CloseSnafu)?;
80            return Err(e);
81        }
82
83        Ok(())
84    }
85}
86
87/// Reads intermediate serialized data from an `AsyncRead` source and converts it to a [`SortedStream`]
88pub struct IntermediateReader<R> {
89    reader: R,
90}
91
92impl<R: AsyncRead + Unpin + Send + 'static> IntermediateReader<R> {
93    pub fn new(reader: R) -> IntermediateReader<R> {
94        IntermediateReader { reader }
95    }
96
97    /// Reads the magic header, determines the codec, and returns a stream of deserialized values.
98    pub async fn into_stream(mut self) -> Result<SortedStream> {
99        let mut magic = [0u8; 4];
100        self.reader
101            .read_exact(&mut magic)
102            .await
103            .context(ReadSnafu)?;
104
105        let decoder = match &magic {
106            codec_v1::CODEC_V1_MAGIC => codec_v1::IntermediateItemDecoderV1 {
107                bitmap_type: BitmapType::Roaring,
108            },
109            _ => return UnknownIntermediateCodecMagicSnafu { magic }.fail(),
110        };
111
112        Ok(Box::new(FramedRead::new(self.reader, decoder)))
113    }
114}
115
#[cfg(test)]
mod tests {
    use std::collections::BTreeMap;
    use std::io::{Seek, SeekFrom};

    use futures::io::{AllowStdIo, Cursor};
    use tempfile::tempfile;

    use super::*;
    use crate::inverted_index::error::Error;

    fn bitmap(bytes: &[u8]) -> Bitmap {
        Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring)
    }

    #[tokio::test]
    async fn test_intermediate_read_write_basic() {
        let file_r = tempfile().unwrap();
        let file_w = file_r.try_clone().unwrap();
        let mut buf_r = AllowStdIo::new(file_r);
        let buf_w = AllowStdIo::new(file_w);

        let values = BTreeMap::from_iter([
            (Bytes::from("a"), bitmap(&[0b10101010])),
            (Bytes::from("b"), bitmap(&[0b01010101])),
        ]);

        let writer = IntermediateWriter::new(buf_w);
        writer.write_all(values).await.unwrap();
        // The cloned handles share one file offset, so rewind before reading back.
        buf_r.seek(SeekFrom::Start(0)).unwrap();

        let reader = IntermediateReader::new(buf_r);
        let mut stream = reader.into_stream().await.unwrap();

        let a = stream.next().await.unwrap().unwrap();
        assert_eq!(a, (Bytes::from("a"), bitmap(&[0b10101010])));
        let b = stream.next().await.unwrap().unwrap();
        assert_eq!(b, (Bytes::from("b"), bitmap(&[0b01010101])));
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_intermediate_read_write_empty() {
        let mut buf = vec![];

        let values: BTreeMap<Bytes, Bitmap> = BTreeMap::new();

        let writer = IntermediateWriter::new(&mut buf);
        writer.write_all(values).await.unwrap();
        // With no items, the output is exactly the codec magic header.
        assert_eq!(buf.as_slice(), &codec_v1::CODEC_V1_MAGIC[..]);

        let reader = IntermediateReader::new(Cursor::new(buf));
        let mut stream = reader.into_stream().await.unwrap();

        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_intermediate_read_with_invalid_magic() {
        let buf = b"invalid".to_vec();

        let reader = IntermediateReader::new(Cursor::new(buf));
        let result = reader.into_stream().await;
        assert!(matches!(
            result,
            Err(Error::UnknownIntermediateCodecMagic { .. })
        ))
    }

    #[tokio::test]
    async fn test_intermediate_read_with_truncated_magic() {
        // Only half of the 4-byte header is present. Reading the header itself must fail
        // with a read error, rather than being reported as an unknown codec.
        let buf = codec_v1::CODEC_V1_MAGIC[..2].to_vec();

        let reader = IntermediateReader::new(Cursor::new(buf));
        let result = reader.into_stream().await;
        assert!(matches!(result, Err(Error::Read { .. })));
    }
}