mito2/cache/index/inverted_index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::ops::Range;
16use std::sync::Arc;
17
18use api::v1::index::InvertedIndexMetas;
19use async_trait::async_trait;
20use bytes::Bytes;
21use futures::future::try_join_all;
22use index::inverted_index::error::Result;
23use index::inverted_index::format::reader::InvertedIndexReader;
24use prost::Message;
25
26use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE};
27use crate::metrics::{CACHE_HIT, CACHE_MISS};
28use crate::sst::file::FileId;
29
30const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";
31
32/// Cache for inverted index.
33pub type InvertedIndexCache = IndexCache<FileId, InvertedIndexMetas>;
34pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;
35
36impl InvertedIndexCache {
37    /// Creates a new inverted index cache.
38    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
39        Self::new_with_weighter(
40            index_metadata_cap,
41            index_content_cap,
42            page_size,
43            INDEX_TYPE_INVERTED_INDEX,
44            inverted_index_metadata_weight,
45            inverted_index_content_weight,
46        )
47    }
48}
49
50/// Calculates weight for inverted index metadata.
51fn inverted_index_metadata_weight(k: &FileId, v: &Arc<InvertedIndexMetas>) -> u32 {
52    (k.as_bytes().len() + v.encoded_len()) as u32
53}
54
55/// Calculates weight for inverted index content.
56fn inverted_index_content_weight((k, _): &(FileId, PageKey), v: &Bytes) -> u32 {
57    (k.as_bytes().len() + v.len()) as u32
58}
59
/// Inverted index blob reader with cache.
pub struct CachedInvertedIndexBlobReader<R> {
    /// Id of the file the blob belongs to; used as the cache key.
    file_id: FileId,
    /// Total size of the underlying blob in bytes.
    blob_size: u64,
    /// Underlying reader used to fetch data on cache miss.
    inner: R,
    /// Cache holding index metadata and content pages.
    cache: InvertedIndexCacheRef,
}
67
68impl<R> CachedInvertedIndexBlobReader<R> {
69    /// Creates a new inverted index blob reader with cache.
70    pub fn new(file_id: FileId, blob_size: u64, inner: R, cache: InvertedIndexCacheRef) -> Self {
71        Self {
72            file_id,
73            blob_size,
74            inner,
75            cache,
76        }
77    }
78}
79
80#[async_trait]
81impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
82    async fn range_read(&self, offset: u64, size: u32) -> Result<Vec<u8>> {
83        let inner = &self.inner;
84        self.cache
85            .get_or_load(
86                self.file_id,
87                self.blob_size,
88                offset,
89                size,
90                move |ranges| async move { inner.read_vec(&ranges).await },
91            )
92            .await
93    }
94
95    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
96        let fetch = ranges.iter().map(|range| {
97            let inner = &self.inner;
98            self.cache.get_or_load(
99                self.file_id,
100                self.blob_size,
101                range.start,
102                (range.end - range.start) as u32,
103                move |ranges| async move { inner.read_vec(&ranges).await },
104            )
105        });
106        Ok(try_join_all(fetch)
107            .await?
108            .into_iter()
109            .map(Bytes::from)
110            .collect::<Vec<_>>())
111    }
112
113    async fn metadata(&self) -> Result<Arc<InvertedIndexMetas>> {
114        if let Some(cached) = self.cache.get_metadata(self.file_id) {
115            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
116            Ok(cached)
117        } else {
118            let meta = self.inner.metadata().await?;
119            self.cache.put_metadata(self.file_id, meta.clone());
120            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
121            Ok(meta)
122        }
123    }
124}
125
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use futures::stream;
    use index::bitmap::{Bitmap, BitmapType};
    use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
    use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
    use index::Bytes;
    use prometheus::register_int_counter_vec;
    use rand::{Rng, RngCore};

    use super::*;
    use crate::sst::index::store::InstrumentedStore;
    use crate::test_util::TestEnv;

    // Repeat count for the following fuzz tests.
    const FUZZ_REPEAT_TIMES: usize = 100;

    // Fuzz test for index data page key: reading through page keys must
    // reconstruct exactly the requested byte range.
    #[test]
    fn fuzz_index_calculation() {
        // Randomly generate a large u8 array as the fake blob.
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let indexes =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = indexes.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in indexes.into_iter() {
                let start = key.page_id as usize * page_size;
                // The final page may be shorter than `page_size`.
                let page = if start + page_size < data.len() {
                    &data[start..start + page_size]
                } else {
                    &data[start..]
                };
                read.extend_from_slice(page);
            }
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            if read != data.get(expected_range).unwrap() {
                panic!(
                    "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                    offset, size, page_size, read.len(), size as usize,
                    PageKey::calculate_range(offset, size, page_size as u64),
                    page_num
                );
            }
        }
    }

    // Splits an FST value into its packed (offset, size) halves.
    fn unpack(fst_value: u64) -> [u32; 2] {
        bytemuck::cast::<u64, [u32; 2]>(fst_value)
    }

    // Writes a small two-tag inverted index blob used by the cache test below.
    async fn create_inverted_index_blob() -> Vec<u8> {
        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        writer
            .add_index(
                "tag0".to_string(),
                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
                Box::new(stream::iter(vec![
                    Ok((
                        Bytes::from("a"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("b"),
                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("c"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                ])),
                BitmapType::Roaring,
            )
            .await
            .unwrap();
        writer
            .add_index(
                "tag1".to_string(),
                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
                Box::new(stream::iter(vec![
                    Ok((
                        Bytes::from("x"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("y"),
                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("z"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                ])),
                BitmapType::Roaring,
            )
            .await
            .unwrap();
        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        blob
    }

    #[tokio::test]
    async fn test_inverted_index_cache() {
        let blob = create_inverted_index_blob().await;

        // Init a test range reader in local fs.
        let mut env = TestEnv::new();
        let file_size = blob.len() as u64;
        let store = env.init_object_store_manager();
        let temp_path = "data";
        store.write(temp_path, blob).await.unwrap();
        let store = InstrumentedStore::new(store);
        let metric =
            register_int_counter_vec!("test_bytes", "a counter for test", &["test"]).unwrap();
        let counter = metric.with_label_values(&["test"]);
        let range_reader = store
            .range_reader(temp_path, &counter, &counter)
            .await
            .unwrap();

        let reader = InvertedIndexBlobReader::new(range_reader);
        let cached_reader = CachedInvertedIndexBlobReader::new(
            FileId::random(),
            file_size,
            reader,
            Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
        );
        let metadata = cached_reader.metadata().await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 2);
        // tag0
        let tag0 = metadata.metas.get("tag0").unwrap();
        let stats0 = tag0.stats.as_ref().unwrap();
        assert_eq!(stats0.distinct_count, 3);
        assert_eq!(stats0.null_count, 1);
        assert_eq!(stats0.min_value, Bytes::from("a"));
        assert_eq!(stats0.max_value, Bytes::from("c"));
        let fst0 = cached_reader
            .fst(
                tag0.base_offset + tag0.relative_fst_offset as u64,
                tag0.fst_size,
            )
            .await
            .unwrap();
        assert_eq!(fst0.len(), 3);
        let [offset, size] = unpack(fst0.get(b"a").unwrap());
        let bitmap = cached_reader
            .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst0.get(b"b").unwrap());
        let bitmap = cached_reader
            .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst0.get(b"c").unwrap());
        let bitmap = cached_reader
            .bitmap(tag0.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );

        // tag1
        let tag1 = metadata.metas.get("tag1").unwrap();
        let stats1 = tag1.stats.as_ref().unwrap();
        assert_eq!(stats1.distinct_count, 3);
        assert_eq!(stats1.null_count, 1);
        assert_eq!(stats1.min_value, Bytes::from("x"));
        assert_eq!(stats1.max_value, Bytes::from("z"));
        let fst1 = cached_reader
            .fst(
                tag1.base_offset + tag1.relative_fst_offset as u64,
                tag1.fst_size,
            )
            .await
            .unwrap();
        assert_eq!(fst1.len(), 3);
        let [offset, size] = unpack(fst1.get(b"x").unwrap());
        let bitmap = cached_reader
            .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst1.get(b"y").unwrap());
        let bitmap = cached_reader
            .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring)
        );
        let [offset, size] = unpack(fst1.get(b"z").unwrap());
        let bitmap = cached_reader
            .bitmap(tag1.base_offset + offset as u64, size, BitmapType::Roaring)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring)
        );

        // Fuzz: reads through the cache must match direct reads.
        let mut rng = rand::rng();
        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..file_size);
            let size = rng.random_range(0..file_size as u32 - offset as u32);
            let expected = cached_reader.range_read(offset, size).await.unwrap();
            let inner = &cached_reader.inner;
            let read = cached_reader
                .cache
                .get_or_load(
                    cached_reader.file_id,
                    file_size,
                    offset,
                    size,
                    |ranges| async move { inner.read_vec(&ranges).await },
                )
                .await
                .unwrap();
            assert_eq!(read, expected);
        }
    }
}