index/inverted_index/format/writer/
blob.rs1use std::num::NonZeroUsize;
16
17use async_trait::async_trait;
18use futures::{AsyncWrite, AsyncWriteExt};
19use greptime_proto::v1::index::InvertedIndexMetas;
20use prost::Message;
21use snafu::ResultExt;
22
23use crate::bitmap::{Bitmap, BitmapType};
24use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
25use crate::inverted_index::format::writer::single::SingleIndexWriter;
26use crate::inverted_index::format::writer::{InvertedIndexWriter, ValueStream};
27
28pub struct InvertedIndexBlobWriter<W> {
31 blob_writer: W,
33
34 written_size: u64,
36
37 metas: InvertedIndexMetas,
39}
40
41#[async_trait]
42impl<W: AsyncWrite + Send + Unpin> InvertedIndexWriter for InvertedIndexBlobWriter<W> {
43 async fn add_index(
44 &mut self,
45 name: String,
46 null_bitmap: Bitmap,
47 values: ValueStream,
48 bitmap_type: BitmapType,
49 ) -> Result<()> {
50 let single_writer = SingleIndexWriter::new(
51 name.clone(),
52 self.written_size,
53 null_bitmap,
54 values,
55 &mut self.blob_writer,
56 bitmap_type,
57 );
58 let metadata = single_writer.write().await?;
59
60 self.written_size += metadata.inverted_index_size;
61 self.metas.metas.insert(name, metadata);
62
63 Ok(())
64 }
65
66 async fn finish(
67 &mut self,
68 total_row_count: u64,
69 segment_row_count: NonZeroUsize,
70 ) -> Result<()> {
71 self.metas.segment_row_count = segment_row_count.get() as _;
72 self.metas.total_row_count = total_row_count;
73
74 let metas_bytes = self.metas.encode_to_vec();
75 self.blob_writer
76 .write_all(&metas_bytes)
77 .await
78 .context(WriteSnafu)?;
79
80 let footer_size = metas_bytes.len() as u32;
81 self.blob_writer
82 .write_all(&footer_size.to_le_bytes())
83 .await
84 .context(WriteSnafu)?;
85
86 self.blob_writer.flush().await.context(FlushSnafu)?;
87 self.blob_writer.close().await.context(CloseSnafu)?;
88 Ok(())
89 }
90}
91
92impl<W: AsyncWrite + Send + Unpin> InvertedIndexBlobWriter<W> {
93 pub fn new(blob_writer: W) -> InvertedIndexBlobWriter<W> {
94 InvertedIndexBlobWriter {
95 blob_writer,
96 written_size: 0,
97 metas: InvertedIndexMetas::default(),
98 }
99 }
100}
101
#[cfg(test)]
mod tests {
    use futures::stream;
    use greptime_proto::v1::index::BitmapType;

    use super::*;
    use crate::Bytes;
    use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};

    /// Reinterprets an FST value as two `u32`s, used below as a bitmap's `[offset, size]`.
    fn unpack(fst_value: u64) -> [u32; 2] {
        bytemuck::cast::<u64, [u32; 2]>(fst_value)
    }

    /// Builds a roaring bitmap from LSB0-ordered bytes.
    fn roaring(bytes: &[u8]) -> Bitmap {
        Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring)
    }

    /// Builds a value stream yielding one `(value, single-byte bitmap)` pair per entry.
    fn value_stream(entries: &[(&str, u8)]) -> ValueStream {
        let items = entries
            .iter()
            .map(|&(value, bits)| Ok((Bytes::from(value), roaring(&[bits]))))
            .collect::<Vec<_>>();
        Box::new(stream::iter(items))
    }

    /// Finishing without adding any index must still produce readable metadata.
    #[tokio::test]
    async fn test_inverted_index_blob_writer_write_empty() {
        let mut blob = Vec::new();
        InvertedIndexBlobWriter::new(&mut blob)
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        let reader = InvertedIndexBlobReader::new(blob);
        let metadata = reader.metadata(None).await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 0);
    }

    /// Writes two indexes and reads back their stats, FSTs and bitmaps.
    #[tokio::test]
    async fn test_inverted_index_blob_writer_write_basic() {
        // Per index: its name and the `(value, bitmap byte)` entries, listed in
        // ascending value order so the first/last entries are the min/max values.
        let columns: [(&str, [(&str, u8); 3]); 2] = [
            (
                "tag0",
                [("a", 0b0000_0001), ("b", 0b0010_0000), ("c", 0b0000_0001)],
            ),
            (
                "tag1",
                [("x", 0b0000_0001), ("y", 0b0010_0000), ("z", 0b0000_0001)],
            ),
        ];

        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        for (name, entries) in &columns {
            writer
                .add_index(
                    name.to_string(),
                    roaring(&[0b0000_0001, 0b0000_0000]),
                    value_stream(entries),
                    BitmapType::Roaring,
                )
                .await
                .unwrap();
        }
        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        let reader = InvertedIndexBlobReader::new(blob);
        let metadata = reader.metadata(None).await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 2);

        for (name, entries) in &columns {
            let meta = metadata.metas.get(*name).unwrap();

            let stats = meta.stats.as_ref().unwrap();
            assert_eq!(stats.distinct_count, 3);
            assert_eq!(stats.null_count, 1);
            assert_eq!(stats.min_value, Bytes::from(entries[0].0));
            assert_eq!(stats.max_value, Bytes::from(entries[2].0));

            let fst = reader
                .fst(
                    meta.base_offset + meta.relative_fst_offset as u64,
                    meta.fst_size,
                    None,
                )
                .await
                .unwrap();
            assert_eq!(fst.len(), 3);

            // Every value must map to the bitmap it was written with.
            for &(value, bits) in entries {
                let [offset, size] = unpack(fst.get(value.as_bytes()).unwrap());
                let bitmap = reader
                    .bitmap(
                        meta.base_offset + offset as u64,
                        size,
                        BitmapType::Roaring,
                        None,
                    )
                    .await
                    .unwrap();
                assert_eq!(bitmap, roaring(&[bits]));
            }
        }
    }
}