1use std::ops::Range;
16use std::sync::Arc;
17
18use api::v1::index::{BloomFilterLoc, BloomFilterMeta};
19use async_trait::async_trait;
20use bytes::Bytes;
21use index::bloom_filter::error::Result;
22use index::bloom_filter::reader::BloomFilterReader;
23use store_api::storage::{ColumnId, FileId};
24
25use crate::cache::index::{INDEX_METADATA_TYPE, IndexCache, PageKey};
26use crate::metrics::{CACHE_HIT, CACHE_MISS};
27
/// Index type name that `BloomFilterIndexCache::new` hands to
/// `IndexCache::new_with_weighter`.
///
/// NOTE(review): presumably used by `IndexCache` to label its metrics — confirm
/// against the `IndexCache` implementation.
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
29
/// Discriminator used as the third component of the bloom filter cache key
/// `(FileId, ColumnId, Tag)`.
///
/// NOTE(review): presumably keeps bloom filters built for skipping indexes apart
/// from those built for fulltext indexes on the same file and column — confirm
/// against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Tag {
    /// Entry tagged as `Skipping`.
    Skipping,
    /// Entry tagged as `Fulltext`.
    Fulltext,
}
36
/// [`IndexCache`] specialization for bloom filter indexes: entries are keyed by
/// `(FileId, ColumnId, Tag)` and the cached metadata type is [`BloomFilterMeta`].
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
/// Reference-counted (`Arc`) handle to a [`BloomFilterIndexCache`].
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;
40
impl BloomFilterIndexCache {
    /// Creates a bloom filter index cache.
    ///
    /// The three arguments are forwarded unchanged to
    /// `IndexCache::new_with_weighter`, together with the bloom filter index type
    /// name and the two weight functions defined in this file.
    ///
    /// NOTE(review): the caps are presumably byte budgets for the metadata and
    /// content caches, and `page_size` the page granularity of cached content —
    /// confirm against `IndexCache`.
    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
        Self::new_with_weighter(
            index_metadata_cap,
            index_content_cap,
            page_size,
            INDEX_TYPE_BLOOM_FILTER_INDEX,
            bloom_filter_index_metadata_weight,
            bloom_filter_index_content_weight,
        )
    }

    /// Invalidates every cache entry whose key belongs to `file_id`.
    ///
    /// Only the first key component (the file id) is compared, so entries for all
    /// columns and tags of that file match the predicate.
    pub fn invalidate_file(&self, file_id: FileId) {
        self.invalidate_if(move |key| key.0 == file_id);
    }
}
59
60fn bloom_filter_index_metadata_weight(
62 k: &(FileId, ColumnId, Tag),
63 meta: &Arc<BloomFilterMeta>,
64) -> u32 {
65 let base = k.0.as_bytes().len()
66 + std::mem::size_of::<ColumnId>()
67 + std::mem::size_of::<Tag>()
68 + std::mem::size_of::<BloomFilterMeta>();
69
70 let vec_estimated = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
71 + meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();
72
73 (base + vec_estimated) as u32
74}
75
76fn bloom_filter_index_content_weight(
78 (k, _): &((FileId, ColumnId, Tag), PageKey),
79 v: &Bytes,
80) -> u32 {
81 (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
82}
83
/// Bloom filter index blob reader that serves reads and metadata through a
/// [`BloomFilterIndexCache`], delegating to the wrapped reader `R` to load data.
pub struct CachedBloomFilterIndexBlobReader<R> {
    /// File id; first component of the cache key.
    file_id: FileId,
    /// Column id; second component of the cache key.
    column_id: ColumnId,
    /// Tag; third component of the cache key.
    tag: Tag,
    /// Passed to the cache's `get_or_load` on every read.
    /// NOTE(review): presumably the total size of the blob in bytes — confirm.
    blob_size: u64,
    /// Wrapped reader that the cache loader and the metadata miss path call.
    inner: R,
    /// Shared cache holding both metadata and content pages.
    cache: BloomFilterIndexCacheRef,
}
93
impl<R> CachedBloomFilterIndexBlobReader<R> {
    /// Creates a cached reader around `inner`.
    ///
    /// `file_id`, `column_id` and `tag` together form the key under which this
    /// reader stores and looks up entries in `cache`; `blob_size` is stored and
    /// later forwarded to the cache on every read. All arguments are stored as
    /// given, without validation.
    pub fn new(
        file_id: FileId,
        column_id: ColumnId,
        tag: Tag,
        blob_size: u64,
        inner: R,
        cache: BloomFilterIndexCacheRef,
    ) -> Self {
        Self {
            file_id,
            column_id,
            tag,
            blob_size,
            inner,
            cache,
        }
    }
}
114
115#[async_trait]
116impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
117 async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
118 let inner = &self.inner;
119 self.cache
120 .get_or_load(
121 (self.file_id, self.column_id, self.tag),
122 self.blob_size,
123 offset,
124 size,
125 move |ranges| async move { inner.read_vec(&ranges).await },
126 )
127 .await
128 .map(|b| b.into())
129 }
130
131 async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
132 let mut pages = Vec::with_capacity(ranges.len());
133 for range in ranges {
134 let inner = &self.inner;
135 let page = self
136 .cache
137 .get_or_load(
138 (self.file_id, self.column_id, self.tag),
139 self.blob_size,
140 range.start,
141 (range.end - range.start) as u32,
142 move |ranges| async move { inner.read_vec(&ranges).await },
143 )
144 .await?;
145
146 pages.push(Bytes::from(page));
147 }
148
149 Ok(pages)
150 }
151
152 async fn metadata(&self) -> Result<BloomFilterMeta> {
154 if let Some(cached) = self
155 .cache
156 .get_metadata((self.file_id, self.column_id, self.tag))
157 {
158 CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
159 Ok((*cached).clone())
160 } else {
161 let meta = self.inner.metadata().await?;
162 self.cache.put_metadata(
163 (self.file_id, self.column_id, self.tag),
164 Arc::new(meta.clone()),
165 );
166 CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
167 Ok(meta)
168 }
169 }
170}
171
#[cfg(test)]
mod test {
    use rand::{Rng, RngCore};

    use super::*;

    /// Number of random (offset, size, page size) combinations the fuzz test tries.
    const FUZZ_REPEAT_TIMES: usize = 100;

    /// The metadata weight must equal the fixed key/struct sizes plus the bytes
    /// estimated for the elements of both vectors held by the metadata.
    #[test]
    fn bloom_filter_metadata_weight_counts_vec_contents() {
        let file_id = FileId::parse_str("00000000-0000-0000-0000-000000000001").unwrap();
        let column_id: ColumnId = 42;
        let key = (file_id, column_id, Tag::Skipping);

        let loc_at = |offset| BloomFilterLoc {
            offset,
            size: 512,
            element_count: 1000,
        };
        let meta = Arc::new(BloomFilterMeta {
            rows_per_segment: 128,
            segment_count: 2,
            row_count: 256,
            bloom_filter_size: 1024,
            segment_loc_indices: vec![0, 64, 128, 192],
            bloom_filter_locs: vec![loc_at(0), loc_at(512)],
        });

        let weight = bloom_filter_index_metadata_weight(&key, &meta);

        let fixed_bytes = file_id.as_bytes().len()
            + std::mem::size_of::<ColumnId>()
            + std::mem::size_of::<Tag>()
            + std::mem::size_of::<BloomFilterMeta>();
        let vec_bytes = meta.segment_loc_indices.len() * std::mem::size_of::<u64>()
            + meta.bloom_filter_locs.len() * std::mem::size_of::<BloomFilterLoc>();

        assert_eq!(weight as usize, fixed_bytes + vec_bytes);
    }

    /// Reading the pages named by `PageKey::generate_page_keys` and slicing them
    /// with `PageKey::calculate_range` must yield exactly the requested bytes.
    #[test]
    fn fuzz_index_calculation() {
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);
        let data_len = data.len();

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data_len as u64);
            let size = rng.random_range(0..data_len as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let page_keys: Vec<_> =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect();
            let page_num = page_keys.len();

            // Concatenate the pages, truncating the last one at the end of `data`.
            let mut paged = Vec::with_capacity(size as usize);
            for key in page_keys {
                let start = key.page_id as usize * page_size;
                let end = (start + page_size).min(data_len);
                paged.extend_from_slice(&data[start..end]);
            }

            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = paged[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            assert_eq!(
                read,
                data.get(expected_range).unwrap(),
                "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                read.len(),
                size as usize,
                PageKey::calculate_range(offset, size, page_size as u64),
                page_num
            );
        }
    }
}