index/inverted_index/format/reader/
blob.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Range;
16use std::sync::Arc;
17use std::time::Instant;
18
19use async_trait::async_trait;
20use bytes::Bytes;
21use common_base::range_read::RangeReader;
22use greptime_proto::v1::index::InvertedIndexMetas;
23use snafu::{ResultExt, ensure};
24
25use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
26use crate::inverted_index::format::MIN_BLOB_SIZE;
27use crate::inverted_index::format::reader::footer::{
28    DEFAULT_PREFETCH_SIZE, InvertedIndexFooterReader,
29};
30use crate::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
31
/// Reader over a single inverted index blob; implements [`InvertedIndexReader`].
///
/// The reader is generic over the blob source `R` and does nothing more than
/// hold on to it; all reads are issued against that source.
pub struct InvertedIndexBlobReader<R> {
    /// The underlying blob that every read is served from.
    source: R,
}
37
38impl<R> InvertedIndexBlobReader<R> {
39    pub fn new(source: R) -> Self {
40        Self { source }
41    }
42
43    fn validate_blob_size(blob_size: u64) -> Result<()> {
44        ensure!(
45            blob_size >= MIN_BLOB_SIZE,
46            UnexpectedBlobSizeSnafu {
47                min_blob_size: MIN_BLOB_SIZE,
48                actual_blob_size: blob_size,
49            }
50        );
51        Ok(())
52    }
53}
54
55#[async_trait]
56impl<R: RangeReader + Sync> InvertedIndexReader for InvertedIndexBlobReader<R> {
57    async fn range_read<'a>(
58        &self,
59        offset: u64,
60        size: u32,
61        metrics: Option<&'a mut InvertedIndexReadMetrics>,
62    ) -> Result<Vec<u8>> {
63        let start = metrics.as_ref().map(|_| Instant::now());
64
65        let buf = self
66            .source
67            .read(offset..offset + size as u64)
68            .await
69            .context(CommonIoSnafu)?;
70
71        if let Some(m) = metrics {
72            m.total_bytes += size as u64;
73            m.total_ranges += 1;
74            m.fetch_elapsed += start.unwrap().elapsed();
75        }
76
77        Ok(buf.into())
78    }
79
80    async fn read_vec<'a>(
81        &self,
82        ranges: &[Range<u64>],
83        metrics: Option<&'a mut InvertedIndexReadMetrics>,
84    ) -> Result<Vec<Bytes>> {
85        let start = metrics.as_ref().map(|_| Instant::now());
86
87        let result = self.source.read_vec(ranges).await.context(CommonIoSnafu)?;
88
89        if let Some(m) = metrics {
90            m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
91            m.total_ranges += ranges.len();
92            m.fetch_elapsed += start.unwrap().elapsed();
93        }
94
95        Ok(result)
96    }
97
98    async fn metadata<'a>(
99        &self,
100        metrics: Option<&'a mut InvertedIndexReadMetrics>,
101    ) -> Result<Arc<InvertedIndexMetas>> {
102        let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
103        let blob_size = metadata.content_length;
104        Self::validate_blob_size(blob_size)?;
105
106        let mut footer_reader = InvertedIndexFooterReader::new(&self.source, blob_size)
107            .with_prefetch_size(DEFAULT_PREFETCH_SIZE);
108        footer_reader.metadata(metrics).await.map(Arc::new)
109    }
110}
111
#[cfg(test)]
mod tests {
    use fst::MapBuilder;
    use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta, InvertedIndexMetas};
    use prost::Message;

    use super::*;
    use crate::bitmap::Bitmap;

    /// Builds a serialized FST mapping `key1 -> 1` and `key2 -> 2`.
    fn mock_fst() -> Vec<u8> {
        let mut bytes = Vec::new();
        let mut builder = MapBuilder::new(&mut bytes).unwrap();
        // Keys are inserted in ascending order, exactly as the fixture expects.
        for (key, value) in [("key1", 1), ("key2", 2)] {
            builder.insert(key.as_bytes(), value).unwrap();
        }
        builder.finish().unwrap();
        bytes
    }

    /// Builds the roaring bitmap shared by every bitmap slot of the test blob.
    fn mock_bitmap() -> Bitmap {
        let lsb0_bytes: [u8; 2] = [0b1010_1010, 0b1000_0000];
        Bitmap::from_lsb0_bytes(&lsb0_bytes, BitmapType::Roaring)
    }

    /// Serializes [`mock_bitmap`] in the roaring format.
    fn mock_bitmap_bytes() -> Vec<u8> {
        let mut serialized = Vec::new();
        mock_bitmap()
            .serialize_into(BitmapType::Roaring, &mut serialized)
            .unwrap();
        serialized
    }

    /// Assembles a blob holding two identical indexes (`tag0`, `tag1`) followed
    /// by the encoded metas and the little-endian `u32` length of those metas.
    fn create_inverted_index_blob() -> Vec<u8> {
        let bitmap_bytes = mock_bitmap_bytes();
        let fst_bytes = mock_fst();
        let bitmap_size = bitmap_bytes.len();
        let fst_size = fst_bytes.len();

        // One index payload: value bitmap, null bitmap, then the fst.
        let inverted_index = [
            bitmap_bytes.as_slice(),
            bitmap_bytes.as_slice(),
            fst_bytes.as_slice(),
        ]
        .concat();
        let index_size = inverted_index.len();

        // Both indexes share the same layout and differ only in name and base offset.
        let build_meta = |name: &str, base_offset| InvertedIndexMeta {
            name: name.to_string(),
            base_offset,
            inverted_index_size: index_size as _,
            relative_null_bitmap_offset: bitmap_size as _,
            null_bitmap_size: bitmap_size as _,
            relative_fst_offset: (bitmap_size * 2) as _,
            fst_size: fst_size as _,
            bitmap_type: BitmapType::Roaring as _,
            ..Default::default()
        };
        let first = build_meta("tag0", 0);
        let second = build_meta("tag1", first.inverted_index_size);

        // metas
        let mut metas = InvertedIndexMetas {
            total_row_count: 10,
            segment_row_count: 1,
            ..Default::default()
        };
        metas.metas.extend(
            [first, second]
                .into_iter()
                .map(|meta| (meta.name.clone(), meta)),
        );
        let mut meta_buf = Vec::new();
        metas.encode(&mut meta_buf).unwrap();

        // Layout: first index, second index, metas, metas length.
        let footer_len = (meta_buf.len() as u32).to_le_bytes();
        [
            inverted_index.as_slice(),
            inverted_index.as_slice(),
            meta_buf.as_slice(),
            footer_len.as_slice(),
        ]
        .concat()
    }

    #[tokio::test]
    async fn test_inverted_index_blob_reader_metadata() {
        let blob_reader = InvertedIndexBlobReader::new(create_inverted_index_blob());

        let metas = blob_reader.metadata(None).await.unwrap();
        assert_eq!(metas.metas.len(), 2);

        // Apart from the name and base offset, both indexes report the same layout.
        for (tag, expected_base_offset) in [("tag0", 0), ("tag1", 102)] {
            let meta = metas.metas.get(tag).unwrap();
            assert_eq!(meta.name, tag);
            assert_eq!(meta.base_offset, expected_base_offset);
            assert_eq!(meta.inverted_index_size, 102);
            assert_eq!(meta.relative_null_bitmap_offset, 26);
            assert_eq!(meta.null_bitmap_size, 26);
            assert_eq!(meta.relative_fst_offset, 52);
            assert_eq!(meta.fst_size, 50);
        }
    }

    #[tokio::test]
    async fn test_inverted_index_blob_reader_fst() {
        let blob_reader = InvertedIndexBlobReader::new(create_inverted_index_blob());

        let metas = blob_reader.metadata(None).await.unwrap();

        for tag in ["tag0", "tag1"] {
            let meta = metas.metas.get(tag).unwrap();
            let fst_offset = meta.base_offset + meta.relative_fst_offset as u64;

            let fst_map = blob_reader
                .fst(fst_offset, meta.fst_size, None)
                .await
                .unwrap();
            assert_eq!(fst_map.len(), 2);
            assert_eq!(fst_map.get(b"key1"), Some(1));
            assert_eq!(fst_map.get(b"key2"), Some(2));
        }
    }

    #[tokio::test]
    async fn test_inverted_index_blob_reader_bitmap() {
        let blob_reader = InvertedIndexBlobReader::new(create_inverted_index_blob());

        for tag in ["tag0", "tag1"] {
            let metas = blob_reader.metadata(None).await.unwrap();
            let meta = metas.metas.get(tag).unwrap();

            // The value bitmap sits at the index base, the null bitmap right after it.
            for relative_offset in [0, 26] {
                let bitmap = blob_reader
                    .bitmap(
                        meta.base_offset + relative_offset,
                        26,
                        BitmapType::Roaring,
                        None,
                    )
                    .await
                    .unwrap();
                assert_eq!(bitmap, mock_bitmap());
            }
        }
    }
}