mito2/cache/index/
inverted_index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use core::ops::Range;
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::v1::index::InvertedIndexMetas;
20use async_trait::async_trait;
21use bytes::Bytes;
22use index::inverted_index::error::Result;
23use index::inverted_index::format::reader::{InvertedIndexReadMetrics, InvertedIndexReader};
24use prost::Message;
25use store_api::storage::{FileId, IndexVersion};
26
27use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
28use crate::metrics::{CACHE_HIT, CACHE_MISS};
29
30const INDEX_TYPE_INVERTED_INDEX: &str = "inverted_index";
31
32/// Cache for inverted index.
33pub type InvertedIndexCache = IndexCache<(FileId, IndexVersion), InvertedIndexMetas>;
34pub type InvertedIndexCacheRef = Arc<InvertedIndexCache>;
35
36impl InvertedIndexCache {
37    /// Creates a new inverted index cache.
38    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
39        Self::new_with_weighter(
40            index_metadata_cap,
41            index_content_cap,
42            page_size,
43            INDEX_TYPE_INVERTED_INDEX,
44            inverted_index_metadata_weight,
45            inverted_index_content_weight,
46        )
47    }
48
49    /// Removes all cached entries for the given `file_id`.
50    pub fn invalidate_file(&self, file_id: FileId) {
51        self.invalidate_if(move |key| key.0 == file_id);
52    }
53}
54
55/// Calculates weight for inverted index metadata.
56fn inverted_index_metadata_weight(k: &(FileId, IndexVersion), v: &Arc<InvertedIndexMetas>) -> u32 {
57    (k.0.as_bytes().len() + size_of::<IndexVersion>() + v.encoded_len()) as u32
58}
59
60/// Calculates weight for inverted index content.
61fn inverted_index_content_weight((k, _): &((FileId, IndexVersion), PageKey), v: &Bytes) -> u32 {
62    (k.0.as_bytes().len() + size_of::<IndexVersion>() + v.len()) as u32
63}
64
65/// Inverted index blob reader with cache.
66pub struct CachedInvertedIndexBlobReader<R> {
67    file_id: FileId,
68    index_version: IndexVersion,
69    blob_size: u64,
70    inner: R,
71    cache: InvertedIndexCacheRef,
72}
73
74impl<R> CachedInvertedIndexBlobReader<R> {
75    /// Creates a new inverted index blob reader with cache.
76    pub fn new(
77        file_id: FileId,
78        index_version: IndexVersion,
79        blob_size: u64,
80        inner: R,
81        cache: InvertedIndexCacheRef,
82    ) -> Self {
83        Self {
84            file_id,
85            index_version,
86            blob_size,
87            inner,
88            cache,
89        }
90    }
91}
92
93#[async_trait]
94impl<R: InvertedIndexReader> InvertedIndexReader for CachedInvertedIndexBlobReader<R> {
95    async fn range_read<'a>(
96        &self,
97        offset: u64,
98        size: u32,
99        metrics: Option<&'a mut InvertedIndexReadMetrics>,
100    ) -> Result<Vec<u8>> {
101        let start = metrics.as_ref().map(|_| Instant::now());
102
103        let inner = &self.inner;
104        let (result, cache_metrics) = self
105            .cache
106            .get_or_load(
107                (self.file_id, self.index_version),
108                self.blob_size,
109                offset,
110                size,
111                move |ranges| async move { inner.read_vec(&ranges, None).await },
112            )
113            .await?;
114
115        if let Some(m) = metrics {
116            m.total_bytes += cache_metrics.page_bytes;
117            m.total_ranges += cache_metrics.num_pages;
118            m.cache_hit += cache_metrics.cache_hit;
119            m.cache_miss += cache_metrics.cache_miss;
120            m.fetch_elapsed += start.unwrap().elapsed();
121        }
122
123        Ok(result)
124    }
125
126    async fn read_vec<'a>(
127        &self,
128        ranges: &[Range<u64>],
129        metrics: Option<&'a mut InvertedIndexReadMetrics>,
130    ) -> Result<Vec<Bytes>> {
131        let start = metrics.as_ref().map(|_| Instant::now());
132
133        let mut pages = Vec::with_capacity(ranges.len());
134        let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
135        for range in ranges {
136            let inner = &self.inner;
137            let (page, cache_metrics) = self
138                .cache
139                .get_or_load(
140                    (self.file_id, self.index_version),
141                    self.blob_size,
142                    range.start,
143                    (range.end - range.start) as u32,
144                    move |ranges| async move { inner.read_vec(&ranges, None).await },
145                )
146                .await?;
147
148            total_cache_metrics.merge(&cache_metrics);
149            pages.push(Bytes::from(page));
150        }
151
152        if let Some(m) = metrics {
153            m.total_bytes += total_cache_metrics.page_bytes;
154            m.total_ranges += total_cache_metrics.num_pages;
155            m.cache_hit += total_cache_metrics.cache_hit;
156            m.cache_miss += total_cache_metrics.cache_miss;
157            m.fetch_elapsed += start.unwrap().elapsed();
158        }
159
160        Ok(pages)
161    }
162
163    async fn metadata<'a>(
164        &self,
165        metrics: Option<&'a mut InvertedIndexReadMetrics>,
166    ) -> Result<Arc<InvertedIndexMetas>> {
167        if let Some(cached) = self.cache.get_metadata((self.file_id, self.index_version)) {
168            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
169            if let Some(m) = metrics {
170                m.cache_hit += 1;
171            }
172            Ok(cached)
173        } else {
174            let meta = self.inner.metadata(metrics).await?;
175            self.cache
176                .put_metadata((self.file_id, self.index_version), meta.clone());
177            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
178            Ok(meta)
179        }
180    }
181}
182
#[cfg(test)]
mod test {
    use std::num::NonZeroUsize;

    use futures::stream;
    use index::Bytes;
    use index::bitmap::{Bitmap, BitmapType};
    use index::inverted_index::format::reader::{InvertedIndexBlobReader, InvertedIndexReader};
    use index::inverted_index::format::writer::{InvertedIndexBlobWriter, InvertedIndexWriter};
    use prometheus::register_int_counter_vec;
    use rand::{Rng, RngCore};

    use super::*;
    use crate::sst::index::store::InstrumentedStore;
    use crate::test_util::TestEnv;

    /// Number of iterations for the small fuzz tests below.
    const FUZZ_REPEAT_TIMES: usize = 100;

    /// Fuzz test for index data page keys.
    ///
    /// For random `(offset, size, page_size)` triples, concatenates the pages
    /// named by `PageKey::generate_page_keys` and checks that slicing them
    /// with `PageKey::calculate_range` yields exactly `data[offset..offset + size]`.
    #[test]
    fn fuzz_index_calculation() {
        // Randomly generate a large byte buffer to read from.
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let keys =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = keys.len();

            // Concatenate the pages the keys point at; the last page of the
            // buffer may be shorter than `page_size`.
            let mut pages = Vec::with_capacity(size as usize);
            for key in keys {
                let start = key.page_id as usize * page_size;
                let end = (start + page_size).min(data.len());
                pages.extend_from_slice(&data[start..end]);
            }

            let expected = &data[offset as usize..offset as usize + size as usize];
            let read = &pages[PageKey::calculate_range(offset, size, page_size as u64)];
            // A custom message is used instead of `assert_eq!` so a failure
            // does not dump up to a megabyte of bytes.
            assert!(
                read == expected,
                "fuzz_index_calculation failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                read.len(),
                expected.len(),
                PageKey::calculate_range(offset, size, page_size as u64),
                page_num
            );
        }
    }

    /// Reinterprets an FST value as two `u32` words, which the tests below
    /// use as the `[offset, size]` of a bitmap.
    fn unpack(fst_value: u64) -> [u32; 2] {
        bytemuck::cast::<u64, [u32; 2]>(fst_value)
    }

    /// Builds an in-memory inverted index blob with two indexes, `tag0`
    /// (values `a`, `b`, `c`) and `tag1` (values `x`, `y`, `z`), and finishes
    /// it with `finish(8, 1)`.
    async fn create_inverted_index_blob() -> Vec<u8> {
        let mut blob = Vec::new();
        let mut writer = InvertedIndexBlobWriter::new(&mut blob);
        writer
            .add_index(
                "tag0".to_string(),
                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
                Box::new(stream::iter(vec![
                    Ok((
                        Bytes::from("a"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("b"),
                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("c"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                ])),
                BitmapType::Roaring,
            )
            .await
            .unwrap();
        writer
            .add_index(
                "tag1".to_string(),
                Bitmap::from_lsb0_bytes(&[0b0000_0001, 0b0000_0000], BitmapType::Roaring),
                Box::new(stream::iter(vec![
                    Ok((
                        Bytes::from("x"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("y"),
                        Bitmap::from_lsb0_bytes(&[0b0010_0000], BitmapType::Roaring),
                    )),
                    Ok((
                        Bytes::from("z"),
                        Bitmap::from_lsb0_bytes(&[0b0000_0001], BitmapType::Roaring),
                    )),
                ])),
                BitmapType::Roaring,
            )
            .await
            .unwrap();
        writer
            .finish(8, NonZeroUsize::new(1).unwrap())
            .await
            .unwrap();

        blob
    }

    /// Reads the bitmap addressed by `fst_value` (an `[offset, size]` pair
    /// relative to `base_offset`) through `reader` and asserts that it equals
    /// the Roaring bitmap built from `expected_lsb0`.
    async fn assert_bitmap<R: InvertedIndexReader>(
        reader: &R,
        base_offset: u64,
        fst_value: u64,
        expected_lsb0: &[u8],
    ) {
        let [offset, size] = unpack(fst_value);
        let bitmap = reader
            .bitmap(base_offset + offset as u64, size, BitmapType::Roaring, None)
            .await
            .unwrap();
        assert_eq!(
            bitmap,
            Bitmap::from_lsb0_bytes(expected_lsb0, BitmapType::Roaring)
        );
    }

    /// Reads metadata, FSTs and bitmaps of the test blob through the cached
    /// reader and checks them against what `create_inverted_index_blob` wrote,
    /// then fuzzes `range_read` against direct cache lookups.
    #[tokio::test]
    async fn test_inverted_index_cache() {
        let blob = create_inverted_index_blob().await;

        // Init a test range reader in local fs.
        let mut env = TestEnv::new().await;
        let file_size = blob.len() as u64;
        let index_version = 0;
        let store = env.init_object_store_manager();
        let temp_path = "data";
        store.write(temp_path, blob).await.unwrap();
        let store = InstrumentedStore::new(store);
        let metric =
            register_int_counter_vec!("test_bytes", "a counter for test", &["test"]).unwrap();
        let counter = metric.with_label_values(&["test"]);
        let range_reader = store
            .range_reader(temp_path, &counter, &counter)
            .await
            .unwrap();

        let reader = InvertedIndexBlobReader::new(range_reader);
        let cached_reader = CachedInvertedIndexBlobReader::new(
            FileId::random(),
            index_version,
            file_size,
            reader,
            Arc::new(InvertedIndexCache::new(8192, 8192, 50)),
        );
        let metadata = cached_reader.metadata(None).await.unwrap();
        assert_eq!(metadata.total_row_count, 8);
        assert_eq!(metadata.segment_row_count, 1);
        assert_eq!(metadata.metas.len(), 2);

        // tag0
        let tag0 = metadata.metas.get("tag0").unwrap();
        let stats0 = tag0.stats.as_ref().unwrap();
        assert_eq!(stats0.distinct_count, 3);
        assert_eq!(stats0.null_count, 1);
        assert_eq!(stats0.min_value, Bytes::from("a"));
        assert_eq!(stats0.max_value, Bytes::from("c"));
        let fst0 = cached_reader
            .fst(
                tag0.base_offset + tag0.relative_fst_offset as u64,
                tag0.fst_size,
                None,
            )
            .await
            .unwrap();
        assert_eq!(fst0.len(), 3);
        for (value, expected) in [
            (b"a", 0b0000_0001u8),
            (b"b", 0b0010_0000),
            (b"c", 0b0000_0001),
        ] {
            let fst_value = fst0.get(value).unwrap();
            assert_bitmap(&cached_reader, tag0.base_offset, fst_value, &[expected]).await;
        }

        // tag1
        let tag1 = metadata.metas.get("tag1").unwrap();
        let stats1 = tag1.stats.as_ref().unwrap();
        assert_eq!(stats1.distinct_count, 3);
        assert_eq!(stats1.null_count, 1);
        assert_eq!(stats1.min_value, Bytes::from("x"));
        assert_eq!(stats1.max_value, Bytes::from("z"));
        let fst1 = cached_reader
            .fst(
                tag1.base_offset + tag1.relative_fst_offset as u64,
                tag1.fst_size,
                None,
            )
            .await
            .unwrap();
        assert_eq!(fst1.len(), 3);
        for (value, expected) in [
            (b"x", 0b0000_0001u8),
            (b"y", 0b0010_0000),
            (b"z", 0b0000_0001),
        ] {
            let fst_value = fst1.get(value).unwrap();
            assert_bitmap(&cached_reader, tag1.base_offset, fst_value, &[expected]).await;
        }

        // fuzz test: `range_read` must agree with a direct cache lookup for
        // random in-bounds `(offset, size)` pairs.
        let mut rng = rand::rng();
        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..file_size);
            let size = rng.random_range(0..file_size as u32 - offset as u32);
            let expected = cached_reader.range_read(offset, size, None).await.unwrap();
            let inner = &cached_reader.inner;
            let (read, _cache_metrics) = cached_reader
                .cache
                .get_or_load(
                    (cached_reader.file_id, cached_reader.index_version),
                    file_size,
                    offset,
                    size,
                    |ranges| async move { inner.read_vec(&ranges, None).await },
                )
                .await
                .unwrap();
            assert_eq!(read, expected);
        }
    }
}