use std::ops::Range;
use std::sync::Arc;

use api::v1::index::BloomFilterMeta;
use async_trait::async_trait;
use bytes::Bytes;
use index::bloom_filter::error::Result;
use index::bloom_filter::reader::BloomFilterReader;
use store_api::storage::ColumnId;

use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE};
use crate::metrics::{CACHE_HIT, CACHE_MISS};
use crate::sst::file::FileId;

const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";

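/// Kind of index the cached bloom filter belongs to, used as part of the cache key.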
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Tag {
    Skipping,
    Fulltext,
}

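/// Cache for bloom filter index metadata and content pages, keyed by file id, column id and tag.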
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
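/// Shared reference to a [`BloomFilterIndexCache`].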
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;

impl BloomFilterIndexCache {
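    /// Creates a new bloom filter index cache with the given capacities and page size.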
    pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
        Self::new_with_weighter(
            index_metadata_cap,
            index_content_cap,
            page_size,
            INDEX_TYPE_BLOOM_FILTER_INDEX,
            bloom_filter_index_metadata_weight,
            bloom_filter_index_content_weight,
        )
    }
}

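/// Calculates the weight of a cached metadata entry: key size plus the size of `BloomFilterMeta`.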
fn bloom_filter_index_metadata_weight(
    k: &(FileId, ColumnId, Tag),
    _: &Arc<BloomFilterMeta>,
) -> u32 {
    (k.0.as_bytes().len()
        + std::mem::size_of::<ColumnId>()
        + std::mem::size_of::<BloomFilterMeta>()) as u32
}

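/// Calculates the weight of a cached content page: key size plus the length of the page bytes.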
fn bloom_filter_index_content_weight(
    (k, _): &((FileId, ColumnId, Tag), PageKey),
    v: &Bytes,
) -> u32 {
    (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
}

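/// A [`BloomFilterReader`] that caches metadata and content pages of a bloom filter blob.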
pub struct CachedBloomFilterIndexBlobReader<R> {
    file_id: FileId,
    column_id: ColumnId,
    tag: Tag,
    blob_size: u64,
    inner: R,
    cache: BloomFilterIndexCacheRef,
}

impl<R> CachedBloomFilterIndexBlobReader<R> {
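    /// Creates a new cached bloom filter blob reader wrapping `inner`.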
    pub fn new(
        file_id: FileId,
        column_id: ColumnId,
        tag: Tag,
        blob_size: u64,
        inner: R,
        cache: BloomFilterIndexCacheRef,
    ) -> Self {
        Self {
            file_id,
            column_id,
            tag,
            blob_size,
            inner,
            cache,
        }
    }
}

#[async_trait]
impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
    async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
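        // Serve the read from the page cache; missing pages are loaded from the inner reader.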
        let inner = &self.inner;
        self.cache
            .get_or_load(
                (self.file_id, self.column_id, self.tag),
                self.blob_size,
                offset,
                size,
                move |ranges| async move { inner.read_vec(&ranges).await },
            )
            .await
            .map(|b| b.into())
    }

    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
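        // Resolve each requested range through the page cache, falling back to the inner reader on miss.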
        let mut pages = Vec::with_capacity(ranges.len());
        for range in ranges {
            let inner = &self.inner;
            let page = self
                .cache
                .get_or_load(
                    (self.file_id, self.column_id, self.tag),
                    self.blob_size,
                    range.start,
                    (range.end - range.start) as u32,
                    move |ranges| async move { inner.read_vec(&ranges).await },
                )
                .await?;

            pages.push(Bytes::from(page));
        }

        Ok(pages)
    }

    async fn metadata(&self) -> Result<BloomFilterMeta> {
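        // Check the metadata cache first; on miss, read from the inner reader and populate the cache.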
        if let Some(cached) = self
            .cache
            .get_metadata((self.file_id, self.column_id, self.tag))
        {
            CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok((*cached).clone())
        } else {
            let meta = self.inner.metadata().await?;
            self.cache.put_metadata(
                (self.file_id, self.column_id, self.tag),
                Arc::new(meta.clone()),
            );
            CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
            Ok(meta)
        }
    }
}

#[cfg(test)]
mod test {
    use rand::{Rng, RngCore};

    use super::*;

    const FUZZ_REPEAT_TIMES: usize = 100;

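    // Fuzz test: page keys generated for random (offset, size, page_size) inputs must
    // reassemble to exactly the same bytes as slicing the source data directly.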
    #[test]
    fn fuzz_index_calculation() {
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);

            let indexes =
                PageKey::generate_page_keys(offset, size, page_size as u64).collect::<Vec<_>>();
            let page_num = indexes.len();
            let mut read = Vec::with_capacity(size as usize);
            for key in indexes.into_iter() {
                let start = key.page_id as usize * page_size;
                let page = if start + page_size < data.len() {
                    &data[start..start + page_size]
                } else {
                    &data[start..]
                };
                read.extend_from_slice(page);
            }
            let expected_range = offset as usize..(offset + size as u64) as usize;
            let read = read[PageKey::calculate_range(offset, size, page_size as u64)].to_vec();
            assert_eq!(
                read,
                data.get(expected_range).unwrap(),
                "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                read.len(),
                size as usize,
                PageKey::calculate_range(offset, size, page_size as u64),
                page_num
            );
        }
    }
}