index/inverted_index/format/writer/
blob.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZeroUsize;
16
17use async_trait::async_trait;
18use futures::{AsyncWrite, AsyncWriteExt};
19use greptime_proto::v1::index::InvertedIndexMetas;
20use prost::Message;
21use snafu::ResultExt;
22
23use crate::bitmap::{Bitmap, BitmapType};
24use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
25use crate::inverted_index::format::writer::single::SingleIndexWriter;
26use crate::inverted_index::format::writer::{InvertedIndexWriter, ValueStream};
27
28/// `InvertedIndexBlobWriter`, implemented [`InvertedIndexWriter`], manages
29/// writing of an inverted index to a blob storage.
30pub struct InvertedIndexBlobWriter<W> {
31    /// The underlying blob storage
32    blob_writer: W,
33
34    /// Tracks the total number of bytes written to the storage so far
35    written_size: u64,
36
37    /// Metadata about each index that has been written  
38    metas: InvertedIndexMetas,
39}
40
41#[async_trait]
42impl<W: AsyncWrite + Send + Unpin> InvertedIndexWriter for InvertedIndexBlobWriter<W> {
43    async fn add_index(
44        &mut self,
45        name: String,
46        null_bitmap: Bitmap,
47        values: ValueStream,
48        bitmap_type: BitmapType,
49    ) -> Result<()> {
50        let single_writer = SingleIndexWriter::new(
51            name.clone(),
52            self.written_size,
53            null_bitmap,
54            values,
55            &mut self.blob_writer,
56            bitmap_type,
57        );
58        let metadata = single_writer.write().await?;
59
60        self.written_size += metadata.inverted_index_size;
61        self.metas.metas.insert(name, metadata);
62
63        Ok(())
64    }
65
66    async fn finish(
67        &mut self,
68        total_row_count: u64,
69        segment_row_count: NonZeroUsize,
70    ) -> Result<()> {
71        self.metas.segment_row_count = segment_row_count.get() as _;
72        self.metas.total_row_count = total_row_count;
73
74        let metas_bytes = self.metas.encode_to_vec();
75        self.blob_writer
76            .write_all(&metas_bytes)
77            .await
78            .context(WriteSnafu)?;
79
80        let footer_size = metas_bytes.len() as u32;
81        self.blob_writer
82            .write_all(&footer_size.to_le_bytes())
83            .await
84            .context(WriteSnafu)?;
85
86        self.blob_writer.flush().await.context(FlushSnafu)?;
87        self.blob_writer.close().await.context(CloseSnafu)?;
88        Ok(())
89    }
90}
91
92impl<W: AsyncWrite + Send + Unpin> InvertedIndexBlobWriter<W> {
93    pub fn new(blob_writer: W) -> InvertedIndexBlobWriter<W> {
94        InvertedIndexBlobWriter {
95            blob_writer,
96            written_size: 0,
97            metas: InvertedIndexMetas::default(),
98        }
99    }
100}
101
#[cfg(test)]
mod tests {
    use futures::stream;
    use greptime_proto::v1::index::BitmapType;

    use super::*;
    use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
    use crate::Bytes;

    /// Splits a packed FST output value into its `[offset, size]` halves.
    fn unpack(fst_value: u64) -> [u32; 2] {
        bytemuck::cast::<u64, [u32; 2]>(fst_value)
    }

    /// Builds a roaring-typed bitmap from LSB0-ordered bytes.
    fn roaring(lsb0_bytes: &[u8]) -> Bitmap {
        Bitmap::from_lsb0_bytes(lsb0_bytes, BitmapType::Roaring)
    }

    /// Wraps `(value, bitmap)` pairs into a stream that yields each as `Ok`.
    fn value_stream(entries: Vec<(Bytes, Bitmap)>) -> ValueStream {
        Box::new(stream::iter(entries.into_iter().map(Ok)))
    }

    #[tokio::test]
    async fn test_inverted_index_blob_writer_write_empty() {
        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        let segment_row_count = NonZeroUsize::new(1).unwrap();
        writer.finish(8, segment_row_count).await.unwrap();

        // A blob without any index must still carry a readable footer.
        let reader = InvertedIndexBlobReader::new(blob);
        let metadata = reader.metadata().await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert!(metadata.metas.is_empty());
    }

    #[tokio::test]
    async fn test_inverted_index_blob_writer_write_basic() {
        // Each tag lists three (value, single-byte LSB0 bitmap) entries in
        // ascending value order, so the first and last are the min and max.
        let tags: [(&str, [(&str, u8); 3]); 2] = [
            (
                "tag0",
                [("a", 0b0000_0001), ("b", 0b0010_0000), ("c", 0b0000_0001)],
            ),
            (
                "tag1",
                [("x", 0b0000_0001), ("y", 0b0010_0000), ("z", 0b0000_0001)],
            ),
        ];

        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        for &(tag, entries) in &tags {
            let values = entries
                .iter()
                .map(|&(value, bits)| (Bytes::from(value), roaring(&[bits])))
                .collect();
            writer
                .add_index(
                    tag.to_string(),
                    roaring(&[0b0000_0001, 0b0000_0000]),
                    value_stream(values),
                    BitmapType::Roaring,
                )
                .await
                .unwrap();
        }
        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        let reader = InvertedIndexBlobReader::new(blob);
        let metadata = reader.metadata().await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), tags.len());

        for &(tag, entries) in &tags {
            let meta = metadata.metas.get(tag).unwrap();
            let stats = meta.stats.as_ref().unwrap();
            assert_eq!(stats.distinct_count, 3);
            assert_eq!(stats.null_count, 1);
            assert_eq!(stats.min_value, Bytes::from(entries[0].0));
            assert_eq!(stats.max_value, Bytes::from(entries[2].0));

            let fst_offset = meta.base_offset + meta.relative_fst_offset as u64;
            let fst = reader.fst(fst_offset, meta.fst_size).await.unwrap();
            assert_eq!(fst.len(), entries.len());

            for &(value, bits) in &entries {
                let [offset, size] = unpack(fst.get(value.as_bytes()).unwrap());
                let bitmap = reader
                    .bitmap(meta.base_offset + offset as u64, size, BitmapType::Roaring)
                    .await
                    .unwrap();
                assert_eq!(bitmap, roaring(&[bits]));
            }
        }
    }
}