1use std::ops::Range;
16use std::sync::Arc;
17use std::time::Instant;
18
19use api::v1::index::{BloomFilterLoc, BloomFilterMeta};
20use async_trait::async_trait;
21use bytes::Bytes;
22use index::bloom_filter::error::Result;
23use index::bloom_filter::reader::{BloomFilterReadMetrics, BloomFilterReader};
24use store_api::storage::{ColumnId, FileId, IndexVersion};
25
26use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
27use crate::metrics::{CACHE_HIT, CACHE_MISS};
28
/// Cache-type label for the bloom filter index cache, passed to
/// [`IndexCache::new_with_weighter`] (used e.g. when reporting metrics).
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
30
/// Distinguishes which kind of index a cached bloom filter belongs to.
///
/// The same `(file, version, column)` triple may appear once per tag in
/// [`BloomFilterIndexKey`], so this keeps the two variants from colliding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Tag {
    /// Bloom filter of the skipping index.
    Skipping,
    /// Bloom filter of the fulltext index.
    Fulltext,
}
37
/// Cache key identifying one bloom filter index blob:
/// `(file id, index version, column id, tag)`.
pub type BloomFilterIndexKey = (FileId, IndexVersion, ColumnId, Tag);

/// Cache of bloom filter index metadata ([`BloomFilterMeta`]) and content
/// pages, keyed by [`BloomFilterIndexKey`].
pub type BloomFilterIndexCache = IndexCache<BloomFilterIndexKey, BloomFilterMeta>;
/// Shared handle to a [`BloomFilterIndexCache`].
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;
43
44impl BloomFilterIndexCache {
45 pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
47 Self::new_with_weighter(
48 index_metadata_cap,
49 index_content_cap,
50 page_size,
51 INDEX_TYPE_BLOOM_FILTER_INDEX,
52 bloom_filter_index_metadata_weight,
53 bloom_filter_index_content_weight,
54 )
55 }
56
57 pub fn invalidate_file(&self, file_id: FileId) {
59 self.invalidate_if(move |key| key.0 == file_id);
60 }
61}
62
63fn bloom_filter_index_metadata_weight(k: &BloomFilterIndexKey, meta: &Arc<BloomFilterMeta>) -> u32 {
65 let base = k.0.as_bytes().len()
66 + std::mem::size_of::<IndexVersion>()
67 + std::mem::size_of::<ColumnId>()
68 + std::mem::size_of::<Tag>()
69 + std::mem::size_of::<BloomFilterMeta>();
70
71 let vec_estimated = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
72 + meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();
73
74 (base + vec_estimated) as u32
75}
76
77fn bloom_filter_index_content_weight((k, _): &(BloomFilterIndexKey, PageKey), v: &Bytes) -> u32 {
79 (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
80}
81
/// A [`BloomFilterReader`] decorator that serves metadata and content
/// reads from a [`BloomFilterIndexCache`], falling back to the wrapped
/// reader on cache misses.
pub struct CachedBloomFilterIndexBlobReader<R> {
    // Id of the file the index blob belongs to (first key component).
    file_id: FileId,
    // Version of the index blob (second key component).
    index_version: IndexVersion,
    // Column the bloom filter was built for (third key component).
    column_id: ColumnId,
    // Skipping vs fulltext discriminator (fourth key component).
    tag: Tag,
    // Total blob size in bytes; forwarded to `get_or_load` for paging.
    blob_size: u64,
    // Underlying reader consulted on cache misses.
    inner: R,
    // Shared cache of metadata entries and content pages.
    cache: BloomFilterIndexCacheRef,
}
92
impl<R> CachedBloomFilterIndexBlobReader<R> {
    /// Creates a cached reader over `inner` for the blob identified by
    /// `(file_id, index_version, column_id, tag)`.
    ///
    /// `blob_size` is the blob length in bytes; `cache` holds both the
    /// metadata and the content pages.
    pub fn new(
        file_id: FileId,
        index_version: IndexVersion,
        column_id: ColumnId,
        tag: Tag,
        blob_size: u64,
        inner: R,
        cache: BloomFilterIndexCacheRef,
    ) -> Self {
        Self {
            file_id,
            index_version,
            column_id,
            tag,
            blob_size,
            inner,
            cache,
        }
    }
}
115
#[async_trait]
impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
    /// Reads `size` bytes at `offset` from the index blob, serving pages
    /// from the cache and loading missing pages via the inner reader.
    async fn range_read(
        &self,
        offset: u64,
        size: u32,
        metrics: Option<&mut BloomFilterReadMetrics>,
    ) -> Result<Bytes> {
        // Only sample the clock when the caller asked for metrics.
        let start = metrics.as_ref().map(|_| Instant::now());
        let inner = &self.inner;
        let (result, cache_metrics) = self
            .cache
            .get_or_load(
                (self.file_id, self.index_version, self.column_id, self.tag),
                self.blob_size,
                offset,
                size,
                // Loader invoked only for pages absent from the cache;
                // `None`: the nested read is not metered separately.
                move |ranges| async move { inner.read_vec(&ranges, None).await },
            )
            .await?;

        // Fold the cache-level counters into the caller-provided metrics.
        if let Some(m) = metrics {
            m.total_ranges += cache_metrics.num_pages;
            m.total_bytes += cache_metrics.page_bytes;
            m.cache_hit += cache_metrics.cache_hit;
            m.cache_miss += cache_metrics.cache_miss;
            if let Some(start) = start {
                m.fetch_elapsed += start.elapsed();
            }
        }

        Ok(result.into())
    }

    /// Reads several byte ranges, each served through the page cache.
    ///
    /// NOTE(review): ranges are resolved sequentially, one `get_or_load`
    /// per range — presumably acceptable for index-sized reads.
    async fn read_vec(
        &self,
        ranges: &[Range<u64>],
        metrics: Option<&mut BloomFilterReadMetrics>,
    ) -> Result<Vec<Bytes>> {
        // Only sample the clock when the caller asked for metrics.
        let start = metrics.as_ref().map(|_| Instant::now());

        let mut pages = Vec::with_capacity(ranges.len());
        // Accumulate cache counters across all ranges, reported once below.
        let mut total_cache_metrics = crate::cache::index::IndexCacheMetrics::default();
        for range in ranges {
            let inner = &self.inner;
            let (page, cache_metrics) = self
                .cache
                .get_or_load(
                    (self.file_id, self.index_version, self.column_id, self.tag),
                    self.blob_size,
                    range.start,
                    (range.end - range.start) as u32,
                    // Same loader as `range_read`: fetch only missing pages.
                    move |ranges| async move { inner.read_vec(&ranges, None).await },
                )
                .await?;

            total_cache_metrics.merge(&cache_metrics);
            pages.push(Bytes::from(page));
        }

        // Merge the aggregated counters into the caller-provided metrics.
        if let Some(m) = metrics {
            m.total_ranges += total_cache_metrics.num_pages;
            m.total_bytes += total_cache_metrics.page_bytes;
            m.cache_hit += total_cache_metrics.cache_hit;
            m.cache_miss += total_cache_metrics.cache_miss;
            if let Some(start) = start {
                m.fetch_elapsed += start.elapsed();
            }
        }

        Ok(pages)
    }

    /// Returns the blob's [`BloomFilterMeta`], preferring the cache and
    /// falling back to the inner reader (then populating the cache).
    async fn metadata(
        &self,
        metrics: Option<&mut BloomFilterReadMetrics>,
    ) -> Result<BloomFilterMeta> {
        if let Some(cached) =
            self.cache
                .get_metadata((self.file_id, self.index_version, self.column_id, self.tag))
        {
            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            if let Some(m) = metrics {
                m.cache_hit += 1;
            }
            Ok((*cached).clone())
        } else {
            // Miss: read from the inner reader (which receives `metrics`
            // and may update its own counters), then cache a copy.
            // NOTE(review): `m.cache_miss` is not bumped here, unlike
            // `cache_hit` on the hit path — confirm the inner reader
            // accounts for the miss.
            let meta = self.inner.metadata(metrics).await?;
            self.cache.put_metadata(
                (self.file_id, self.index_version, self.column_id, self.tag),
                Arc::new(meta.clone()),
            );
            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok(meta)
        }
    }
}
214
#[cfg(test)]
mod test {
    use rand::{Rng, RngCore};

    use super::*;

    /// Number of random (offset, size, page_size) combinations to fuzz.
    const FUZZ_REPEAT_TIMES: usize = 100;

    /// The metadata weighter must count both the fixed-size key/struct
    /// part and the heap contents of the two vectors.
    #[test]
    fn bloom_filter_metadata_weight_counts_vec_contents() {
        let file_id = FileId::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
        let version = 0;
        let column_id: ColumnId = 42;
        let tag = Tag::Skipping;

        let meta = BloomFilterMeta {
            rows_per_segment: 128,
            segment_count: 2,
            row_count: 256,
            bloom_filter_size: 1024,
            segment_loc_indices: vec![0, 64, 128, 192],
            bloom_filter_locs: vec![
                BloomFilterLoc {
                    offset: 0,
                    size: 512,
                    element_count: 1000,
                },
                BloomFilterLoc {
                    offset: 512,
                    size: 512,
                    element_count: 1000,
                },
            ],
        };

        let weight = bloom_filter_index_metadata_weight(
            &(file_id, version, column_id, tag),
            &Arc::new(meta.clone()),
        );

        // Recompute the expectation the same way the weighter does.
        let base = file_id.as_bytes().len()
            + std::mem::size_of::<IndexVersion>()
            + std::mem::size_of::<ColumnId>()
            + std::mem::size_of::<Tag>()
            + std::mem::size_of::<BloomFilterMeta>();
        let expected_dynamic = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
            + meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();

        assert_eq!(weight as usize, base + expected_dynamic);
    }

    /// Reassembling pages generated by `PageKey::generate_page_keys` and
    /// slicing with `PageKey::calculate_range` must reproduce exactly the
    /// requested `[offset, offset + size)` window of the blob.
    #[test]
    fn fuzz_index_calculation() {
        // A 1 MiB blob of random bytes to read from.
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            // Pages the cache would fetch for this (offset, size).
            let indexes =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = indexes.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in indexes.into_iter() {
                let start = key.page_id as usize * page_size;
                // The final page may be shorter than `page_size`.
                let page = if start + page_size < data.len() {
                    &data[start..start + page_size]
                } else {
                    &data[start..]
                };
                read.extend_from_slice(page);
            }
            // Was `offset + size as u64 as u64`; the second cast was a
            // redundant no-op.
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            assert_eq!(
                read,
                data.get(expected_range).unwrap(),
                "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                read.len(),
                size as usize,
                PageKey::calculate_range(offset, size, page_size as u64),
                page_num
            );
        }
    }
}