mito2/cache/index/
inverted_index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::ops::Range;
16use std::sync::Arc;
17
18use api::v1::index::InvertedIndexMetas;
19use async_trait::async_trait;
20use bytes::Bytes;
21use index::inverted_index::error::Result;
22use index::inverted_index::format::reader::InvertedIndexReader;
23use prost::Message;
24use store_api::storage::FileId;
25
26use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
27use crate::metrics::{CACHE_HIT, CACHE_MISS};
28
/// Index type name handed to `IndexCache::new_with_weighter` when an
/// [`InvertedIndexCache`] is constructed.
const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";

/// Cache for inverted index.
///
/// An [`IndexCache`] specialized with [`FileId`] as the key type and
/// [`InvertedIndexMetas`] as the metadata type.
pub type InvertedIndexCache = IndexCache<FileId, InvertedIndexMetas>;
/// Reference-counted ([`Arc`]) handle to an [`InvertedIndexCache`].
pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;
34
35impl InvertedIndexCache {
36    /// Creates a new inverted index cache.
37    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
38        Self::new_with_weighter(
39            index_metadata_cap,
40            index_content_cap,
41            page_size,
42            INDEX_TYPE_INVERTED_INDEX,
43            inverted_index_metadata_weight,
44            inverted_index_content_weight,
45        )
46    }
47
48    /// Removes all cached entries for the given `file_id`.
49    pub fn invalidate_file(&self, file_id: FileId) {
50        self.invalidate_if(move |key| *key == file_id);
51    }
52}
53
54/// Calculates weight for inverted index metadata.
55fn inverted_index_metadata_weight(k: &FileId, v: &Arc<InvertedIndexMetas>) -> u32 {
56    (k.as_bytes().len() + v.encoded_len()) as u32
57}
58
59/// Calculates weight for inverted index content.
60fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
61    (k.as_bytes().len() + v.len()) as u32
62}
63
/// Inverted index blob reader with cache.
///
/// Wraps an inner reader `R` and routes range reads and metadata lookups through a shared
/// [`InvertedIndexCache`].
pub struct CachedInvertedIndexBlobReader<R> {
    /// File id used as the cache key for both metadata and content lookups.
    file_id: FileId,
    /// Size of the blob; passed to the cache on every content lookup.
    blob_size: u64,
    /// Wrapped reader; called by the loader given to the cache and when metadata is not cached.
    inner: R,
    /// Shared cache consulted before the inner reader.
    cache: InvertedIndexCacheRef,
}
71
impl<R> CachedInvertedIndexBlobReader<R> {
    /// Creates a new inverted index blob reader with cache.
    ///
    /// The arguments are stored as-is; no I/O or cache access happens here.
    ///
    /// * `file_id` - key under which this blob's metadata and content are cached.
    /// * `blob_size` - size of the blob, forwarded to the cache on content lookups.
    /// * `inner` - reader that is wrapped.
    /// * `cache` - shared cache handle.
    pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
        Self {
            file_id,
            blob_size,
            inner,
            cache,
        }
    }
}
83
84#[async_trait]
85impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
86    async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
87        let inner = &self.inner;
88        self.cache
89            .get_or_load(
90                self.file_id,
91                self.blob_size,
92                offset,
93                size,
94                move |ranges| async move { inner.read_vec(&ranges).await },
95            )
96            .await
97    }
98
99    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
100        let mut pages = Vec::with_capacity(ranges.len());
101        for range in ranges {
102            let inner = &self.inner;
103            let page = self
104                .cache
105                .get_or_load(
106                    self.file_id,
107                    self.blob_size,
108                    range.start,
109                    (range.end - range.start) as u32,
110                    move |ranges| async move { inner.read_vec(&ranges).await },
111                )
112                .await?;
113
114            pages.push(Bytes::from(page));
115        }
116
117        Ok(pages)
118    }
119
120    async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
121        if let Some(cached) = self.cache.get_metadata(self.file_id) {
122            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
123            Ok(cached)
124        } else {
125            let meta = self.inner.metadata().await?;
126            self.cache.put_metadata(self.file_id, meta.clone());
127            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
128            Ok(meta)
129        }
130    }
131}
132
133#[cfg(test)]
134mod test {
135    use std::num::NonZeroUsize;
136
137    use futures::stream;
138    use index::Bytes;
139    use index::bitmap::{Bitmap, BitmapType};
140    use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
141    use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
142    use prometheus::register_int_counter_vec;
143    use rand::{Rng, RngCore};
144
145    use super::*;
146    use crate::sst::index::store::InstrumentedStore;
147    use crate::test_util::TestEnv;
148
    // Number of iterations each of the small fuzz tests below runs.
    const FUZZ_REPEAT_TIMES: usize = 100;
151
152    // Fuzz test for index data page key
153    #[test]
154    fn fuzz_index_calculation() {
155        // randomly generate a large u8 array
156        let mut rng = rand::rng();
157        let mut data = vec![0u8; 1024 * 1024];
158        rng.fill_bytes(&mut data);
159
160        for _ in 0..FUZZ_REPEAT_TIMES {
161            let offset = rng.random_range(0..data.len() as u64);
162            let size = rng.random_range(0..data.len() as u32 - offset as u32);
163            let page_size: usize = rng.random_range(1..1024);
164
165            let indexes =
166                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
167            let page_num = indexes.len();
168            let mut read = Vec::with_capacity(size as usize);
169            for key in indexes.into_iter() {
170                let start = key.page_id as usize * page_size;
171                let page = if start + page_size < data.len() {
172                    &data[start..start + page_size]
173                } else {
174                    &data[start..]
175                };
176                read.extend_from_slice(page);
177            }
178            let expected_range = offset as usize..(offset + size as u64 as u64) as usize;
179            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
180            if read != data.get(expected_range).unwrap() {
181                panic!(
182                    "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
183                    offset,
184                    size,
185                    page_size,
186                    read.len(),
187                    size as usize,
188                    PageKey::calculate_range(offset, size, page_size as u64),
189                    page_num
190                );
191            }
192        }
193    }
194
195    fn unpack(fst_value: u64) -> [u32; 2] {
196        bytemuck::cast::<u64, [u32; 2]>(fst_value)
197    }
198
199    async fn create_inverted_index_blob() -> Vec<u8> {
200        let mut blob = Vec::new();
201        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
202        writer
203            .add_index(
204                "tag0".to_string(),
205                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
206                Box::new(stream::iter(vec![
207                    Ok((
208                        Bytes::from("a"),
209                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
210                    )),
211                    Ok((
212                        Bytes::from("b"),
213                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
214                    )),
215                    Ok((
216                        Bytes::from("c"),
217                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
218                    )),
219                ])),
220                index::bitmap::BitmapType::Roaring,
221            )
222            .await
223            .unwrap();
224        writer
225            .add_index(
226                "tag1".to_string(),
227                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
228                Box::new(stream::iter(vec![
229                    Ok((
230                        Bytes::from("x"),
231                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
232                    )),
233                    Ok((
234                        Bytes::from("y"),
235                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
236                    )),
237                    Ok((
238                        Bytes::from("z"),
239                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
240                    )),
241                ])),
242                index::bitmap::BitmapType::Roaring,
243            )
244            .await
245            .unwrap();
246        writer
247            .finish(8, NonZeroUsize::new(1).unwrap())
248            .await
249            .unwrap();
250
251        blob
252    }
253
254    #[tokio::test]
255    async fn test_inverted_index_cache() {
256        let blob = create_inverted_index_blob().await;
257
258        // Init a test range reader in local fs.
259        let mut env = TestEnv::new().await;
260        let file_size = blob.len() as u64;
261        let store = env.init_object_store_manager();
262        let temp_path = "data";
263        store.write(temp_path, blob).await.unwrap();
264        let store = InstrumentedStore::new(store);
265        let metric =
266            register_int_counter_vec!("test_bytes", "a counter for test", &["test"]).unwrap();
267        let counter = metric.with_label_values(&["test"]);
268        let range_reader = store
269            .range_reader("data", &counter, &counter)
270            .await
271            .unwrap();
272
273        let reader = InvertedIndexBlobReader::new(range_reader);
274        let cached_reader = CachedInvertedIndexBlobReader::new(
275            FileId::random(),
276            file_size,
277            reader,
278            Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
279        );
280        let metadata = cached_reader.metadata().await.unwrap();
281        assert_eq!(metadata.total_row_count, 8);
282        assert_eq!(metadata.segment_row_count, 1);
283        assert_eq!(metadata.metas.len(), 2);
284        // tag0
285        let tag0 = metadata.metas.get("tag0").unwrap();
286        let stats0 = tag0.stats.as_ref().unwrap();
287        assert_eq!(stats0.distinct_count, 3);
288        assert_eq!(stats0.null_count, 1);
289        assert_eq!(stats0.min_value, Bytes::from("a"));
290        assert_eq!(stats0.max_value, Bytes::from("c"));
291        let fst0 = cached_reader
292            .fst(
293                tag0.base_offset + tag0.relative_fst_offset as u64,
294                tag0.fst_size,
295            )
296            .await
297            .unwrap();
298        assert_eq!(fst0.len(), 3);
299        let [offset, size] = unpack(fst0.get(b"a").unwrap());
300        let bitmap = cached_reader
301            .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
302            .await
303            .unwrap();
304        assert_eq!(
305            bitmap,
306            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
307        );
308        let [offset, size] = unpack(fst0.get(b"b").unwrap());
309        let bitmap = cached_reader
310            .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
311            .await
312            .unwrap();
313        assert_eq!(
314            bitmap,
315            Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
316        );
317        let [offset, size] = unpack(fst0.get(b"c").unwrap());
318        let bitmap = cached_reader
319            .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
320            .await
321            .unwrap();
322        assert_eq!(
323            bitmap,
324            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
325        );
326
327        // tag1
328        let tag1 = metadata.metas.get("tag1").unwrap();
329        let stats1 = tag1.stats.as_ref().unwrap();
330        assert_eq!(stats1.distinct_count, 3);
331        assert_eq!(stats1.null_count, 1);
332        assert_eq!(stats1.min_value, Bytes::from("x"));
333        assert_eq!(stats1.max_value, Bytes::from("z"));
334        let fst1 = cached_reader
335            .fst(
336                tag1.base_offset + tag1.relative_fst_offset as u64,
337                tag1.fst_size,
338            )
339            .await
340            .unwrap();
341        assert_eq!(fst1.len(), 3);
342        let [offset, size] = unpack(fst1.get(b"x").unwrap());
343        let bitmap = cached_reader
344            .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
345            .await
346            .unwrap();
347        assert_eq!(
348            bitmap,
349            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
350        );
351        let [offset, size] = unpack(fst1.get(b"y").unwrap());
352        let bitmap = cached_reader
353            .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
354            .await
355            .unwrap();
356        assert_eq!(
357            bitmap,
358            Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
359        );
360        let [offset, size] = unpack(fst1.get(b"z").unwrap());
361        let bitmap = cached_reader
362            .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
363            .await
364            .unwrap();
365        assert_eq!(
366            bitmap,
367            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
368        );
369
370        // fuzz test
371        let mut rng = rand::rng();
372        for _ in 0..FUZZ_REPEAT_TIMES {
373            let offset = rng.random_range(0..file_size);
374            let size = rng.random_range(0..file_size as u32 - offset as u32);
375            let expected = cached_reader.range_read(offset, size).await.unwrap();
376            let inner = &cached_reader.inner;
377            let read = cached_reader
378                .cache
379                .get_or_load(
380                    cached_reader.file_id,
381                    file_size,
382                    offset,
383                    size,
384                    |ranges| async move { inner.read_vec(&ranges).await },
385                )
386                .await
387                .unwrap();
388            assert_eq!(read, expected);
389        }
390    }
391}