index/inverted_index/format/reader/
blob.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Range;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use bytes::Bytes;
20use common_base::range_read::RangeReader;
21use greptime_proto::v1::index::InvertedIndexMetas;
22use snafu::{ensure, ResultExt};
23
24use crate::inverted_index::error::{CommonIoSnafu, Result, UnexpectedBlobSizeSnafu};
25use crate::inverted_index::format::reader::footer::{
26    InvertedIndexFooterReader, DEFAULT_PREFETCH_SIZE,
27};
28use crate::inverted_index::format::reader::InvertedIndexReader;
29use crate::inverted_index::format::MIN_BLOB_SIZE;
30
31/// Inverted index blob reader, implements [`InvertedIndexReader`]
32pub struct InvertedIndexBlobReader<R> {
33    /// The blob
34    source: R,
35}
36
37impl<R> InvertedIndexBlobReader<R> {
38    pub fn new(source: R) -> Self {
39        Self { source }
40    }
41
42    fn validate_blob_size(blob_size: u64) -> Result<()> {
43        ensure!(
44            blob_size >= MIN_BLOB_SIZE,
45            UnexpectedBlobSizeSnafu {
46                min_blob_size: MIN_BLOB_SIZE,
47                actual_blob_size: blob_size,
48            }
49        );
50        Ok(())
51    }
52}
53
54#[async_trait]
55impl<R: RangeReader + Sync> InvertedIndexReader for InvertedIndexBlobReader<R> {
56    async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
57        let buf = self
58            .source
59            .read(offset..offset + size as u64)
60            .await
61            .context(CommonIoSnafu)?;
62        Ok(buf.into())
63    }
64
65    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
66        self.source.read_vec(ranges).await.context(CommonIoSnafu)
67    }
68
69    async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
70        let metadata = self.source.metadata().await.context(CommonIoSnafu)?;
71        let blob_size = metadata.content_length;
72        Self::validate_blob_size(blob_size)?;
73
74        let mut footer_reader = InvertedIndexFooterReader::new(&self.source, blob_size)
75            .with_prefetch_size(DEFAULT_PREFETCH_SIZE);
76        footer_reader.metadata().await.map(Arc::new)
77    }
78}
79
#[cfg(test)]
mod tests {
    use fst::MapBuilder;
    use greptime_proto::v1::index::{BitmapType, InvertedIndexMeta, InvertedIndexMetas};
    use prost::Message;

    use super::*;
    use crate::bitmap::Bitmap;

    /// Builds a tiny FST map with the entries `key1 -> 1` and `key2 -> 2`.
    fn mock_fst() -> Vec<u8> {
        let mut fst_buf = Vec::new();
        let mut build = MapBuilder::new(&mut fst_buf).unwrap();
        build.insert("key1".as_bytes(), 1).unwrap();
        build.insert("key2".as_bytes(), 2).unwrap();
        build.finish().unwrap();
        fst_buf
    }

    /// The bitmap used as both the value bitmap and the null bitmap.
    fn mock_bitmap() -> Bitmap {
        Bitmap::from_lsb0_bytes(&[0b10101010, 0b10000000], BitmapType::Roaring)
    }

    /// [`mock_bitmap`] serialized in roaring format.
    fn mock_bitmap_bytes() -> Vec<u8> {
        let mut buf = Vec::new();
        mock_bitmap()
            .serialize_into(BitmapType::Roaring, &mut buf)
            .unwrap();
        buf
    }

    /// Builds a blob with two identical index segments (`tag0`, `tag1`)
    /// followed by the footer: the encoded `InvertedIndexMetas` and its length
    /// as a little-endian `u32`.
    fn create_inverted_index_blob() -> Vec<u8> {
        let bitmap_bytes = mock_bitmap_bytes();
        let fst_bytes = mock_fst();
        let bitmap_size = bitmap_bytes.len();
        let fst_size = fst_bytes.len();

        // Segment layout: value bitmap, null bitmap, fst.
        let mut inverted_index = Vec::new();
        inverted_index.extend_from_slice(&bitmap_bytes); // value bitmap
        inverted_index.extend_from_slice(&bitmap_bytes); // null bitmap
        inverted_index.extend_from_slice(&fst_bytes); // fst

        // Both segments share one layout; only the name and base offset differ.
        let segment_meta = |name: &str, base_offset: u64| InvertedIndexMeta {
            name: name.to_string(),
            base_offset,
            inverted_index_size: inverted_index.len() as _,
            relative_null_bitmap_offset: bitmap_size as _,
            null_bitmap_size: bitmap_size as _,
            relative_fst_offset: (bitmap_size * 2) as _,
            fst_size: fst_size as _,
            bitmap_type: BitmapType::Roaring as _,
            ..Default::default()
        };
        let meta = segment_meta("tag0", 0);
        let meta1 = segment_meta("tag1", meta.inverted_index_size);

        let mut metas = InvertedIndexMetas {
            total_row_count: 10,
            segment_row_count: 1,
            ..Default::default()
        };
        metas.metas.insert(meta.name.clone(), meta);
        metas.metas.insert(meta1.name.clone(), meta1);
        let mut meta_buf = Vec::new();
        metas.encode(&mut meta_buf).unwrap();

        let mut blob = Vec::with_capacity(inverted_index.len() * 2 + meta_buf.len() + 4);
        blob.extend_from_slice(&inverted_index); // tag0 segment
        blob.extend_from_slice(&inverted_index); // tag1 segment
        blob.extend_from_slice(&meta_buf); // footer: encoded metas
        blob.extend_from_slice(&(meta_buf.len() as u32).to_le_bytes()); // footer: metas size
        blob
    }

    #[tokio::test]
    async fn test_inverted_index_blob_reader_metadata() {
        let blob = create_inverted_index_blob();
        let blob_reader = InvertedIndexBlobReader::new(blob);

        let metas = blob_reader.metadata().await.unwrap();
        assert_eq!(metas.metas.len(), 2);

        // Exact sizes pin the serialized layout of the mock segment.
        for (name, base_offset) in [("tag0", 0u64), ("tag1", 102u64)] {
            let meta = metas.metas.get(name).unwrap();
            assert_eq!(meta.name, name);
            assert_eq!(meta.base_offset, base_offset);
            assert_eq!(meta.inverted_index_size, 102);
            assert_eq!(meta.relative_null_bitmap_offset, 26);
            assert_eq!(meta.null_bitmap_size, 26);
            assert_eq!(meta.relative_fst_offset, 52);
            assert_eq!(meta.fst_size, 50);
        }
    }

    #[tokio::test]
    async fn test_inverted_index_blob_reader_fst() {
        let blob = create_inverted_index_blob();
        let blob_reader = InvertedIndexBlobReader::new(blob);

        let metas = blob_reader.metadata().await.unwrap();
        for name in ["tag0", "tag1"] {
            let meta = metas.metas.get(name).unwrap();
            let fst_map = blob_reader
                .fst(
                    meta.base_offset + meta.relative_fst_offset as u64,
                    meta.fst_size,
                )
                .await
                .unwrap();
            assert_eq!(fst_map.len(), 2);
            assert_eq!(fst_map.get("key1".as_bytes()), Some(1));
            assert_eq!(fst_map.get("key2".as_bytes()), Some(2));
        }
    }

    #[tokio::test]
    async fn test_inverted_index_blob_reader_bitmap() {
        let blob = create_inverted_index_blob();
        let blob_reader = InvertedIndexBlobReader::new(blob);

        let metas = blob_reader.metadata().await.unwrap();
        for name in ["tag0", "tag1"] {
            let meta = metas.metas.get(name).unwrap();

            // The value bitmap starts at the segment base and ends where the
            // null bitmap begins, so its size is `relative_null_bitmap_offset`.
            let bitmap = blob_reader
                .bitmap(
                    meta.base_offset,
                    meta.relative_null_bitmap_offset,
                    BitmapType::Roaring,
                )
                .await
                .unwrap();
            assert_eq!(bitmap, mock_bitmap());

            let bitmap = blob_reader
                .bitmap(
                    meta.base_offset + meta.relative_null_bitmap_offset as u64,
                    meta.null_bitmap_size,
                    BitmapType::Roaring,
                )
                .await
                .unwrap();
            assert_eq!(bitmap, mock_bitmap());
        }
    }
}