mito2/cache/index/bloom_filter_index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Range;
16use std::sync::Arc;
17
18use api::v1::index::BloomFilterMeta;
19use async_trait::async_trait;
20use bytes::Bytes;
21use index::bloom_filter::error::Result;
22use index::bloom_filter::reader::BloomFilterReader;
23use store_api::storage::{ColumnId, FileId};
24
25use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
26use crate::metrics::{CACHE_HIT, CACHE_MISS};
27
/// Index type name handed to `IndexCache::new_with_weighter` when the bloom
/// filter index cache is constructed.
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
29
/// Tag distinguishing which index a cached bloom filter belongs to.
///
/// It is the last component of the `(FileId, ColumnId, Tag)` cache key, so
/// bloom filters with different tags on the same column of the same file are
/// cached as separate entries.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq, Hash)]
pub enum Tag {
    /// Bloom filter of the skipping index (per the variant name).
    Skipping,
    /// Bloom filter of the fulltext index (per the variant name).
    Fulltext,
}
36
37/// Cache for bloom filter index.
38pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
39pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;
40
41impl BloomFilterIndexCache {
42    /// Creates a new bloom filter index cache.
43    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
44        Self::new_with_weighter(
45            index_metadata_cap,
46            index_content_cap,
47            page_size,
48            INDEX_TYPE_BLOOM_FILTER_INDEX,
49            bloom_filter_index_metadata_weight,
50            bloom_filter_index_content_weight,
51        )
52    }
53}
54
55/// Calculates weight for bloom filter index metadata.
56fn bloom_filter_index_metadata_weight(
57    k: &(FileId, ColumnId, Tag),
58    _: &Arc<BloomFilterMeta>,
59) -> u32 {
60    (k.0.as_bytes().len()
61        + std::mem::size_of::<ColumnId>()
62        + std::mem::size_of::<BloomFilterMeta>()) as u32
63}
64
65/// Calculates weight for bloom filter index content.
66fn bloom_filter_index_content_weight(
67    (k, _): &((FileId, ColumnId, Tag), PageKey),
68    v: &Bytes,
69) -> u32 {
70    (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
71}
72
73/// Bloom filter index blob reader with cache.
74pub struct CachedBloomFilterIndexBlobReader<R> {
75    file_id: FileId,
76    column_id: ColumnId,
77    tag: Tag,
78    blob_size: u64,
79    inner: R,
80    cache: BloomFilterIndexCacheRef,
81}
82
83impl<R> CachedBloomFilterIndexBlobReader<R> {
84    /// Creates a new bloom filter index blob reader with cache.
85    pub fn new(
86        file_id: FileId,
87        column_id: ColumnId,
88        tag: Tag,
89        blob_size: u64,
90        inner: R,
91        cache: BloomFilterIndexCacheRef,
92    ) -> Self {
93        Self {
94            file_id,
95            column_id,
96            tag,
97            blob_size,
98            inner,
99            cache,
100        }
101    }
102}
103
104#[async_trait]
105impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
106    async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
107        let inner = &self.inner;
108        self.cache
109            .get_or_load(
110                (self.file_id, self.column_id, self.tag),
111                self.blob_size,
112                offset,
113                size,
114                move |ranges| async move { inner.read_vec(&ranges).await },
115            )
116            .await
117            .map(|b| b.into())
118    }
119
120    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
121        let mut pages = Vec::with_capacity(ranges.len());
122        for range in ranges {
123            let inner = &self.inner;
124            let page = self
125                .cache
126                .get_or_load(
127                    (self.file_id, self.column_id, self.tag),
128                    self.blob_size,
129                    range.start,
130                    (range.end - range.start) as u32,
131                    move |ranges| async move { inner.read_vec(&ranges).await },
132                )
133                .await?;
134
135            pages.push(Bytes::from(page));
136        }
137
138        Ok(pages)
139    }
140
141    /// Reads the meta information of the bloom filter.
142    async fn metadata(&self) -> Result<BloomFilterMeta> {
143        if let Some(cached) = self
144            .cache
145            .get_metadata((self.file_id, self.column_id, self.tag))
146        {
147            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
148            Ok((*cached).clone())
149        } else {
150            let meta = self.inner.metadata().await?;
151            self.cache.put_metadata(
152                (self.file_id, self.column_id, self.tag),
153                Arc::new(meta.clone()),
154            );
155            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
156            Ok(meta)
157        }
158    }
159}
160
#[cfg(test)]
mod test {
    use rand::{Rng, RngCore};

    use super::*;

    const FUZZ_REPEAT_TIMES: usize = 100;

    /// Fuzzes `PageKey::generate_page_keys` together with
    /// `PageKey::calculate_range`: concatenating the generated pages and
    /// slicing them with the calculated range must reproduce exactly
    /// `data[offset..offset + size]`.
    #[test]
    fn fuzz_index_calculation() {
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let keys =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = keys.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in keys {
                let start = key.page_id as usize * page_size;
                // The final page may be shorter than `page_size`.
                let end = (start + page_size).min(data.len());
                read.extend_from_slice(&data[start..end]);
            }
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let range_in_pages = PageKey::calculate_range(offset, size, page_size as u64);
            let read = read[range_in_pages.clone()].to_vec();
            assert_eq!(
                read,
                data.get(expected_range).unwrap(),
                "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                read.len(),
                size as usize,
                range_in_pages,
                page_num
            );
        }
    }
}