// mito2/cache/index/bloom_filter_index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Range;
16use std::sync::Arc;
17
18use api::v1::index::{BloomFilterLoc, BloomFilterMeta};
19use async_trait::async_trait;
20use bytes::Bytes;
21use index::bloom_filter::error::Result;
22use index::bloom_filter::reader::BloomFilterReader;
23use store_api::storage::{ColumnId, FileId};
24
25use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
26use crate::metrics::{CACHE_HIT, CACHE_MISS};
27
/// Label value used to tag this cache in the cache hit/miss metrics.
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
29
/// Tag for bloom filter index cache.
///
/// Part of the cache key, so bloom filter blobs belonging to different
/// index kinds for the same `(FileId, ColumnId)` never collide.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Tag {
    /// Bloom filter backing the skipping index.
    Skipping,
    /// Bloom filter backing the fulltext index.
    Fulltext,
}
36
/// Cache for bloom filter index, keyed by `(file id, column id, tag)` with
/// [`BloomFilterMeta`] as the cached metadata type.
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
/// Shared reference to a [`BloomFilterIndexCache`].
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;
40
impl BloomFilterIndexCache {
    /// Creates a new bloom filter index cache.
    ///
    /// * `index_metadata_cap` - capacity (in weighted bytes) for metadata entries.
    /// * `index_content_cap` - capacity (in weighted bytes) for content pages.
    /// * `page_size` - size of each cached content page.
    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
        Self::new_with_weighter(
            index_metadata_cap,
            index_content_cap,
            page_size,
            INDEX_TYPE_BLOOM_FILTER_INDEX,
            bloom_filter_index_metadata_weight,
            bloom_filter_index_content_weight,
        )
    }

    /// Removes all cached entries for the given `file_id`.
    pub fn invalidate_file(&self, file_id: FileId) {
        // The file id is the first component of every cache key.
        self.invalidate_if(move |key| key.0 == file_id);
    }
}
59
60/// Calculates weight for bloom filter index metadata.
61fn bloom_filter_index_metadata_weight(
62    k: &(FileId, ColumnId, Tag),
63    meta: &Arc<BloomFilterMeta>,
64) -> u32 {
65    let base = k.0.as_bytes().len()
66        + std::mem::size_of::<ColumnId>()
67        + std::mem::size_of::<Tag>()
68        + std::mem::size_of::<BloomFilterMeta>();
69
70    let vec_estimated = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
71        + meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();
72
73    (base + vec_estimated) as u32
74}
75
76/// Calculates weight for bloom filter index content.
77fn bloom_filter_index_content_weight(
78    (k, _): &((FileId, ColumnId, Tag), PageKey),
79    v: &Bytes,
80) -> u32 {
81    (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
82}
83
/// Bloom filter index blob reader with cache.
///
/// Wraps an inner [`BloomFilterReader`] and serves page reads and metadata
/// through a shared [`BloomFilterIndexCache`], falling back to the inner
/// reader on cache miss.
pub struct CachedBloomFilterIndexBlobReader<R> {
    /// SST file the blob belongs to; part of the cache key.
    file_id: FileId,
    /// Column the bloom filter indexes; part of the cache key.
    column_id: ColumnId,
    /// Index kind (skipping/fulltext); part of the cache key.
    tag: Tag,
    /// Total size of the underlying blob in bytes.
    blob_size: u64,
    /// Inner reader used to load pages/metadata on cache miss.
    inner: R,
    /// Shared cache for pages and metadata.
    cache: BloomFilterIndexCacheRef,
}
93
impl<R> CachedBloomFilterIndexBlobReader<R> {
    /// Creates a new bloom filter index blob reader with cache.
    ///
    /// `file_id`, `column_id` and `tag` together identify the blob in the
    /// cache; `blob_size` is the total blob length used for page bounds.
    pub fn new(
        file_id: FileId,
        column_id: ColumnId,
        tag: Tag,
        blob_size: u64,
        inner: R,
        cache: BloomFilterIndexCacheRef,
    ) -> Self {
        Self {
            file_id,
            column_id,
            tag,
            blob_size,
            inner,
            cache,
        }
    }
}
114
#[async_trait]
impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
    // Serves `[offset, offset + size)` from the page cache, loading missing
    // pages through the inner reader.
    async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
        // Borrow `inner` outside the closure so the `move` closure captures
        // the reference rather than `self`.
        let inner = &self.inner;
        self.cache
            .get_or_load(
                (self.file_id, self.column_id, self.tag),
                self.blob_size,
                offset,
                size,
                move |ranges| async move { inner.read_vec(&ranges).await },
            )
            .await
            .map(|b| b.into())
    }

    // Reads multiple ranges, each resolved independently against the cache.
    // Note: ranges are fetched sequentially, one cache lookup per range.
    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
        let mut pages = Vec::with_capacity(ranges.len());
        for range in ranges {
            let inner = &self.inner;
            let page = self
                .cache
                .get_or_load(
                    (self.file_id, self.column_id, self.tag),
                    self.blob_size,
                    range.start,
                    // Range length; assumes `end >= start` — TODO confirm
                    // callers never pass inverted ranges.
                    (range.end - range.start) as u32,
                    move |ranges| async move { inner.read_vec(&ranges).await },
                )
                .await?;

            pages.push(Bytes::from(page));
        }

        Ok(pages)
    }

    /// Reads the meta information of the bloom filter.
    // Checks the metadata cache first; on miss, loads from the inner reader,
    // caches a copy, and records the hit/miss metric accordingly.
    async fn metadata(&self) -> Result<BloomFilterMeta> {
        if let Some(cached) = self
            .cache
            .get_metadata((self.file_id, self.column_id, self.tag))
        {
            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok((*cached).clone())
        } else {
            let meta = self.inner.metadata().await?;
            self.cache.put_metadata(
                (self.file_id, self.column_id, self.tag),
                Arc::new(meta.clone()),
            );
            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok(meta)
        }
    }
}
171
#[cfg(test)]
mod test {
    use rand::{Rng, RngCore};

    use super::*;

    const FUZZ_REPEAT_TIMES: usize = 100;

    /// The metadata weighter must charge for the fixed-size key/struct plus
    /// the heap contents of both vectors inside [`BloomFilterMeta`].
    #[test]
    fn bloom_filter_metadata_weight_counts_vec_contents() {
        let file_id = FileId::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
        let column_id: ColumnId = 42;
        let tag = Tag::Skipping;

        let meta = BloomFilterMeta {
            rows_per_segment: 128,
            segment_count: 2,
            row_count: 256,
            bloom_filter_size: 1024,
            segment_loc_indices: vec![0, 64, 128, 192],
            bloom_filter_locs: vec![
                BloomFilterLoc {
                    offset: 0,
                    size: 512,
                    element_count: 1000,
                },
                BloomFilterLoc {
                    offset: 512,
                    size: 512,
                    element_count: 1000,
                },
            ],
        };

        let weight =
            bloom_filter_index_metadata_weight(&(file_id, column_id, tag), &Arc::new(meta.clone()));

        let base = file_id.as_bytes().len()
            + std::mem::size_of::<ColumnId>()
            + std::mem::size_of::<Tag>()
            + std::mem::size_of::<BloomFilterMeta>();
        let expected_dynamic = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
            + meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();

        assert_eq!(weight as usize, base + expected_dynamic);
    }

    /// Fuzzes `PageKey` page splitting: reassembling the pages covering a
    /// random `(offset, size)` window and slicing with `calculate_range`
    /// must reproduce the original bytes exactly.
    #[test]
    fn fuzz_index_calculation() {
        // Random blob contents to read back through the page abstraction.
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let indexes =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = indexes.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in indexes.into_iter() {
                let start = key.page_id as usize * page_size;
                // The final page may be shorter than `page_size`.
                let page = if start + page_size < data.len() {
                    &data[start..start + page_size]
                } else {
                    &data[start..]
                };
                read.extend_from_slice(page);
            }
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            assert_eq!(
                read,
                data.get(expected_range).unwrap(),
                "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                read.len(),
                size as usize,
                PageKey::calculate_range(offset, size, page_size as u64),
                page_num
            );
        }
    }
}