index/inverted_index/format/writer/
blob.rs1use std::num::NonZeroUsize;
16
17use async_trait::async_trait;
18use futures::{AsyncWrite, AsyncWriteExt};
19use greptime_proto::v1::index::InvertedIndexMetas;
20use prost::Message;
21use snafu::ResultExt;
22
23use crate::bitmap::{Bitmap, BitmapType};
24use crate::inverted_index::error::{CloseSnafu, FlushSnafu, Result, WriteSnafu};
25use crate::inverted_index::format::writer::single::SingleIndexWriter;
26use crate::inverted_index::format::writer::{InvertedIndexWriter, ValueStream};
27
28pub struct InvertedIndexBlobWriter<W> {
31 blob_writer: W,
33
34 written_size: u64,
36
37 metas: InvertedIndexMetas,
39}
40
41#[async_trait]
42impl<W: AsyncWrite + Send + Unpin> InvertedIndexWriter for InvertedIndexBlobWriter<W> {
43 async fn add_index(
44 &mut self,
45 name: String,
46 null_bitmap: Bitmap,
47 values: ValueStream,
48 bitmap_type: BitmapType,
49 ) -> Result<()> {
50 let single_writer = SingleIndexWriter::new(
51 name.clone(),
52 self.written_size,
53 null_bitmap,
54 values,
55 &mut self.blob_writer,
56 bitmap_type,
57 );
58 let metadata = single_writer.write().await?;
59
60 self.written_size += metadata.inverted_index_size;
61 self.metas.metas.insert(name, metadata);
62
63 Ok(())
64 }
65
66 async fn finish(
67 &mut self,
68 total_row_count: u64,
69 segment_row_count: NonZeroUsize,
70 ) -> Result<()> {
71 self.metas.segment_row_count = segment_row_count.get() as _;
72 self.metas.total_row_count = total_row_count;
73
74 let metas_bytes = self.metas.encode_to_vec();
75 self.blob_writer
76 .write_all(&metas_bytes)
77 .await
78 .context(WriteSnafu)?;
79
80 let footer_size = metas_bytes.len() as u32;
81 self.blob_writer
82 .write_all(&footer_size.to_le_bytes())
83 .await
84 .context(WriteSnafu)?;
85
86 self.blob_writer.flush().await.context(FlushSnafu)?;
87 self.blob_writer.close().await.context(CloseSnafu)?;
88 Ok(())
89 }
90}
91
92impl<W: AsyncWrite + Send + Unpin> InvertedIndexBlobWriter<W> {
93 pub fn new(blob_writer: W) -> InvertedIndexBlobWriter<W> {
94 InvertedIndexBlobWriter {
95 blob_writer,
96 written_size: 0,
97 metas: InvertedIndexMetas::default(),
98 }
99 }
100}
101
#[cfg(test)]
mod tests {
    use futures::stream;
    use greptime_proto::v1::index::BitmapType;

    use super::*;
    use crate::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
    use crate::Bytes;

    /// Indexes written by the basic test: index name plus its three values, in order.
    const INDEXES: [(&str, [&str; 3]); 2] = [
        ("tag0", ["a", "b", "c"]),
        ("tag1", ["x", "y", "z"]),
    ];

    /// One-byte LSB0 bitmaps paired with the first, second and third value of each index.
    const VALUE_BITS: [u8; 3] = [0b0000_0001, 0b0010_0000, 0b0000_0001];

    /// Reinterprets an FST value as two `u32`s; the tests read them as `[offset, size]`.
    fn unpack(fst_value: u64) -> [u32; 2] {
        bytemuck::cast::<u64, [u32; 2]>(fst_value)
    }

    /// Builds a roaring bitmap from LSB0-ordered bytes.
    fn roaring(lsb0_bytes: &[u8]) -> Bitmap {
        Bitmap::from_lsb0_bytes(lsb0_bytes, BitmapType::Roaring)
    }

    #[tokio::test]
    async fn test_inverted_index_blob_writer_write_empty() {
        let mut blob = Vec::new();
        let segment_rows = NonZeroUsize::new(1).unwrap();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        writer.finish(8, segment_rows).await.unwrap();

        // A blob without any index must still carry readable metadata.
        let reader = InvertedIndexBlobReader::new(blob);
        let metadata = reader.metadata().await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 0);
    }

    #[tokio::test]
    async fn test_inverted_index_blob_writer_write_basic() {
        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);

        // Write every index with the same null bitmap and per-value bitmaps.
        for (tag, keys) in INDEXES {
            let values: Vec<Result<(Bytes, Bitmap)>> = keys
                .iter()
                .zip(VALUE_BITS)
                .map(|(&key, bits)| Ok((Bytes::from(key), roaring(&[bits]))))
                .collect();
            writer
                .add_index(
                    tag.to_string(),
                    roaring(&[0b0000_0001, 0b0000_0000]),
                    Box::new(stream::iter(values)),
                    BitmapType::Roaring,
                )
                .await
                .unwrap();
        }
        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        let reader = InvertedIndexBlobReader::new(blob);
        let metadata = reader.metadata().await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 2);

        // Read every index back and compare stats, FST entries and bitmaps.
        for (tag, keys) in INDEXES {
            let meta = metadata.metas.get(tag).unwrap();

            let stats = meta.stats.as_ref().unwrap();
            assert_eq!(stats.distinct_count, 3);
            assert_eq!(stats.null_count, 1);
            assert_eq!(stats.min_value, Bytes::from(keys[0]));
            assert_eq!(stats.max_value, Bytes::from(keys[2]));

            let fst = reader
                .fst(
                    meta.base_offset + meta.relative_fst_offset as u64,
                    meta.fst_size,
                )
                .await
                .unwrap();
            assert_eq!(fst.len(), 3);

            for (&key, bits) in keys.iter().zip(VALUE_BITS) {
                let [offset, size] = unpack(fst.get(key).unwrap());
                let bitmap = reader
                    .bitmap(meta.base_offset + offset as u64, size, BitmapType::Roaring)
                    .await
                    .unwrap();
                assert_eq!(bitmap, roaring(&[bits]));
            }
        }
    }
}