index/inverted_index/format/reader/
blob.rs1use std::ops::Range;
16use std::sync::Arc;
17use std::time::Instant;
18
19use async_trait::async_trait;
20use bytes::Bytes;
21use common_base::range_read::RangeReader;
22use greptime_proto::v1::index::InvertedIndexMetas;
23use snafu::{ResultExt, ensure};
24
25use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
26use crate::inverted_index::format::MIN_BLOB_SIZE;
27use crate::inverted_index::format::reader::footer::{
28 DEFAULT_PREFETCH_SIZE, InvertedIndexFooterReader,
29};
30use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
31
/// Reader for an inverted index stored as a single blob.
///
/// The struct itself places no bound on `R`; the `InvertedIndexReader`
/// implementation in this file requires `R: RangeReader + Sync`.
pub struct InvertedIndexBlobReader<R> {
    /// The underlying source the blob's bytes and metadata are read from.
    source: R,
}
37
38impl<R> InvertedIndexBlobReader<R> {
39 pub fn new(source: R) -> Self {
40 Self { source }
41 }
42
43 fn validate_blob_size(blob_size: u64) -> Result<()> {
44 ensure!(
45 blob_size >= MIN_BLOB_SIZE,
46 UnexpectedBlobSizeSnafu {
47 min_blob_size: MIN_BLOB_SIZE,
48 actual_blob_size: blob_size,
49 }
50 );
51 Ok(())
52 }
53}
54
55#[async_trait]
56impl<R: RangeReader + Sync> InvertedIndexReader for InvertedIndexBlobReader<R> {
57 async fn range_read<'a>(
58 &self,
59 offset: u64,
60 size: u32,
61 metrics: Option<&'a mut InvertedIndexReadMetrics>,
62 ) -> Result<Vec<u8>> {
63 let start = metrics.as_ref().map(|_| Instant::now());
64
65 let buf = self
66 .source
67 .read(offset..offset + size as u64)
68 .await
69 .context(CommonIoSnafu)?;
70
71 if let Some(m) = metrics {
72 m.total_bytes += size as u64;
73 m.total_ranges += 1;
74 m.fetch_elapsed += start.unwrap().elapsed();
75 }
76
77 Ok(buf.into())
78 }
79
80 async fn read_vec<'a>(
81 &self,
82 ranges: &[Range<u64>],
83 metrics: Option<&'a mut InvertedIndexReadMetrics>,
84 ) -> Result<Vec<Bytes>> {
85 let start = metrics.as_ref().map(|_| Instant::now());
86
87 let result = self.source.read_vec(ranges).await.context(CommonIoSnafu)?;
88
89 if let Some(m) = metrics {
90 m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
91 m.total_ranges += ranges.len();
92 m.fetch_elapsed += start.unwrap().elapsed();
93 }
94
95 Ok(result)
96 }
97
98 async fn metadata<'a>(
99 &self,
100 metrics: Option<&'a mut InvertedIndexReadMetrics>,
101 ) -> Result<Arc<InvertedIndexMetas>> {
102 let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
103 let blob_size = metadata.content_length;
104 Self::validate_blob_size(blob_size)?;
105
106 let mut footer_reader = InvertedIndexFooterReader::new(&self.source, blob_size)
107 .with_prefetch_size(DEFAULT_PREFETCH_SIZE);
108 footer_reader.metadata(metrics).await.map(Arc::new)
109 }
110}
111
#[cfg(test)]
mod tests {
    use fst::MapBuilder;
    use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta, InvertedIndexMetas};
    use prost::Message;

    use super::*;
    use crate::bitmap::Bitmap;

    /// Serializes an FST map with two entries: `key1 -> 1` and `key2 -> 2`.
    fn mock_fst() -> Vec<u8> {
        let mut bytes = Vec::new();
        let mut builder = MapBuilder::new(&mut bytes).unwrap();
        for (key, value) in [("key1", 1), ("key2", 2)] {
            builder.insert(key.as_bytes(), value).unwrap();
        }
        builder.finish().unwrap();
        bytes
    }

    /// Builds the bitmap shared by all fixtures in this module.
    fn mock_bitmap() -> Bitmap {
        let lsb0_bytes = [0b10101010, 0b10000000];
        Bitmap::from_lsb0_bytes(&lsb0_bytes, BitmapType::Roaring)
    }

    /// Returns [`mock_bitmap`] serialized as `BitmapType::Roaring`.
    fn mock_bitmap_bytes() -> Vec<u8> {
        let mut serialized = Vec::new();
        mock_bitmap()
            .serialize_into(BitmapType::Roaring, &mut serialized)
            .unwrap();
        serialized
    }

    /// Assembles a blob holding two identical indexes (`tag0`, `tag1`).
    ///
    /// Layout: `[index][index][encoded metas][metas length as u32, LE]`, where
    /// each index is two serialized bitmaps followed by the FST. The second
    /// bitmap is the one the metas describe as the null bitmap.
    fn create_inverted_index_blob() -> Vec<u8> {
        let bitmap_bytes = mock_bitmap_bytes();
        let fst_bytes = mock_fst();
        let bitmap_size = bitmap_bytes.len();
        let fst_size = fst_bytes.len();

        let inverted_index = [&bitmap_bytes[..], &bitmap_bytes[..], &fst_bytes[..]].concat();
        let index_size = inverted_index.len();

        // Both indexes share the same payload, so only the name and the base
        // offset differ between their metas.
        let meta_for = |name: &str, base_offset: u64| InvertedIndexMeta {
            name: name.to_string(),
            base_offset,
            inverted_index_size: index_size as _,
            relative_null_bitmap_offset: bitmap_size as _,
            null_bitmap_size: bitmap_size as _,
            relative_fst_offset: (bitmap_size * 2) as _,
            fst_size: fst_size as _,
            bitmap_type: BitmapType::Roaring as _,
            ..Default::default()
        };
        let first = meta_for("tag0", 0);
        let second = meta_for("tag1", first.inverted_index_size);

        let mut metas = InvertedIndexMetas {
            total_row_count: 10,
            segment_row_count: 1,
            ..Default::default()
        };
        for meta in [first, second] {
            metas.metas.insert(meta.name.clone(), meta);
        }
        let mut meta_buf = Vec::new();
        metas.encode(&mut meta_buf).unwrap();

        let meta_len = (meta_buf.len() as u32).to_le_bytes();
        [
            &inverted_index[..],
            &inverted_index[..],
            &meta_buf[..],
            &meta_len[..],
        ]
        .concat()
    }

    #[tokio::test]
    async fn test_inverted_index_blob_reader_metadata() {
        let blob_reader = InvertedIndexBlobReader::new(create_inverted_index_blob());

        let metas = blob_reader.metadata(None).await.unwrap();
        assert_eq!(metas.metas.len(), 2);

        // The two indexes are identical apart from name and base offset.
        for (name, base_offset) in [("tag0", 0), ("tag1", 102)] {
            let meta = metas.metas.get(name).unwrap();
            assert_eq!(meta.name, name);
            assert_eq!(meta.base_offset, base_offset);
            assert_eq!(meta.inverted_index_size, 102);
            assert_eq!(meta.relative_null_bitmap_offset, 26);
            assert_eq!(meta.null_bitmap_size, 26);
            assert_eq!(meta.relative_fst_offset, 52);
            assert_eq!(meta.fst_size, 50);
        }
    }

    #[tokio::test]
    async fn test_inverted_index_blob_reader_fst() {
        let blob_reader = InvertedIndexBlobReader::new(create_inverted_index_blob());

        let metas = blob_reader.metadata(None).await.unwrap();

        for name in ["tag0", "tag1"] {
            let meta = metas.metas.get(name).unwrap();
            let fst_offset = meta.base_offset + meta.relative_fst_offset as u64;

            let fst_map = blob_reader
                .fst(fst_offset, meta.fst_size, None)
                .await
                .unwrap();
            assert_eq!(fst_map.len(), 2);
            for (key, value) in [("key1", 1), ("key2", 2)] {
                assert_eq!(fst_map.get(key.as_bytes()), Some(value));
            }
        }
    }

    #[tokio::test]
    async fn test_inverted_index_blob_reader_bitmap() {
        let blob_reader = InvertedIndexBlobReader::new(create_inverted_index_blob());

        for name in ["tag0", "tag1"] {
            // The metadata is fetched anew for every tag.
            let metas = blob_reader.metadata(None).await.unwrap();
            let meta = metas.metas.get(name).unwrap();

            // Each index starts with two 26-byte serialized bitmaps.
            for relative_offset in [0, 26] {
                let bitmap = blob_reader
                    .bitmap(
                        meta.base_offset + relative_offset,
                        26,
                        BitmapType::Roaring,
                        None,
                    )
                    .await
                    .unwrap();
                assert_eq!(bitmap, mock_bitmap());
            }
        }
    }
}