1use std::ops::Range;
16use std::sync::Arc;
17
18use api::v1::index::BloomFilterMeta;
19use async_trait::async_trait;
20use bytes::Bytes;
21use index::bloom_filter::error::Result;
22use index::bloom_filter::reader::BloomFilterReader;
23use store_api::storage::{ColumnId, FileId};
24
25use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
26use crate::metrics::{CACHE_HIT, CACHE_MISS};
27
/// Index type name handed to `IndexCache::new_with_weighter` when building a
/// [`BloomFilterIndexCache`].
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
29
/// Kind of index a cached bloom filter blob belongs to.
///
/// It is the third component of the cache key `(FileId, ColumnId, Tag)`. As a
/// result, a skipping-index entry and a fulltext-index entry for the same file
/// and column never collide.
#[derive(Debug, Clone, Copy)]
#[derive(PartialEq, Eq, Hash)]
pub enum Tag {
    /// Bloom filter of a skipping index.
    Skipping,
    /// Bloom filter of a fulltext index.
    Fulltext,
}
36
/// Cache of bloom filter index metadata and content pages, keyed by
/// `(FileId, ColumnId, Tag)`.
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
/// Shared, reference-counted handle to a [`BloomFilterIndexCache`].
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;
40
41impl BloomFilterIndexCache {
42 pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
44 Self::new_with_weighter(
45 index_metadata_cap,
46 index_content_cap,
47 page_size,
48 INDEX_TYPE_BLOOM_FILTER_INDEX,
49 bloom_filter_index_metadata_weight,
50 bloom_filter_index_content_weight,
51 )
52 }
53}
54
55fn bloom_filter_index_metadata_weight(
57 k: &(FileId, ColumnId, Tag),
58 _: &Arc<BloomFilterMeta>,
59) -> u32 {
60 (k.0.as_bytes().len()
61 + std::mem::size_of::<ColumnId>()
62 + std::mem::size_of::<BloomFilterMeta>()) as u32
63}
64
65fn bloom_filter_index_content_weight(
67 (k, _): &((FileId, ColumnId, Tag), PageKey),
68 v: &Bytes,
69) -> u32 {
70 (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
71}
72
73pub struct CachedBloomFilterIndexBlobReader<R> {
75 file_id: FileId,
76 column_id: ColumnId,
77 tag: Tag,
78 blob_size: u64,
79 inner: R,
80 cache: BloomFilterIndexCacheRef,
81}
82
83impl<R> CachedBloomFilterIndexBlobReader<R> {
84 pub fn new(
86 file_id: FileId,
87 column_id: ColumnId,
88 tag: Tag,
89 blob_size: u64,
90 inner: R,
91 cache: BloomFilterIndexCacheRef,
92 ) -> Self {
93 Self {
94 file_id,
95 column_id,
96 tag,
97 blob_size,
98 inner,
99 cache,
100 }
101 }
102}
103
104#[async_trait]
105impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
106 async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
107 let inner = &self.inner;
108 self.cache
109 .get_or_load(
110 (self.file_id, self.column_id, self.tag),
111 self.blob_size,
112 offset,
113 size,
114 move |ranges| async move { inner.read_vec(&ranges).await },
115 )
116 .await
117 .map(|b| b.into())
118 }
119
120 async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
121 let mut pages = Vec::with_capacity(ranges.len());
122 for range in ranges {
123 let inner = &self.inner;
124 let page = self
125 .cache
126 .get_or_load(
127 (self.file_id, self.column_id, self.tag),
128 self.blob_size,
129 range.start,
130 (range.end - range.start) as u32,
131 move |ranges| async move { inner.read_vec(&ranges).await },
132 )
133 .await?;
134
135 pages.push(Bytes::from(page));
136 }
137
138 Ok(pages)
139 }
140
141 async fn metadata(&self) -> Result<BloomFilterMeta> {
143 if let Some(cached) = self
144 .cache
145 .get_metadata((self.file_id, self.column_id, self.tag))
146 {
147 CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
148 Ok((*cached).clone())
149 } else {
150 let meta = self.inner.metadata().await?;
151 self.cache.put_metadata(
152 (self.file_id, self.column_id, self.tag),
153 Arc::new(meta.clone()),
154 );
155 CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
156 Ok(meta)
157 }
158 }
159}
160
#[cfg(test)]
mod test {
    use rand::{Rng, RngCore};

    use super::*;

    const FUZZ_REPEAT_TIMES: usize = 100;

    /// Fuzzes `PageKey::generate_page_keys` together with
    /// `PageKey::calculate_range`.
    ///
    /// Concatenating the generated pages and slicing the result with the
    /// calculated range must reproduce exactly `data[offset..offset + size]`.
    #[test]
    fn fuzz_index_calculation() {
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            // `offset < data.len()`, so this range is non-empty and
            // `offset + size` never exceeds `data.len()`.
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let keys =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = keys.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in keys {
                let start = key.page_id as usize * page_size;
                // The last page of `data` may be shorter than `page_size`.
                let end = (start + page_size).min(data.len());
                read.extend_from_slice(&data[start..end]);
            }

            let range = PageKey::calculate_range(offset, size, page_size as u64);
            let read = read[range.clone()].to_vec();
            let expected_range = offset as usize..(offset + size as u64) as usize;
            assert_eq!(
                read,
                data.get(expected_range).unwrap(),
                "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                read.len(),
                size as usize,
                range,
                page_num
            );
        }
    }
}