index/inverted_index/format/writer/
single.rs1use fst::MapBuilder;
16use futures::{AsyncWrite, AsyncWriteExt, Stream, StreamExt};
17use greptime_proto::v1::index::{InvertedIndexMeta, InvertedIndexStats};
18use snafu::ResultExt;
19
20use crate::bitmap::{Bitmap, BitmapType};
21use crate::inverted_index::error::{FstCompileSnafu, FstInsertSnafu, Result, WriteSnafu};
22use crate::Bytes;
23
24pub struct SingleIndexWriter<W, S> {
26 blob_writer: W,
28
29 null_bitmap: Bitmap,
31
32 values: S,
34
35 fst: MapBuilder<Vec<u8>>,
37
38 meta: InvertedIndexMeta,
40
41 bitmap_type: BitmapType,
43
44 buf: Vec<u8>,
46}
47
48impl<W, S> SingleIndexWriter<W, S>
49where
50 W: AsyncWrite + Send + Unpin,
51 S: Stream<Item = Result<(Bytes, Bitmap)>> + Send + Unpin,
52{
53 pub fn new(
55 name: String,
56 base_offset: u64,
57 null_bitmap: Bitmap,
58 values: S,
59 blob_writer: W,
60 bitmap_type: BitmapType,
61 ) -> SingleIndexWriter<W, S> {
62 SingleIndexWriter {
63 blob_writer,
64 null_bitmap,
65 values,
66 fst: MapBuilder::memory(),
67 bitmap_type,
68 buf: Vec::new(),
69 meta: InvertedIndexMeta {
70 name,
71 base_offset,
72 stats: Some(InvertedIndexStats::default()),
73 bitmap_type: bitmap_type.into(),
74 ..Default::default()
75 },
76 }
77 }
78
79 pub async fn write(mut self) -> Result<InvertedIndexMeta> {
81 self.write_null_bitmap().await?;
82
83 while let Some(result) = self.values.next().await {
84 let (bytes, bitmap) = result?;
85 self.append_value(bytes, bitmap).await?;
86 }
87
88 self.finish_fst_construction().await
89 }
90
91 async fn write_null_bitmap(&mut self) -> Result<()> {
93 self.buf.clear();
94 self.null_bitmap
95 .serialize_into(self.bitmap_type, &mut self.buf)
96 .expect("Write to vec should not fail");
97 self.blob_writer
98 .write_all(&self.buf)
99 .await
100 .context(WriteSnafu)?;
101
102 self.meta.relative_null_bitmap_offset = self.meta.inverted_index_size as _;
103 self.meta.null_bitmap_size = self.buf.len() as _;
104 self.meta.inverted_index_size += self.meta.null_bitmap_size as u64;
105
106 if let Some(stats) = self.meta.stats.as_mut() {
108 let null_count = self.null_bitmap.count_ones();
109 stats.null_count = null_count as u64;
110 }
111
112 Ok(())
113 }
114
115 async fn append_value(&mut self, value: Bytes, bitmap: Bitmap) -> Result<()> {
117 self.buf.clear();
118 bitmap
119 .serialize_into(self.bitmap_type, &mut self.buf)
120 .expect("Write to vec should not fail");
121 self.blob_writer
122 .write_all(&self.buf)
123 .await
124 .context(WriteSnafu)?;
125
126 let offset = self.meta.inverted_index_size as u32;
127 let size = self.buf.len() as u32;
128 self.meta.inverted_index_size += size as u64;
129
130 let packed = bytemuck::cast::<[u32; 2], u64>([offset, size]);
131 self.fst.insert(&value, packed).context(FstInsertSnafu)?;
132
133 if let Some(stats) = self.meta.stats.as_mut() {
135 stats.distinct_count += 1;
136
137 if stats.distinct_count == 1 {
139 stats.min_value.clone_from(&value);
140 }
141 stats.max_value = value;
142 }
143
144 Ok(())
145 }
146
147 async fn finish_fst_construction(mut self) -> Result<InvertedIndexMeta> {
149 let fst_bytes = self.fst.into_inner().context(FstCompileSnafu)?;
150 self.blob_writer
151 .write_all(&fst_bytes)
152 .await
153 .context(WriteSnafu)?;
154
155 self.meta.relative_fst_offset = self.meta.inverted_index_size as _;
156 self.meta.fst_size = fst_bytes.len() as _;
157 self.meta.inverted_index_size += self.meta.fst_size as u64;
158 Ok(self.meta)
159 }
160}
161
#[cfg(test)]
mod tests {
    use futures::stream;

    use super::*;
    use crate::inverted_index::error::Error;
    use crate::Bytes;

    /// Asserts that the section layout recorded in `meta` matches the bytes in
    /// `blob`: the null bitmap starts at relative offset 0, the FST is the last
    /// section, and `inverted_index_size` equals the number of bytes written.
    fn assert_layout(meta: &InvertedIndexMeta, blob: &[u8]) {
        assert_eq!(meta.relative_null_bitmap_offset, 0);
        assert_eq!(meta.inverted_index_size, blob.len() as u64);
        assert_eq!(
            meta.relative_fst_offset as u64 + meta.fst_size as u64,
            meta.inverted_index_size
        );
    }

    /// Parses the FST section of `blob` located by `meta`.
    fn read_fst(meta: &InvertedIndexMeta, blob: &[u8]) -> fst::Map<Vec<u8>> {
        let start = meta.relative_fst_offset as usize;
        let end = start + meta.fst_size as usize;
        fst::Map::new(blob[start..end].to_vec()).unwrap()
    }

    /// Looks up `key` in `map` and unpacks its `[offset, size]` bitmap location.
    fn bitmap_location(map: &fst::Map<Vec<u8>>, key: &str) -> [u32; 2] {
        let packed = map.get(key).expect("key should be present in the FST");
        bytemuck::cast::<u64, [u32; 2]>(packed)
    }

    #[tokio::test]
    async fn test_single_index_writer_write_empty() {
        let mut blob = Vec::new();
        let writer = SingleIndexWriter::new(
            "test".to_string(),
            0,
            Bitmap::new_roaring(),
            stream::empty(),
            &mut blob,
            BitmapType::Roaring,
        );

        let meta = writer.write().await.unwrap();
        assert_eq!(meta.name, "test");
        assert_eq!(meta.base_offset, 0);
        assert_eq!(meta.stats, Some(InvertedIndexStats::default()));

        assert_layout(&meta, &blob);
        assert!(read_fst(&meta, &blob).is_empty());
    }

    #[tokio::test]
    async fn test_single_index_writer_write_basic() {
        let mut blob = Vec::new();
        let writer = SingleIndexWriter::new(
            "test".to_string(),
            0,
            Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
            stream::iter(vec![
                Ok((
                    Bytes::from("a"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                )),
                Ok((
                    Bytes::from("b"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0000], BitmapType::Roaring),
                )),
                Ok((
                    Bytes::from("c"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                )),
            ]),
            &mut blob,
            BitmapType::Roaring,
        );
        let meta = writer.write().await.unwrap();

        assert_eq!(meta.name, "test");
        assert_eq!(meta.base_offset, 0);
        let stats = meta.stats.as_ref().unwrap();
        assert_eq!(stats.distinct_count, 3);
        assert_eq!(stats.null_count, 1);
        assert_eq!(stats.min_value, Bytes::from("a"));
        assert_eq!(stats.max_value, Bytes::from("c"));

        assert_layout(&meta, &blob);
        let map = read_fst(&meta, &blob);
        assert_eq!(map.len(), 3);

        // Value bitmaps are laid out back to back, in value order, between the
        // null bitmap and the FST.
        let [offset_a, size_a] = bitmap_location(&map, "a");
        let [offset_b, size_b] = bitmap_location(&map, "b");
        let [offset_c, size_c] = bitmap_location(&map, "c");
        assert_eq!(u64::from(offset_a), meta.null_bitmap_size as u64);
        assert_eq!(offset_b, offset_a + size_a);
        assert_eq!(offset_c, offset_b + size_b);
        assert_eq!(
            u64::from(offset_c) + u64::from(size_c),
            meta.relative_fst_offset as u64
        );
    }

    #[tokio::test]
    async fn test_single_index_writer_write_out_of_order() {
        let mut blob = Vec::new();
        let writer = SingleIndexWriter::new(
            "test".to_string(),
            0,
            Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
            stream::iter(vec![
                Ok((
                    Bytes::from("b"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0000], BitmapType::Roaring),
                )),
                Ok((
                    Bytes::from("a"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                )),
                Ok((
                    Bytes::from("c"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                )),
            ]),
            &mut blob,
            BitmapType::Roaring,
        );
        let res = writer.write().await;
        assert!(matches!(res, Err(Error::FstInsert { .. })));
    }

    #[tokio::test]
    async fn test_single_index_writer_write_duplicate_value() {
        // The FST builder rejects a key equal to the previous one, so a
        // repeated value must surface as an insert error.
        let mut blob = Vec::new();
        let writer = SingleIndexWriter::new(
            "test".to_string(),
            0,
            Bitmap::new_roaring(),
            stream::iter(vec![
                Ok((
                    Bytes::from("a"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                )),
                Ok((
                    Bytes::from("a"),
                    Bitmap::from_lsb0_bytes(&[0b0000_0010], BitmapType::Roaring),
                )),
            ]),
            &mut blob,
            BitmapType::Roaring,
        );
        let res = writer.write().await;
        assert!(matches!(res, Err(Error::FstInsert { .. })));
    }
}