1use std::ops::Range;
16use std::sync::Arc;
17
18use api::v1::index::BloomFilterMeta;
19use async_trait::async_trait;
20use bytes::Bytes;
21use futures::future::try_join_all;
22use index::bloom_filter::error::Result;
23use index::bloom_filter::reader::BloomFilterReader;
24use store_api::storage::ColumnId;
25
26use crate::cache::index::{IndexCache, PageKey, INDEX_METADATA_TYPE};
27use crate::metrics::{CACHE_HIT, CACHE_MISS};
28use crate::sst::file::FileId;
29
/// Index-type name handed to `IndexCache::new_with_weighter` when the bloom filter
/// index cache is constructed.
///
/// NOTE(review): presumably used by `IndexCache` to label this cache (e.g. in
/// metrics) — confirm against the `IndexCache` implementation.
const INDEX_TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
31
/// Discriminator that forms the third component of the bloom filter cache key.
///
/// Two entries for the same file and column but with different tags are
/// distinct cache entries, because the tag takes part in `Eq` and `Hash`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum Tag {
    /// NOTE(review): presumably marks bloom filters that back the skipping
    /// index — confirm against the callers that construct this tag.
    Skipping,
    /// NOTE(review): presumably marks bloom filters that back the fulltext
    /// index — confirm against the callers that construct this tag.
    Fulltext,
}
38
/// [`IndexCache`] specialised for bloom filter indexes: entries are keyed by
/// `(FileId, ColumnId, Tag)` and the cached metadata type is [`BloomFilterMeta`].
pub type BloomFilterIndexCache = IndexCache<(FileId, ColumnId, Tag), BloomFilterMeta>;
/// Reference-counted ([`Arc`]) handle to a [`BloomFilterIndexCache`].
pub type BloomFilterIndexCacheRef = Arc<BloomFilterIndexCache>;
42
43impl BloomFilterIndexCache {
44 pub fn new(index_metadata_cap: u64, index_content_cap: u64, page_size: u64) -> Self {
46 Self::new_with_weighter(
47 index_metadata_cap,
48 index_content_cap,
49 page_size,
50 INDEX_TYPE_BLOOM_FILTER_INDEX,
51 bloom_filter_index_metadata_weight,
52 bloom_filter_index_content_weight,
53 )
54 }
55}
56
57fn bloom_filter_index_metadata_weight(
59 k: &(FileId, ColumnId, Tag),
60 _: &Arc<BloomFilterMeta>,
61) -> u32 {
62 (k.0.as_bytes().len()
63 + std::mem::size_of::<ColumnId>()
64 + std::mem::size_of::<BloomFilterMeta>()) as u32
65}
66
67fn bloom_filter_index_content_weight(
69 (k, _): &((FileId, ColumnId, Tag), PageKey),
70 v: &Bytes,
71) -> u32 {
72 (k.0.as_bytes().len() + std::mem::size_of::<ColumnId>() + v.len()) as u32
73}
74
/// A bloom filter blob reader that routes reads and metadata lookups through a
/// shared [`BloomFilterIndexCache`], falling back to the wrapped reader `R`.
pub struct CachedBloomFilterIndexBlobReader<R> {
    /// File component of the cache key.
    file_id: FileId,
    /// Column component of the cache key.
    column_id: ColumnId,
    /// Tag component of the cache key.
    tag: Tag,
    /// Passed to the cache's `get_or_load` on every read.
    /// NOTE(review): presumably the total size of the blob in bytes — confirm.
    blob_size: u64,
    /// Wrapped reader that the cache loader and metadata misses delegate to.
    inner: R,
    /// Shared cache consulted before touching `inner`.
    cache: BloomFilterIndexCacheRef,
}
84
85impl<R> CachedBloomFilterIndexBlobReader<R> {
86 pub fn new(
88 file_id: FileId,
89 column_id: ColumnId,
90 tag: Tag,
91 blob_size: u64,
92 inner: R,
93 cache: BloomFilterIndexCacheRef,
94 ) -> Self {
95 Self {
96 file_id,
97 column_id,
98 tag,
99 blob_size,
100 inner,
101 cache,
102 }
103 }
104}
105
106#[async_trait]
107impl<R: BloomFilterReader + Send> BloomFilterReader for CachedBloomFilterIndexBlobReader<R> {
108 async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
109 let inner = &self.inner;
110 self.cache
111 .get_or_load(
112 (self.file_id, self.column_id, self.tag),
113 self.blob_size,
114 offset,
115 size,
116 move |ranges| async move { inner.read_vec(&ranges).await },
117 )
118 .await
119 .map(|b| b.into())
120 }
121
122 async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
123 let fetch = ranges.iter().map(|range| {
124 let inner = &self.inner;
125 self.cache.get_or_load(
126 (self.file_id, self.column_id, self.tag),
127 self.blob_size,
128 range.start,
129 (range.end - range.start) as u32,
130 move |ranges| async move { inner.read_vec(&ranges).await },
131 )
132 });
133 Ok(try_join_all(fetch)
134 .await?
135 .into_iter()
136 .map(Bytes::from)
137 .collect::<Vec<_>>())
138 }
139
140 async fn metadata(&self) -> Result<BloomFilterMeta> {
142 if let Some(cached) = self
143 .cache
144 .get_metadata((self.file_id, self.column_id, self.tag))
145 {
146 CACHE_HIT.with_label_values(&[INDEX_METADATA_TYPE]).inc();
147 Ok((*cached).clone())
148 } else {
149 let meta = self.inner.metadata().await?;
150 self.cache.put_metadata(
151 (self.file_id, self.column_id, self.tag),
152 Arc::new(meta.clone()),
153 );
154 CACHE_MISS.with_label_values(&[INDEX_METADATA_TYPE]).inc();
155 Ok(meta)
156 }
157 }
158}
159
#[cfg(test)]
mod test {
    use rand::{Rng, RngCore};

    use super::*;

    /// Number of random (offset, size, page_size) combinations to try.
    const FUZZ_REPEAT_TIMES: usize = 100;

    /// Checks that stitching together the pages named by
    /// `PageKey::generate_page_keys` and slicing the result with
    /// `PageKey::calculate_range` yields exactly `data[offset..offset + size]`.
    #[test]
    fn fuzz_index_calculation() {
        let mut rng = rand::rng();
        let mut data = vec![0u8; 1024 * 1024];
        rng.fill_bytes(&mut data);

        for _ in 0..FUZZ_REPEAT_TIMES {
            let offset = rng.random_range(0..data.len() as u64);
            let size = rng.random_range(0..data.len() as u32 - offset as u32);
            let page_size: usize = rng.random_range(1..1024);
            let page_bytes = page_size as u64;

            let keys: Vec<_> = PageKey::generate_page_keys(offset, size, page_bytes).collect();
            let page_num = keys.len();

            // Concatenate the bytes of every page the request touches; the
            // last page of the data may be shorter than `page_size`.
            let mut stitched = Vec::with_capacity(size as usize);
            for key in keys {
                let start = key.page_id as usize * page_size;
                let end = (start + page_size).min(data.len());
                stitched.extend_from_slice(&data[start..end]);
            }

            let expected_range = offset as usize..(offset + size as u64) as usize;
            let in_pages = PageKey::calculate_range(offset, size, page_bytes);
            let actual = stitched[in_pages.clone()].to_vec();

            assert_eq!(
                actual,
                data.get(expected_range).unwrap(),
                "fuzz_read_index failed, offset: {}, size: {}, page_size: {}\nread len: {}, expected len: {}\nrange: {:?}, page num: {}",
                offset,
                size,
                page_size,
                actual.len(),
                size as usize,
                in_pages,
                page_num
            );
        }
    }
}