mito2/cache/index/bloom_filter_index.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;
use std::sync::Arc;

use api::v1::index::BloomFilterMeta;
use async_trait::async_trait;
use bytes::Bytes;
use futures::future::try_join_all;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::BloomFilterReader;
use store_api::storage::ColumnId;

use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;

const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";

/// Tag distinguishing the kind of index a cached bloom filter belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Tag {
    /// Bloom filter of a skipping index.
    Skipping,
    /// Bloom filter of a full-text index.
    Fulltext,
}

/// Cache for bloom filter index.
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
/// Shared reference to [`BloomFilterIndexCache`].
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;

impl BloomFilterIndexCache {
    /// Creates a new bloom filter index cache.
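    ///
    /// # Example
    ///
    /// A rough construction sketch; the capacities and page size below are
    /// illustrative values, not recommendations:
    ///
    /// ```ignore
    /// use std::sync::Arc;
    ///
    /// // 32 MiB of metadata, 128 MiB of content, 64 KiB pages.
    /// let cache: BloomFilterIndexCacheRef =
    ///     Arc::new(BloomFilterIndexCache::new(32 << 20, 128 << 20, 64 << 10));
    /// ```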
    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
        Self::new_with_weighter(
            index_metadata_cap,
            index_content_cap,
            page_size,
            INDEX_TYPE_BLOOM_FILTER_INDEX,
            bloom_filter_index_metadata_weight,
            bloom_filter_index_content_weight,
        )
    }
}

/// Calculates weight for bloom filter index metadata.
fn bloom_filter_index_metadata_weight(
    k: &(FileId, ColumnId, Tag),
    _: &Arc<BloomFilterMeta>,
) -> u32 {
    (k.0.as_bytes().len()
        + std::mem::size_of::<ColumnId>()
        + std::mem::size_of::<BloomFilterMeta>()) as u32
}

/// Calculates weight for bloom filter index content.
fn bloom_filter_index_content_weight(
    (k, _): &((FileId, ColumnId, Tag), PageKey),
    v: &Bytes,
) -> u32 {
    (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
}
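
// A rough worked example of the weights above, assuming `FileId` is a 16-byte
// UUID and `ColumnId` is a 4-byte `u32`:
// * metadata entry: 16 + 4 + size_of::<BloomFilterMeta>() bytes
// * content page:   16 + 4 + page.len() bytes
// The `Tag` discriminant is not counted, so the key size is slightly underestimated.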

/// Bloom filter index blob reader with cache.
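///
/// A rough usage sketch; `file_id`, `column_id`, `blob_size`, `inner` (any
/// `BloomFilterReader` over the blob) and `cache` below are assumed to be
/// supplied by the caller:
///
/// ```ignore
/// let reader = CachedBloomFilterIndexBlobReader::new(
///     file_id,
///     column_id,
///     Tag::Skipping,
///     blob_size,
///     inner,
///     cache.clone(),
/// );
/// // Metadata and page reads consult the cache first and fall back to `inner` on a miss.
/// let meta = reader.metadata().await?;
/// let first_kib = reader.range_read(0, 1024).await?;
/// ```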
pub struct CachedBloomFilterIndexBlobReader<R> {
    file_id: FileId,
    column_id: ColumnId,
    tag: Tag,
    blob_size: u64,
    inner: R,
    cache: BloomFilterIndexCacheRef,
}

impl<R> CachedBloomFilterIndexBlobReader<R> {
    /// Creates a new bloom filter index blob reader with cache.
    pub fn new(
        file_id: FileId,
        column_id: ColumnId,
        tag: Tag,
        blob_size: u64,
        inner: R,
        cache: BloomFilterIndexCacheRef,
    ) -> Self {
        Self {
            file_id,
            column_id,
            tag,
            blob_size,
            inner,
            cache,
        }
    }
}

#[async_trait]
impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
    async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
        let inner = &self.inner;
        // Serve the read from the page cache, loading any missing pages from the inner reader.
        self.cache
            .get_or_load(
                (self.file_id, self.column_id, self.tag),
                self.blob_size,
                offset,
                size,
                move |ranges| async move { inner.read_vec(&ranges).await },
            )
            .await
            .map(|b| b.into())
    }

    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
        // Fetch all requested ranges concurrently; each range goes through the page cache.
        let fetch = ranges.iter().map(|range| {
            let inner = &self.inner;
            self.cache.get_or_load(
                (self.file_id, self.column_id, self.tag),
                self.blob_size,
                range.start,
                (range.end - range.start) as u32,
                move |ranges| async move { inner.read_vec(&ranges).await },
            )
        });
        Ok(try_join_all(fetch)
            .await?
            .into_iter()
            .map(Bytes::from)
            .collect::<Vec<_>>())
    }

    /// Reads the meta information of the bloom filter.
    async fn metadata(&self) -> Result<BloomFilterMeta> {
        if let Some(cached) = self
            .cache
            .get_metadata((self.file_id, self.column_id, self.tag))
        {
            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok((*cached).clone())
        } else {
            let meta = self.inner.metadata().await?;
            self.cache.put_metadata(
                (self.file_id, self.column_id, self.tag),
                Arc::new(meta.clone()),
            );
            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok(meta)
        }
    }
}

#[cfg(test)]
mod test {
    use rand::{Rng, RngCore};

    use super::*;

    const FUZZ_REPEAT_TIMES: usize = 100;

    #[test]
    fn fuzz_index_calculation() {
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let indexes =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = indexes.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in indexes.into_iter() {
                let start = key.page_id as usize * page_size;
                let page = if start + page_size < data.len() {
                    &data[start..start + page_size]
                } else {
                    &data[start..]
                };
                read.extend_from_slice(page);
            }
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            assert_eq!(
                read,
                data.get(expected_range).unwrap(),
                "fuzz_index_calculation failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                read.len(),
                size as usize,
                PageKey::calculate_range(offset, size, page_size as u64),
                page_num
            );
        }
    }
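
    // A small sanity check for the weight helpers above. This is a sketch: it assumes
    // `FileId::random()` is available (as used elsewhere in mito2) and that
    // `BloomFilterMeta` implements `Default` (it is a prost-generated type).
    #[test]
    fn metadata_weight_covers_meta_size() {
        let column_id: ColumnId = 1;
        let key = (FileId::random(), column_id, Tag::Skipping);
        let weight =
            bloom_filter_index_metadata_weight(&key, &Arc::new(BloomFilterMeta::default()));
        // Weight = file id bytes + ColumnId size + BloomFilterMeta size, so it must cover the meta.
        assert!(weight as usize >= std::mem::size_of::<BloomFilterMeta>());
    }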
}