index/inverted_index/format/reader/
blob.rs1use std::ops::Range;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use bytes::Bytes;
20use common_base::range_read::RangeReader;
21use greptime_proto::v1::index::InvertedIndexMetas;
22use snafu::{ensure, ResultExt};
23
24use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
25use crate::inverted_index::format::reader::footer::{
26 InvertedIndexFooterReader, DEFAULT_PREFETCH_SIZE,
27};
28use crate::inverted_index::format::reader::InvertedIndexReader;
29use crate::inverted_index::format::MIN_BLOB_SIZE;
30
/// Reader over a single inverted index blob.
///
/// The type is generic over its source `R`; the [`InvertedIndexReader`]
/// implementation is available when `R` is a `RangeReader` that is also `Sync`.
pub struct InvertedIndexBlobReader<R> {
    /// Source from which the blob's bytes are read.
    source: R,
}
36
37impl<R> InvertedIndexBlobReader<R> {
38 pub fn new(source: R) -> Self {
39 Self { source }
40 }
41
42 fn validate_blob_size(blob_size: u64) -> Result<()> {
43 ensure!(
44 blob_size >= MIN_BLOB_SIZE,
45 UnexpectedBlobSizeSnafu {
46 min_blob_size: MIN_BLOB_SIZE,
47 actual_blob_size: blob_size,
48 }
49 );
50 Ok(())
51 }
52}
53
54#[async_trait]
55impl<R: RangeReader + Sync> InvertedIndexReader for InvertedIndexBlobReader<R> {
56 async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
57 let buf = self
58 .source
59 .read(offset..offset + size as u64)
60 .await
61 .context(CommonIoSnafu)?;
62 Ok(buf.into())
63 }
64
65 async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
66 self.source.read_vec(ranges).await.context(CommonIoSnafu)
67 }
68
69 async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
70 let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
71 let blob_size = metadata.content_length;
72 Self::validate_blob_size(blob_size)?;
73
74 let mut footer_reader = InvertedIndexFooterReader::new(&self.source, blob_size)
75 .with_prefetch_size(DEFAULT_PREFETCH_SIZE);
76 footer_reader.metadata().await.map(Arc::new)
77 }
78}
79
#[cfg(test)]
mod tests {
    use fst::MapBuilder;
    use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta, InvertedIndexMetas};
    use prost::Message;

    use super::*;
    use crate::bitmap::Bitmap;

    /// Builds the bytes of an FST map holding `key1 -> 1` and `key2 -> 2`.
    fn mock_fst() -> Vec<u8> {
        let mut bytes = Vec::new();
        let mut builder = MapBuilder::new(&mut bytes).unwrap();
        for (key, value) in [("key1", 1), ("key2", 2)] {
            builder.insert(key.as_bytes(), value).unwrap();
        }
        builder.finish().unwrap();
        bytes
    }

    /// Builds the bitmap fixture shared by all tests in this module.
    fn mock_bitmap() -> Bitmap {
        let lsb0_bytes = [0b10101010, 0b10000000];
        Bitmap::from_lsb0_bytes(&lsb0_bytes, BitmapType::Roaring)
    }

    /// Serializes the bitmap fixture using the roaring bitmap type.
    fn mock_bitmap_bytes() -> Vec<u8> {
        let mut serialized = Vec::new();
        mock_bitmap()
            .serialize_into(BitmapType::Roaring, &mut serialized)
            .unwrap();
        serialized
    }

    /// Assembles a blob with two identical inverted indexes (`tag0`, `tag1`),
    /// followed by the encoded metas and the metas length as a little-endian
    /// `u32`.
    fn create_inverted_index_blob() -> Vec<u8> {
        let bitmap_bytes = mock_bitmap_bytes();
        let fst_bytes = mock_fst();
        let bitmap_size = bitmap_bytes.len();
        let fst_size = fst_bytes.len();

        // A single index payload: two bitmaps followed by the FST.
        let inverted_index = [
            bitmap_bytes.as_slice(),
            bitmap_bytes.as_slice(),
            fst_bytes.as_slice(),
        ]
        .concat();

        let meta0 = InvertedIndexMeta {
            name: "tag0".to_string(),
            base_offset: 0,
            inverted_index_size: inverted_index.len() as _,
            relative_null_bitmap_offset: bitmap_size as _,
            null_bitmap_size: bitmap_size as _,
            relative_fst_offset: (bitmap_size * 2) as _,
            fst_size: fst_size as _,
            bitmap_type: BitmapType::Roaring as _,
            ..Default::default()
        };

        // The second index has the same layout and starts right after the first.
        let meta1 = InvertedIndexMeta {
            name: "tag1".to_string(),
            base_offset: meta0.inverted_index_size,
            ..meta0.clone()
        };

        let mut metas = InvertedIndexMetas {
            total_row_count: 10,
            segment_row_count: 1,
            ..Default::default()
        };
        for meta in [meta0, meta1] {
            metas.metas.insert(meta.name.clone(), meta);
        }
        let meta_buf = metas.encode_to_vec();
        let meta_len = (meta_buf.len() as u32).to_le_bytes();

        [
            inverted_index.as_slice(),
            inverted_index.as_slice(),
            meta_buf.as_slice(),
            meta_len.as_slice(),
        ]
        .concat()
    }

    #[tokio::test]
    async fn test_inverted_index_blob_reader_metadata() {
        let blob_reader = InvertedIndexBlobReader::new(create_inverted_index_blob());

        let metas = blob_reader.metadata().await.unwrap();
        assert_eq!(metas.metas.len(), 2);

        // Both indexes share the same layout; only name and base offset differ.
        for (name, base_offset) in [("tag0", 0), ("tag1", 102)] {
            let meta = metas.metas.get(name).unwrap();
            assert_eq!(meta.name, name);
            assert_eq!(meta.base_offset, base_offset);
            assert_eq!(meta.inverted_index_size, 102);
            assert_eq!(meta.relative_null_bitmap_offset, 26);
            assert_eq!(meta.null_bitmap_size, 26);
            assert_eq!(meta.relative_fst_offset, 52);
            assert_eq!(meta.fst_size, 50);
        }
    }

    #[tokio::test]
    async fn test_inverted_index_blob_reader_fst() {
        let blob_reader = InvertedIndexBlobReader::new(create_inverted_index_blob());

        let metas = blob_reader.metadata().await.unwrap();

        for name in ["tag0", "tag1"] {
            let meta = metas.metas.get(name).unwrap();
            let fst_offset = meta.base_offset + meta.relative_fst_offset as u64;

            let fst_map = blob_reader.fst(fst_offset, meta.fst_size).await.unwrap();
            assert_eq!(fst_map.len(), 2);
            for (key, value) in [("key1", 1), ("key2", 2)] {
                assert_eq!(fst_map.get(key.as_bytes()), Some(value));
            }
        }
    }

    #[tokio::test]
    async fn test_inverted_index_blob_reader_bitmap() {
        let blob_reader = InvertedIndexBlobReader::new(create_inverted_index_blob());

        for name in ["tag0", "tag1"] {
            // The metadata is fetched again for each tag.
            let metas = blob_reader.metadata().await.unwrap();
            let base_offset = metas.metas.get(name).unwrap().base_offset;

            // Each index starts with two 26-byte bitmaps placed back to back.
            for bitmap_offset in [base_offset, base_offset + 26] {
                let bitmap = blob_reader
                    .bitmap(bitmap_offset, 26, BitmapType::Roaring)
                    .await
                    .unwrap();
                assert_eq!(bitmap, mock_bitmap());
            }
        }
    }
}