index/inverted_index/format/writer/single.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use fst::MapBuilder;
16use futures::{AsyncWrite, AsyncWriteExt, Stream, StreamExt};
17use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexStats};
18use snafu::ResultExt;
19
20use crate::bitmap::{Bitmap, BitmapType};
21use crate::inverted_index::error::{FstCompileSnafu, FstInsertSnafu, Result, WriteSnafu};
22use crate::Bytes;
23
/// `SingleIndexWriter` writes values to the blob storage for an individual inverted index.
///
/// The bytes it emits are laid out as: the serialized null bitmap, then one serialized
/// bitmap per value (in the order the values are yielded), then the compiled FST that
/// maps each value to the packed `(offset, size)` of its bitmap.
pub struct SingleIndexWriter<W, S> {
    /// The underlying blob storage that every serialized byte is written to.
    blob_writer: W,

    /// Bitmap of the null rows; it is the first thing written to the blob.
    null_bitmap: Bitmap,

    /// The stream of `(value, bitmap)` pairs to be written. Values must be yielded in
    /// lexicographic order: inserting an out-of-order key into the FST fails.
    values: S,

    /// In-memory builder for the FST mapping each value to its packed `(offset, size)`.
    fst: MapBuilder<Vec<u8>>,

    /// Metadata about the index (offsets, sizes and stats), filled in while writing.
    meta: InvertedIndexMeta,

    /// The serialization format used for every bitmap written to the blob.
    bitmap_type: BitmapType,

    /// Scratch buffer reused to serialize each bitmap before it is written to the blob.
    buf: Vec<u8>,
}
47
48impl<W, S> SingleIndexWriter<W, S>
49where
50    W: AsyncWrite + Send + Unpin,
51    S: Stream<Item = Result<(Bytes, Bitmap)>> + Send + Unpin,
52{
53    /// Constructs a new `SingleIndexWriter`
54    pub fn new(
55        name: String,
56        base_offset: u64,
57        null_bitmap: Bitmap,
58        values: S,
59        blob_writer: W,
60        bitmap_type: BitmapType,
61    ) -> SingleIndexWriter<W, S> {
62        SingleIndexWriter {
63            blob_writer,
64            null_bitmap,
65            values,
66            fst: MapBuilder::memory(),
67            bitmap_type,
68            buf: Vec::new(),
69            meta: InvertedIndexMeta {
70                name,
71                base_offset,
72                stats: Some(InvertedIndexStats::default()),
73                bitmap_type: bitmap_type.into(),
74                ..Default::default()
75            },
76        }
77    }
78
79    /// Writes the null bitmap, values with their bitmaps, and constructs the FST map.
80    pub async fn write(mut self) -> Result<InvertedIndexMeta> {
81        self.write_null_bitmap().await?;
82
83        while let Some(result) = self.values.next().await {
84            let (bytes, bitmap) = result?;
85            self.append_value(bytes, bitmap).await?;
86        }
87
88        self.finish_fst_construction().await
89    }
90
91    /// Writes the null bitmap to the blob and updates the metadata accordingly
92    async fn write_null_bitmap(&mut self) -> Result<()> {
93        self.buf.clear();
94        self.null_bitmap
95            .serialize_into(self.bitmap_type, &mut self.buf)
96            .expect("Write to vec should not fail");
97        self.blob_writer
98            .write_all(&self.buf)
99            .await
100            .context(WriteSnafu)?;
101
102        self.meta.relative_null_bitmap_offset = self.meta.inverted_index_size as _;
103        self.meta.null_bitmap_size = self.buf.len() as _;
104        self.meta.inverted_index_size += self.meta.null_bitmap_size as u64;
105
106        // update stats
107        if let Some(stats) = self.meta.stats.as_mut() {
108            let null_count = self.null_bitmap.count_ones();
109            stats.null_count = null_count as u64;
110        }
111
112        Ok(())
113    }
114
115    /// Appends a value and its bitmap to the blob, updates the FST, and the metadata
116    async fn append_value(&mut self, value: Bytes, bitmap: Bitmap) -> Result<()> {
117        self.buf.clear();
118        bitmap
119            .serialize_into(self.bitmap_type, &mut self.buf)
120            .expect("Write to vec should not fail");
121        self.blob_writer
122            .write_all(&self.buf)
123            .await
124            .context(WriteSnafu)?;
125
126        let offset = self.meta.inverted_index_size as u32;
127        let size = self.buf.len() as u32;
128        self.meta.inverted_index_size += size as u64;
129
130        let packed = bytemuck::cast::<[u32; 2], u64>([offset, size]);
131        self.fst.insert(&value, packed).context(FstInsertSnafu)?;
132
133        // update stats
134        if let Some(stats) = self.meta.stats.as_mut() {
135            stats.distinct_count += 1;
136
137            // update min/max, assume values are appended in lexicographic order
138            if stats.distinct_count == 1 {
139                stats.min_value.clone_from(&value);
140            }
141            stats.max_value = value;
142        }
143
144        Ok(())
145    }
146
147    /// Writes the compiled FST to the blob and finalizes the metadata
148    async fn finish_fst_construction(mut self) -> Result<InvertedIndexMeta> {
149        let fst_bytes = self.fst.into_inner().context(FstCompileSnafu)?;
150        self.blob_writer
151            .write_all(&fst_bytes)
152            .await
153            .context(WriteSnafu)?;
154
155        self.meta.relative_fst_offset = self.meta.inverted_index_size as _;
156        self.meta.fst_size = fst_bytes.len() as _;
157        self.meta.inverted_index_size += self.meta.fst_size as u64;
158        Ok(self.meta)
159    }
160}
161
#[cfg(test)]
mod tests {
    use futures::stream;

    use super::*;
    use crate::inverted_index::error::Error;
    use crate::Bytes;

    /// Asserts that `meta` describes exactly the bytes in `blob`.
    ///
    /// The null bitmap must start the blob, the FST must end it, and the recorded
    /// total size must equal the number of bytes written.
    fn assert_layout_consistent(meta: &InvertedIndexMeta, blob: &[u8]) {
        assert_eq!(meta.inverted_index_size, blob.len() as u64);
        assert_eq!(meta.relative_null_bitmap_offset, 0);
        assert_eq!(
            u64::from(meta.relative_fst_offset) + u64::from(meta.fst_size),
            meta.inverted_index_size
        );
    }

    #[tokio::test]
    async fn test_single_index_writer_write_empty() {
        let mut blob = Vec::new();
        let writer = SingleIndexWriter::new(
            "test".to_string(),
            0,
            Bitmap::new_roaring(),
            stream::empty(),
            &mut blob,
            BitmapType::Roaring,
        );

        let meta = writer.write().await.unwrap();
        assert_eq!(meta.name, "test");
        assert_eq!(meta.base_offset, 0);
        assert_eq!(meta.stats, Some(InvertedIndexStats::default()));
        assert_layout_consistent(&meta, &blob);
    }

    #[tokio::test]
    async fn test_single_index_writer_write_basic() {
        let mut blob = Vec::new();
        let writer = SingleIndexWriter::new(
            "test".to_string(),
            0,
            Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
            stream::iter(vec![
                Ok((
                    Bytes::from("a"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                )),
                Ok((
                    Bytes::from("b"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0000], BitmapType::Roaring),
                )),
                Ok((
                    Bytes::from("c"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                )),
            ]),
            &mut blob,
            BitmapType::Roaring,
        );
        let meta = writer.write().await.unwrap();

        assert_eq!(meta.name, "test");
        assert_eq!(meta.base_offset, 0);
        let stats = meta.stats.as_ref().unwrap();
        assert_eq!(stats.distinct_count, 3);
        assert_eq!(stats.null_count, 1);
        assert_eq!(stats.min_value, Bytes::from("a"));
        assert_eq!(stats.max_value, Bytes::from("c"));

        assert_layout_consistent(&meta, &blob);

        // Every FST entry must decode to a bitmap lying between the null bitmap and the FST.
        let fst_start = meta.relative_fst_offset as usize;
        let fst_end = fst_start + meta.fst_size as usize;
        let map = fst::Map::new(blob[fst_start..fst_end].to_vec()).unwrap();
        assert_eq!(map.len(), 3);
        for key in ["a", "b", "c"] {
            let packed = map.get(key).unwrap();
            let [offset, size] = bytemuck::cast::<u64, [u32; 2]>(packed);
            assert!(offset >= meta.null_bitmap_size);
            assert!(offset + size <= meta.relative_fst_offset);
        }
    }

    #[tokio::test]
    async fn test_single_index_writer_write_out_of_order() {
        let mut blob = Vec::new();
        let writer = SingleIndexWriter::new(
            "test".to_string(),
            0,
            Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
            stream::iter(vec![
                Ok((
                    Bytes::from("b"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0000], BitmapType::Roaring),
                )),
                Ok((
                    Bytes::from("a"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                )),
                Ok((
                    Bytes::from("c"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                )),
            ]),
            &mut blob,
            BitmapType::Roaring,
        );
        let res = writer.write().await;
        assert!(matches!(res, Err(Error::FstInsert { .. })));
    }
}
250}