index/bloom_filter/
reader.rs1use std::ops::Range;
16
17use async_trait::async_trait;
18use bytes::Bytes;
19use common_base::range_read::RangeReader;
20use fastbloom::BloomFilter;
21use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
22use prost::Message;
23use snafu::{ensure, ResultExt};
24
25use crate::bloom_filter::error::{
26 DecodeProtoSnafu, FileSizeTooSmallSnafu, IoSnafu, Result, UnexpectedMetaSizeSnafu,
27};
28use crate::bloom_filter::SEED;
29
/// Size in bytes of the little-endian `u32` metadata-length field at the very end of a
/// bloom filter file.
const BLOOM_META_LEN_SIZE: u64 = std::mem::size_of::<u32>() as u64;

33pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; #[async_trait]
38pub trait BloomFilterReader: Sync {
39 async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes>;
41
42 async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
44 let mut results = Vec::with_capacity(ranges.len());
45 for range in ranges {
46 let size = (range.end - range.start) as u32;
47 let data = self.range_read(range.start, size).await?;
48 results.push(data);
49 }
50 Ok(results)
51 }
52
53 async fn metadata(&self) -> Result<BloomFilterMeta>;
55
56 async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result<BloomFilter> {
58 let bytes = self.range_read(loc.offset, loc.size as _).await?;
59 let vec = bytes
60 .chunks_exact(std::mem::size_of::<u64>())
61 .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
62 .collect();
63 let bm = BloomFilter::from_vec(vec)
64 .seed(&SEED)
65 .expected_items(loc.element_count as _);
66 Ok(bm)
67 }
68
69 async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result<Vec<BloomFilter>> {
70 let ranges = locs
71 .iter()
72 .map(|l| l.offset..l.offset + l.size)
73 .collect::<Vec<_>>();
74 let bss = self.read_vec(&ranges).await?;
75
76 let mut result = Vec::with_capacity(bss.len());
77 for (bs, loc) in bss.into_iter().zip(locs.iter()) {
78 let vec = bs
79 .chunks_exact(std::mem::size_of::<u64>())
80 .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
81 .collect();
82 let bm = BloomFilter::from_vec(vec)
83 .seed(&SEED)
84 .expected_items(loc.element_count as _);
85 result.push(bm);
86 }
87
88 Ok(result)
89 }
90}
91
92pub struct BloomFilterReaderImpl<R: RangeReader> {
94 reader: R,
96}
97
98impl<R: RangeReader> BloomFilterReaderImpl<R> {
99 pub fn new(reader: R) -> Self {
101 Self { reader }
102 }
103}
104
105#[async_trait]
106impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
107 async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
108 self.reader
109 .read(offset..offset + size as u64)
110 .await
111 .context(IoSnafu)
112 }
113
114 async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
115 self.reader.read_vec(ranges).await.context(IoSnafu)
116 }
117
118 async fn metadata(&self) -> Result<BloomFilterMeta> {
119 let metadata = self.reader.metadata().await.context(IoSnafu)?;
120 let file_size = metadata.content_length;
121
122 let mut meta_reader =
123 BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
124 meta_reader.metadata().await
125 }
126}
127
128struct BloomFilterMetaReader<R: RangeReader> {
130 reader: R,
131 file_size: u64,
132 prefetch_size: u64,
133}
134
135impl<R: RangeReader> BloomFilterMetaReader<R> {
136 pub fn new(reader: R, file_size: u64, prefetch_size: Option<u64>) -> Self {
137 Self {
138 reader,
139 file_size,
140 prefetch_size: prefetch_size
141 .unwrap_or(BLOOM_META_LEN_SIZE)
142 .max(BLOOM_META_LEN_SIZE),
143 }
144 }
145
146 pub async fn metadata(&mut self) -> Result<BloomFilterMeta> {
151 ensure!(
152 self.file_size >= BLOOM_META_LEN_SIZE,
153 FileSizeTooSmallSnafu {
154 size: self.file_size,
155 }
156 );
157
158 let meta_start = self.file_size.saturating_sub(self.prefetch_size);
159 let suffix = self
160 .reader
161 .read(meta_start..self.file_size)
162 .await
163 .context(IoSnafu)?;
164 let suffix_len = suffix.len();
165 let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
166 self.validate_meta_size(length)?;
167
168 if length > suffix_len as u64 - BLOOM_META_LEN_SIZE {
169 let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE;
170 let meta = self
171 .reader
172 .read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
173 .await
174 .context(IoSnafu)?;
175 BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
176 } else {
177 let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
178 let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
179 BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
180 }
181 }
182
183 fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
184 let suffix_len = suffix.len();
185 ensure!(
186 suffix_len >= 4,
187 FileSizeTooSmallSnafu {
188 size: suffix_len as u64
189 }
190 );
191 let mut bytes = [0; 4];
192 bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
193
194 Ok(bytes)
195 }
196
197 fn validate_meta_size(&self, length: u64) -> Result<()> {
198 let max_meta_size = self.file_size - BLOOM_META_LEN_SIZE;
199 ensure!(
200 length <= max_meta_size,
201 UnexpectedMetaSizeSnafu {
202 max_meta_size,
203 actual_meta_size: length,
204 }
205 );
206 Ok(())
207 }
208}
209
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    use futures::io::Cursor;

    use super::*;
    use crate::bloom_filter::creator::BloomFilterCreator;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Builds a bloom filter file with 3 rows and 2 rows per segment (so 2 segments).
    async fn mock_bloom_filter_bytes() -> Vec<u8> {
        let mut writer = Cursor::new(vec![]);
        let mut creator = BloomFilterCreator::new(
            2,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        creator
            .push_row_elems(vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        creator
            .push_row_elems(vec![b"c".to_vec(), b"d".to_vec()])
            .await
            .unwrap();
        creator
            .push_row_elems(vec![b"e".to_vec(), b"f".to_vec()])
            .await
            .unwrap();

        creator.finish(&mut writer).await.unwrap();

        writer.into_inner()
    }

    #[tokio::test]
    async fn test_bloom_filter_meta_reader() {
        let bytes = mock_bloom_filter_bytes().await;
        let file_size = bytes.len() as u64;

        // Cover: clamped-to-minimum prefetch, partial prefetch, exact file, and over-sized prefetch.
        for prefetch in [0u64, file_size / 2, file_size, file_size + 10] {
            let mut reader =
                BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch));
            let meta = reader.metadata().await.unwrap();

            assert_eq!(meta.rows_per_segment, 2);
            assert_eq!(meta.segment_count, 2);
            assert_eq!(meta.row_count, 3);
            assert_eq!(meta.bloom_filter_locs.len(), 2);

            assert_eq!(meta.bloom_filter_locs[0].offset, 0);
            assert_eq!(meta.bloom_filter_locs[0].element_count, 4);
            assert_eq!(
                meta.bloom_filter_locs[1].offset,
                meta.bloom_filter_locs[0].size
            );
            assert_eq!(meta.bloom_filter_locs[1].element_count, 2);
        }
    }

    #[tokio::test]
    async fn test_bloom_filter_meta_reader_rejects_bad_input() {
        // File shorter than the 4-byte length field.
        let mut reader = BloomFilterMetaReader::new(vec![0u8; 2], 2, None);
        assert!(reader.metadata().await.is_err());

        // Length field claims more metadata than the file can hold.
        let mut bytes = vec![0u8; 8];
        bytes[4..].copy_from_slice(&u32::MAX.to_le_bytes());
        let mut reader = BloomFilterMetaReader::new(bytes, 8, None);
        assert!(reader.metadata().await.is_err());
    }

    #[tokio::test]
    async fn test_bloom_filter_reader() {
        let bytes = mock_bloom_filter_bytes().await;

        let reader = BloomFilterReaderImpl::new(bytes);
        let meta = reader.metadata().await.unwrap();

        assert_eq!(meta.bloom_filter_locs.len(), 2);
        let bf = reader
            .bloom_filter(&meta.bloom_filter_locs[0])
            .await
            .unwrap();
        assert!(bf.contains(&b"a"));
        assert!(bf.contains(&b"b"));
        assert!(bf.contains(&b"c"));
        assert!(bf.contains(&b"d"));

        let bf = reader
            .bloom_filter(&meta.bloom_filter_locs[1])
            .await
            .unwrap();
        assert!(bf.contains(&b"e"));
        assert!(bf.contains(&b"f"));
    }

    #[tokio::test]
    async fn test_bloom_filter_reader_vec() {
        let bytes = mock_bloom_filter_bytes().await;

        let reader = BloomFilterReaderImpl::new(bytes);
        let meta = reader.metadata().await.unwrap();

        // The batched path must decode the same filters as the single-read path.
        let bfs = reader
            .bloom_filter_vec(&meta.bloom_filter_locs)
            .await
            .unwrap();
        assert_eq!(bfs.len(), 2);

        assert!(bfs[0].contains(&b"a"));
        assert!(bfs[0].contains(&b"b"));
        assert!(bfs[0].contains(&b"c"));
        assert!(bfs[0].contains(&b"d"));

        assert!(bfs[1].contains(&b"e"));
        assert!(bfs[1].contains(&b"f"));
    }
}