// mito2/cache/index/bloom_filter_index.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::ops::Range;
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::v1::index::{BloomFilterLoc, BloomFilterMeta};
20use async_trait::async_trait;
21use bytes::Bytes;
22use index::bloom_filter::error::Result;
23use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
24use store_api::storage::{ColumnId, FileId, IndexVersion};
25
26use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
27use crate::metrics::{CACHE_HIT, CACHE_MISS};
28
/// Label used for cache hit/miss metrics of this index type.
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";

/// Tag for bloom filter index cache.
///
/// Distinguishes the two kinds of bloom filter indexes that share this cache,
/// so entries for the same file/column do not collide.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Tag {
    // Skipping (inverted) index bloom filter.
    Skipping,
    // Full-text index bloom filter.
    Fulltext,
}

/// Cache key: (file, index version, column, index kind).
pub type BloomFilterIndexKey = (FileId, IndexVersion, ColumnId, Tag);

/// Cache for bloom filter index.
pub type BloomFilterIndexCache = IndexCache<BloomFilterIndexKey, BloomFilterMeta>;
/// Shared reference to [`BloomFilterIndexCache`].
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;
43
44impl BloomFilterIndexCache {
45    /// Creates a new bloom filter index cache.
46    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
47        Self::new_with_weighter(
48            index_metadata_cap,
49            index_content_cap,
50            page_size,
51            INDEX_TYPE_BLOOM_FILTER_INDEX,
52            bloom_filter_index_metadata_weight,
53            bloom_filter_index_content_weight,
54        )
55    }
56
57    /// Removes all cached entries for the given `file_id`.
58    pub fn invalidate_file(&self, file_id: FileId) {
59        self.invalidate_if(move |key| key.0 == file_id);
60    }
61}
62
63/// Calculates weight for bloom filter index metadata.
64fn bloom_filter_index_metadata_weight(k: &BloomFilterIndexKey, meta: &Arc<BloomFilterMeta>) -> u32 {
65    let base = k.0.as_bytes().len()
66        + std::mem::size_of::<IndexVersion>()
67        + std::mem::size_of::<ColumnId>()
68        + std::mem::size_of::<Tag>()
69        + std::mem::size_of::<BloomFilterMeta>();
70
71    let vec_estimated = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
72        + meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();
73
74    (base + vec_estimated) as u32
75}
76
77/// Calculates weight for bloom filter index content.
78fn bloom_filter_index_content_weight((k, _): &(BloomFilterIndexKey, PageKey), v: &Bytes) -> u32 {
79    (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
80}
81
/// Bloom filter index blob reader with cache.
///
/// Wraps an inner [`BloomFilterReader`] and serves page reads and metadata
/// from a shared [`BloomFilterIndexCache`] keyed by
/// `(file_id, index_version, column_id, tag)`.
pub struct CachedBloomFilterIndexBlobReader<R> {
    // File the index blob belongs to; first component of the cache key.
    file_id: FileId,
    // Version of the index; part of the cache key.
    index_version: IndexVersion,
    // Column the bloom filter covers; part of the cache key.
    column_id: ColumnId,
    // Index kind (skipping vs fulltext); part of the cache key.
    tag: Tag,
    // Total size of the underlying blob, used for page-range computation.
    blob_size: u64,
    // The uncached reader used to load pages/metadata on cache miss.
    inner: R,
    // Shared cache for pages and metadata.
    cache: BloomFilterIndexCacheRef,
}
92
93impl<R> CachedBloomFilterIndexBlobReader<R> {
94    /// Creates a new bloom filter index blob reader with cache.
95    pub fn new(
96        file_id: FileId,
97        index_version: IndexVersion,
98        column_id: ColumnId,
99        tag: Tag,
100        blob_size: u64,
101        inner: R,
102        cache: BloomFilterIndexCacheRef,
103    ) -> Self {
104        Self {
105            file_id,
106            index_version,
107            column_id,
108            tag,
109            blob_size,
110            inner,
111            cache,
112        }
113    }
114}
115
#[async_trait]
impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
    /// Reads `size` bytes at `offset`, serving pages from the cache and
    /// loading missing pages from the inner reader.
    async fn range_read(
        &self,
        offset: u64,
        size: u32,
        metrics: Option<&mut BloomFilterReadMetrics>,
    ) -> Result<Bytes> {
        // Only pay for an `Instant` when the caller asked for metrics.
        let start = metrics.as_ref().map(|_| Instant::now());
        let inner = &self.inner;
        let (result, cache_metrics) = self
            .cache
            .get_or_load(
                (self.file_id, self.index_version, self.column_id, self.tag),
                self.blob_size,
                offset,
                size,
                // Loader invoked only for the pages missing from the cache;
                // `None` metrics here because the wrapper accounts for them below.
                move |ranges| async move { inner.read_vec(&ranges, None).await },
            )
            .await?;

        // Fold the cache-layer metrics into the caller-provided metrics.
        if let Some(m) = metrics {
            m.total_ranges += cache_metrics.num_pages;
            m.total_bytes += cache_metrics.page_bytes;
            m.cache_hit += cache_metrics.cache_hit;
            m.cache_miss += cache_metrics.cache_miss;
            if let Some(start) = start {
                // Covers both cache lookups and any inner reads.
                m.fetch_elapsed += start.elapsed();
            }
        }

        Ok(result.into())
    }

    /// Reads multiple byte ranges, each through the page cache.
    async fn read_vec(
        &self,
        ranges: &[Range<u64>],
        metrics: Option<&mut BloomFilterReadMetrics>,
    ) -> Result<Vec<Bytes>> {
        let start = metrics.as_ref().map(|_| Instant::now());

        let mut pages = Vec::with_capacity(ranges.len());
        let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
        // NOTE(review): ranges are fetched sequentially; each may trigger its
        // own inner read on a miss.
        for range in ranges {
            let inner = &self.inner;
            let (page, cache_metrics) = self
                .cache
                .get_or_load(
                    (self.file_id, self.index_version, self.column_id, self.tag),
                    self.blob_size,
                    range.start,
                    (range.end - range.start) as u32,
                    move |ranges| async move { inner.read_vec(&ranges, None).await },
                )
                .await?;

            // Accumulate per-range cache metrics; reported once at the end.
            total_cache_metrics.merge(&cache_metrics);
            pages.push(Bytes::from(page));
        }

        if let Some(m) = metrics {
            m.total_ranges += total_cache_metrics.num_pages;
            m.total_bytes += total_cache_metrics.page_bytes;
            m.cache_hit += total_cache_metrics.cache_hit;
            m.cache_miss += total_cache_metrics.cache_miss;
            if let Some(start) = start {
                m.fetch_elapsed += start.elapsed();
            }
        }

        Ok(pages)
    }

    /// Reads the meta information of the bloom filter.
    ///
    /// Serves from the metadata cache when present; otherwise delegates to the
    /// inner reader and populates the cache with a clone of the result.
    async fn metadata(
        &self,
        metrics: Option<&mut BloomFilterReadMetrics>,
    ) -> Result<BloomFilterMeta> {
        if let Some(cached) =
            self.cache
                .get_metadata((self.file_id, self.index_version, self.column_id, self.tag))
        {
            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            if let Some(m) = metrics {
                m.cache_hit += 1;
            }
            Ok((*cached).clone())
        } else {
            // Miss path: the inner reader updates `metrics` itself.
            let meta = self.inner.metadata(metrics).await?;
            // Cache a clone so the owned value can be returned to the caller.
            self.cache.put_metadata(
                (self.file_id, self.index_version, self.column_id, self.tag),
                Arc::new(meta.clone()),
            );
            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok(meta)
        }
    }
}
214
#[cfg(test)]
mod test {
    use rand::{Rng, RngCore};

    use super::*;

    /// Number of random (offset, size, page_size) combinations to try.
    const FUZZ_REPEAT_TIMES: usize = 100;

    /// The metadata weighter must count both the fixed-size struct/key and the
    /// heap contents of the metadata's two vectors.
    #[test]
    fn bloom_filter_metadata_weight_counts_vec_contents() {
        let file_id = FileId::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
        let version = 0;
        let column_id: ColumnId = 42;
        let tag = Tag::Skipping;

        let meta = BloomFilterMeta {
            rows_per_segment: 128,
            segment_count: 2,
            row_count: 256,
            bloom_filter_size: 1024,
            segment_loc_indices: vec![0, 64, 128, 192],
            bloom_filter_locs: vec![
                BloomFilterLoc {
                    offset: 0,
                    size: 512,
                    element_count: 1000,
                },
                BloomFilterLoc {
                    offset: 512,
                    size: 512,
                    element_count: 1000,
                },
            ],
        };

        let weight = bloom_filter_index_metadata_weight(
            &(file_id, version, column_id, tag),
            &Arc::new(meta.clone()),
        );

        // Fixed-size portion mirrors the weighter's key + struct terms.
        let base = file_id.as_bytes().len()
            + std::mem::size_of::<IndexVersion>()
            + std::mem::size_of::<ColumnId>()
            + std::mem::size_of::<Tag>()
            + std::mem::size_of::<BloomFilterMeta>();
        // Heap portion: vector elements.
        let expected_dynamic = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
            + meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();

        assert_eq!(weight as usize, base + expected_dynamic);
    }

    /// Fuzzes `PageKey::generate_page_keys` / `PageKey::calculate_range`:
    /// stitching the generated pages together and slicing with the calculated
    /// range must reproduce `data[offset..offset + size]` exactly.
    #[test]
    fn fuzz_index_calculation() {
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            // `offset < data.len()`, so this range is never empty; `size` may be 0.
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let indexes =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = indexes.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in indexes {
                let start = key.page_id as usize * page_size;
                // The final page may be truncated by the end of the blob.
                let page = if start + page_size < data.len() {
                    &data[start..start + page_size]
                } else {
                    &data[start..]
                };
                read.extend_from_slice(page);
            }
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            assert_eq!(
                read,
                data.get(expected_range).unwrap(),
                "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                read.len(),
                size as usize,
                PageKey::calculate_range(offset, size, page_size as u64),
                page_num
            );
        }
    }
}