mito2/cache/index/
inverted_index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::ops::Range;
16use std::sync::Arc;
17
18use api::v1::index::InvertedIndexMetas;
19use async_trait::async_trait;
20use bytes::Bytes;
21use index::inverted_index::error::Result;
22use index::inverted_index::format::reader::InvertedIndexReader;
23use prost::Message;
24use store_api::storage::FileId;
25
26use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
27use crate::metrics::{CACHE_HIT, CACHE_MISS};
28
/// Index type tag handed to `IndexCache::new_with_weighter` when building the inverted index cache.
// NOTE(review): presumably used by `IndexCache` as a metrics label — confirm there.
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";

/// Cache for inverted index.
///
/// An [`IndexCache`] keyed by [`FileId`] whose metadata entries are [`InvertedIndexMetas`].
pub type InvertedIndexCache = IndexCache<FileId, InvertedIndexMetas>;
/// Shared, reference-counted handle to an [`InvertedIndexCache`].
pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;
34
35impl InvertedIndexCache {
36    /// Creates a new inverted index cache.
37    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
38        Self::new_with_weighter(
39            index_metadata_cap,
40            index_content_cap,
41            page_size,
42            INDEX_TYPE_INVERTED_INDEX,
43            inverted_index_metadata_weight,
44            inverted_index_content_weight,
45        )
46    }
47}
48
49/// Calculates weight for inverted index metadata.
50fn inverted_index_metadata_weight(k: &FileId, v: &Arc<InvertedIndexMetas>) -> u32 {
51    (k.as_bytes().len() + v.encoded_len()) as u32
52}
53
54/// Calculates weight for inverted index content.
55fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
56    (k.as_bytes().len() + v.len()) as u32
57}
58
/// Inverted index blob reader with cache.
///
/// Wraps an inner reader `R` and routes byte-range and metadata reads through a shared
/// [`InvertedIndexCache`].
pub struct CachedInvertedIndexBlobReader<R> {
    /// Id of the file this reader serves; passed to the cache as the lookup key.
    file_id: FileId,
    /// Size handed to the cache on every `get_or_load` call.
    // NOTE(review): presumably the total byte length of the index blob — confirm with callers.
    blob_size: u64,
    /// Underlying reader that the cache loader and the metadata fallback delegate to.
    inner: R,
    /// Shared cache handle.
    cache: InvertedIndexCacheRef,
}
66
67impl<R> CachedInvertedIndexBlobReader<R> {
68    /// Creates a new inverted index blob reader with cache.
69    pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
70        Self {
71            file_id,
72            blob_size,
73            inner,
74            cache,
75        }
76    }
77}
78
79#[async_trait]
80impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
81    async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
82        let inner = &self.inner;
83        self.cache
84            .get_or_load(
85                self.file_id,
86                self.blob_size,
87                offset,
88                size,
89                move |ranges| async move { inner.read_vec(&ranges).await },
90            )
91            .await
92    }
93
94    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
95        let mut pages = Vec::with_capacity(ranges.len());
96        for range in ranges {
97            let inner = &self.inner;
98            let page = self
99                .cache
100                .get_or_load(
101                    self.file_id,
102                    self.blob_size,
103                    range.start,
104                    (range.end - range.start) as u32,
105                    move |ranges| async move { inner.read_vec(&ranges).await },
106                )
107                .await?;
108
109            pages.push(Bytes::from(page));
110        }
111
112        Ok(pages)
113    }
114
115    async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
116        if let Some(cached) = self.cache.get_metadata(self.file_id) {
117            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
118            Ok(cached)
119        } else {
120            let meta = self.inner.metadata().await?;
121            self.cache.put_metadata(self.file_id, meta.clone());
122            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
123            Ok(meta)
124        }
125    }
126}
127
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use futures::stream;
    use index::Bytes;
    use index::bitmap::{Bitmap, BitmapType};
    use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
    use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
    use prometheus::register_int_counter_vec;
    use rand::{Rng, RngCore};

    use super::*;
    use crate::sst::index::store::InstrumentedStore;
    use crate::test_util::TestEnv;

    /// Number of iterations for each of the small fuzz loops below.
    const FUZZ_REPEAT_TIMES: usize = 100;

    /// Tags written into the test blob, each with its three indexed values in ascending order.
    const TAGS: [(&str, [&str; 3]); 2] = [("tag0", ["a", "b", "c"]), ("tag1", ["x", "y", "z"])];

    /// Single-byte bitmap stored for the first, second and third value of every tag.
    const VALUE_BITS: [u8; 3] = [0b0000_0001, 0b0010_0000, 0b0000_0001];

    /// Bytes of the null bitmap written for every tag.
    const NULL_BITS: [u8; 2] = [0b0000_0001, 0b0000_0000];

    /// Builds a roaring bitmap from LSB0-ordered bytes.
    fn roaring(bytes: &[u8]) -> Bitmap {
        Bitmap::from_lsb0_bytes(bytes, BitmapType::Roaring)
    }

    /// Fuzz test for index data page keys: reading the pages produced by
    /// `PageKey::generate_page_keys` and slicing them with `PageKey::calculate_range` must
    /// yield exactly the requested bytes.
    #[test]
    fn fuzz_index_calculation() {
        // Randomly generate a large u8 array to read from.
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let keys =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = keys.len();

            // Concatenate the pages the keys point at; the last page of the data may be short.
            let mut pages = Vec::with_capacity(size as usize);
            for key in keys {
                let start = key.page_id as usize * page_size;
                let end = (start + page_size).min(data.len());
                pages.extend_from_slice(&data[start..end]);
            }

            let expected = &data[offset as usize..(offset + size as u64) as usize];
            let read = pages[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            assert!(
                read == expected,
                "fuzz_index_calculation failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                read.len(),
                size as usize,
                PageKey::calculate_range(offset, size, page_size as u64),
                page_num
            );
        }
    }

    /// Reinterprets an FST value as two `u32`s; the tests below use them as `[offset, size]`
    /// of a bitmap relative to the tag's base offset.
    fn unpack(fst_value: u64) -> [u32; 2] {
        bytemuck::cast::<u64, [u32; 2]>(fst_value)
    }

    /// Writes an inverted index blob containing every tag in `TAGS`, each with the values
    /// and bitmaps described by `TAGS`, `VALUE_BITS` and `NULL_BITS`.
    async fn create_inverted_index_blob() -> Vec<u8> {
        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);

        for (tag, values) in TAGS {
            let entries = values
                .iter()
                .zip(VALUE_BITS.iter())
                .map(|(value, bits)| Ok((Bytes::from(*value), roaring(&[*bits]))))
                .collect::<Vec<_>>();
            writer
                .add_index(
                    tag.to_string(),
                    roaring(&NULL_BITS),
                    Box::new(stream::iter(entries)),
                    BitmapType::Roaring,
                )
                .await
                .unwrap();
        }

        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        blob
    }

    #[tokio::test]
    async fn test_inverted_index_cache() {
        let blob = create_inverted_index_blob().await;

        // Init a test range reader in local fs.
        let mut env = TestEnv::new().await;
        let file_size = blob.len() as u64;
        let store = env.init_object_store_manager();
        let temp_path = "data";
        // Keep the original bytes around as ground truth for the fuzz loop below.
        store.write(temp_path, blob.clone()).await.unwrap();
        let store = InstrumentedStore::new(store);
        // NOTE(review): this registers "test_bytes" in the global default registry and would
        // fail if another test in the same binary registers the same name.
        let metric =
            register_int_counter_vec!("test_bytes", "a counter for test", &["test"]).unwrap();
        let counter = metric.with_label_values(&["test"]);
        let range_reader = store
            .range_reader(temp_path, &counter, &counter)
            .await
            .unwrap();

        let reader = InvertedIndexBlobReader::new(range_reader);
        let cached_reader = CachedInvertedIndexBlobReader::new(
            FileId::random(),
            file_size,
            reader,
            Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
        );
        let metadata = cached_reader.metadata().await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), TAGS.len());

        for (tag, values) in TAGS {
            let meta = metadata.metas.get(tag).unwrap();
            let stats = meta.stats.as_ref().unwrap();
            assert_eq!(stats.distinct_count, 3, "tag {tag}");
            assert_eq!(stats.null_count, 1, "tag {tag}");
            assert_eq!(stats.min_value, Bytes::from(values[0]), "tag {tag}");
            assert_eq!(stats.max_value, Bytes::from(values[2]), "tag {tag}");

            let fst = cached_reader
                .fst(
                    meta.base_offset + meta.relative_fst_offset as u64,
                    meta.fst_size,
                )
                .await
                .unwrap();
            assert_eq!(fst.len(), 3, "tag {tag}");

            // Every value must resolve to the bitmap that was written for it.
            for (value, bits) in values.iter().zip(VALUE_BITS.iter()) {
                let [offset, size] = unpack(fst.get(value.as_bytes()).unwrap());
                let bitmap = cached_reader
                    .bitmap(meta.base_offset + offset as u64, size, BitmapType::Roaring)
                    .await
                    .unwrap();
                assert_eq!(bitmap, roaring(&[*bits]), "tag {tag}, value {value}");
            }
        }

        // fuzz test
        let mut rng = rand::rng();
        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..file_size);
            let size = rng.random_range(0..file_size as u32 - offset as u32);
            let via_reader = cached_reader.range_read(offset, size).await.unwrap();
            let inner = &cached_reader.inner;
            let via_cache = cached_reader
                .cache
                .get_or_load(
                    cached_reader.file_id,
                    file_size,
                    offset,
                    size,
                    |ranges| async move { inner.read_vec(&ranges).await },
                )
                .await
                .unwrap();
            assert_eq!(via_cache, via_reader);

            // Both paths above go through the same cache; also compare against the raw blob
            // so that wrong-but-consistent data is detected.
            let begin = offset as usize;
            let end = begin + size as usize;
            assert_eq!(
                via_reader.as_slice(),
                &blob[begin..end],
                "offset: {offset}, size: {size}"
            );
        }
    }
}