mito2/cache/index/bloom_filter_index.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::ops::Range;
use std::sync::Arc;

use api::v1::index::BloomFilterMeta;
use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::BloomFilterReader;
use store_api::storage::ColumnId;

use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;

const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";

/// Tag for bloom filter index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Tag {
    /// Bloom filter that backs the skipping index.
    Skipping,
    /// Bloom filter that backs the full-text index.
    Fulltext,
}

/// Cache for bloom filter index metadata and content, keyed by
/// `(FileId, ColumnId, Tag)`.
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;

impl BloomFilterIndexCache {
    /// Creates a new bloom filter index cache.
    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
        Self::new_with_weighter(
            index_metadata_cap,
            index_content_cap,
            page_size,
            INDEX_TYPE_BLOOM_FILTER_INDEX,
            bloom_filter_index_metadata_weight,
            bloom_filter_index_content_weight,
        )
    }
}
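// A minimal construction sketch; the capacity and page size values below are
// illustrative assumptions rather than defaults defined in this crate:
//
//     let cache: BloomFilterIndexCacheRef = Arc::new(BloomFilterIndexCache::new(
//         32 * 1024 * 1024,  // index metadata capacity (weight in bytes)
//         128 * 1024 * 1024, // index content capacity (weight in bytes)
//         64 * 1024,         // page size used to split blob reads
//     ));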

/// Calculates weight for bloom filter index metadata.
fn bloom_filter_index_metadata_weight(
    k: &(FileId, ColumnId, Tag),
    _: &Arc<BloomFilterMeta>,
) -> u32 {
    (k.0.as_bytes().len()
        + std::mem::size_of::<ColumnId>()
        + std::mem::size_of::<BloomFilterMeta>()) as u32
}

/// Calculates weight for bloom filter index content.
fn bloom_filter_index_content_weight(
    (k, _): &((FileId, ColumnId, Tag), PageKey),
    v: &Bytes,
) -> u32 {
    (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
}

/// Bloom filter index blob reader with cache.
pub struct CachedBloomFilterIndexBlobReader<R> {
    file_id: FileId,
    column_id: ColumnId,
    tag: Tag,
    blob_size: u64,
    inner: R,
    cache: BloomFilterIndexCacheRef,
}

impl<R> CachedBloomFilterIndexBlobReader<R> {
    /// Creates a new bloom filter index blob reader with cache.
    pub fn new(
        file_id: FileId,
        column_id: ColumnId,
        tag: Tag,
        blob_size: u64,
        inner: R,
        cache: BloomFilterIndexCacheRef,
    ) -> Self {
        Self {
            file_id,
            column_id,
            tag,
            blob_size,
            inner,
            cache,
        }
    }
}
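// A usage sketch: wrap an existing `BloomFilterReader` so that metadata and
// page reads go through the shared cache. `blob_reader` and `blob_size` are
// hypothetical placeholders for values the caller already has on hand:
//
//     let reader = CachedBloomFilterIndexBlobReader::new(
//         file_id,
//         column_id,
//         Tag::Skipping,
//         blob_size,
//         blob_reader,
//         bloom_filter_index_cache.clone(),
//     );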

#[async_trait]
impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
    async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
        let inner = &self.inner;
        self.cache
            .get_or_load(
                (self.file_id, self.column_id, self.tag),
                self.blob_size,
                offset,
                size,
                move |ranges| async move { inner.read_vec(&ranges).await },
            )
            .await
            .map(|b| b.into())
    }

    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
        let mut pages = Vec::with_capacity(ranges.len());
        for range in ranges {
            let inner = &self.inner;
            let page = self
                .cache
                .get_or_load(
                    (self.file_id, self.column_id, self.tag),
                    self.blob_size,
                    range.start,
                    (range.end - range.start) as u32,
                    move |ranges| async move { inner.read_vec(&ranges).await },
                )
                .await?;

            pages.push(Bytes::from(page));
        }

        Ok(pages)
    }

    /// Reads the meta information of the bloom filter.
    async fn metadata(&self) -> Result<BloomFilterMeta> {
        if let Some(cached) = self
            .cache
            .get_metadata((self.file_id, self.column_id, self.tag))
        {
            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok((*cached).clone())
        } else {
            let meta = self.inner.metadata().await?;
            self.cache.put_metadata(
                (self.file_id, self.column_id, self.tag),
                Arc::new(meta.clone()),
            );
            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok(meta)
        }
    }
}

#[cfg(test)]
mod test {
    use rand::{Rng, RngCore};

    use super::*;

    const FUZZ_REPEAT_TIMES: usize = 100;

    #[test]
    fn fuzz_index_calculation() {
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let indexes =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = indexes.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in indexes.into_iter() {
                let start = key.page_id as usize * page_size;
                let page = if start + page_size < data.len() {
                    &data[start..start + page_size]
                } else {
                    &data[start..]
                };
                read.extend_from_slice(page);
            }
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            assert_eq!(
                read,
                data.get(expected_range).unwrap(),
                "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                read.len(),
                size as usize,
                PageKey::calculate_range(offset, size, page_size as u64),
                page_num
            );
        }
    }
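
    #[test]
    fn test_weight_functions_account_for_key_and_value() {
        // A small sanity-check sketch for the weighter functions above; it
        // assumes `FileId::random()` and `BloomFilterMeta::default()` are
        // available in this crate's dependencies.
        let column_id: ColumnId = 1;
        let key = (FileId::random(), column_id, Tag::Skipping);

        let meta_weight =
            bloom_filter_index_metadata_weight(&key, &Arc::new(BloomFilterMeta::default()));
        assert!(meta_weight as usize >= key.0.as_bytes().len() + std::mem::size_of::<ColumnId>());

        let page_key = PageKey::generate_page_keys(0, 1, 4096)
            .next()
            .expect("at least one page key");
        let content = Bytes::from_static(b"bloom filter page bytes");
        let content_weight = bloom_filter_index_content_weight(&(key, page_key), &content);
        assert!(content_weight as usize >= content.len());
    }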
}