index/bloom_filter/
reader.rs1use std::ops::{Range, Rem};
16
17use async_trait::async_trait;
18use bytemuck::try_cast_slice;
19use bytes::Bytes;
20use common_base::range_read::RangeReader;
21use fastbloom::BloomFilter;
22use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
23use prost::Message;
24use snafu::{ensure, ResultExt};
25
26use crate::bloom_filter::error::{
27 DecodeProtoSnafu, FileSizeTooSmallSnafu, IoSnafu, Result, UnexpectedMetaSizeSnafu,
28};
29use crate::bloom_filter::SEED;
30
31const BLOOM_META_LEN_SIZE: u64 = 4;
33
34pub const DEFAULT_PREFETCH_SIZE: u64 = 8192; pub fn bytes_to_u64_vec(bytes: &Bytes) -> Vec<u64> {
42 let aligned_length = bytes.len() - bytes.len().rem(std::mem::size_of::<u64>());
44 let byte_slice = &bytes[..aligned_length];
45
46 let u64_vec = if let Ok(u64_slice) = try_cast_slice::<u8, u64>(byte_slice) {
48 u64_slice.to_vec()
49 } else {
50 let u64_count = byte_slice.len() / std::mem::size_of::<u64>();
52 let mut u64_vec = Vec::<u64>::with_capacity(u64_count);
53
54 let dest_slice = unsafe {
57 std::slice::from_raw_parts_mut(u64_vec.as_mut_ptr() as *mut u8, byte_slice.len())
58 };
59 dest_slice.copy_from_slice(byte_slice);
60
61 unsafe { u64_vec.set_len(u64_count) };
63 u64_vec
64 };
65
66 #[cfg(target_endian = "little")]
69 {
70 u64_vec
71 }
72 #[cfg(target_endian = "big")]
73 {
74 u64_vec.into_iter().map(|x| x.swap_bytes()).collect()
75 }
76}
77
78#[async_trait]
80pub trait BloomFilterReader: Sync {
81 async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes>;
83
84 async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
86 let mut results = Vec::with_capacity(ranges.len());
87 for range in ranges {
88 let size = (range.end - range.start) as u32;
89 let data = self.range_read(range.start, size).await?;
90 results.push(data);
91 }
92 Ok(results)
93 }
94
95 async fn metadata(&self) -> Result<BloomFilterMeta>;
97
98 async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result<BloomFilter> {
100 let bytes = self.range_read(loc.offset, loc.size as _).await?;
101 let vec = bytes_to_u64_vec(&bytes);
102 let bm = BloomFilter::from_vec(vec)
103 .seed(&SEED)
104 .expected_items(loc.element_count as _);
105 Ok(bm)
106 }
107
108 async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result<Vec<BloomFilter>> {
109 let ranges = locs
110 .iter()
111 .map(|l| l.offset..l.offset + l.size)
112 .collect::<Vec<_>>();
113 let bss = self.read_vec(&ranges).await?;
114
115 let mut result = Vec::with_capacity(bss.len());
116 for (bs, loc) in bss.into_iter().zip(locs.iter()) {
117 let vec = bytes_to_u64_vec(&bs);
118 let bm = BloomFilter::from_vec(vec)
119 .seed(&SEED)
120 .expected_items(loc.element_count as _);
121 result.push(bm);
122 }
123
124 Ok(result)
125 }
126}
127
128pub struct BloomFilterReaderImpl<R: RangeReader> {
130 reader: R,
132}
133
134impl<R: RangeReader> BloomFilterReaderImpl<R> {
135 pub fn new(reader: R) -> Self {
137 Self { reader }
138 }
139}
140
141#[async_trait]
142impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
143 async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
144 self.reader
145 .read(offset..offset + size as u64)
146 .await
147 .context(IoSnafu)
148 }
149
150 async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
151 self.reader.read_vec(ranges).await.context(IoSnafu)
152 }
153
154 async fn metadata(&self) -> Result<BloomFilterMeta> {
155 let metadata = self.reader.metadata().await.context(IoSnafu)?;
156 let file_size = metadata.content_length;
157
158 let mut meta_reader =
159 BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
160 meta_reader.metadata().await
161 }
162}
163
164struct BloomFilterMetaReader<R: RangeReader> {
166 reader: R,
167 file_size: u64,
168 prefetch_size: u64,
169}
170
171impl<R: RangeReader> BloomFilterMetaReader<R> {
172 pub fn new(reader: R, file_size: u64, prefetch_size: Option<u64>) -> Self {
173 Self {
174 reader,
175 file_size,
176 prefetch_size: prefetch_size
177 .unwrap_or(BLOOM_META_LEN_SIZE)
178 .max(BLOOM_META_LEN_SIZE),
179 }
180 }
181
182 pub async fn metadata(&mut self) -> Result<BloomFilterMeta> {
187 ensure!(
188 self.file_size >= BLOOM_META_LEN_SIZE,
189 FileSizeTooSmallSnafu {
190 size: self.file_size,
191 }
192 );
193
194 let meta_start = self.file_size.saturating_sub(self.prefetch_size);
195 let suffix = self
196 .reader
197 .read(meta_start..self.file_size)
198 .await
199 .context(IoSnafu)?;
200 let suffix_len = suffix.len();
201 let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
202 self.validate_meta_size(length)?;
203
204 if length > suffix_len as u64 - BLOOM_META_LEN_SIZE {
205 let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE;
206 let meta = self
207 .reader
208 .read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
209 .await
210 .context(IoSnafu)?;
211 BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
212 } else {
213 let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
214 let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
215 BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
216 }
217 }
218
219 fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
220 let suffix_len = suffix.len();
221 ensure!(
222 suffix_len >= 4,
223 FileSizeTooSmallSnafu {
224 size: suffix_len as u64
225 }
226 );
227 let mut bytes = [0; 4];
228 bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
229
230 Ok(bytes)
231 }
232
233 fn validate_meta_size(&self, length: u64) -> Result<()> {
234 let max_meta_size = self.file_size - BLOOM_META_LEN_SIZE;
235 ensure!(
236 length <= max_meta_size,
237 UnexpectedMetaSizeSnafu {
238 max_meta_size,
239 actual_meta_size: length,
240 }
241 );
242 Ok(())
243 }
244}
245
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    use futures::io::Cursor;

    use super::*;
    use crate::bloom_filter::creator::BloomFilterCreator;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Serializes three rows of two elements each (`a,b` / `c,d` / `e,f`) with a creator
    /// configured for 2 rows per segment, and returns the resulting file bytes.
    async fn mock_bloom_filter_bytes() -> Vec<u8> {
        let mut creator = BloomFilterCreator::new(
            2,
            0.01,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        for row in [[b"a", b"b"], [b"c", b"d"], [b"e", b"f"]] {
            let elems: Vec<Vec<u8>> = row.iter().map(|elem| elem.to_vec()).collect();
            creator.push_row_elems(elems).await.unwrap();
        }

        let mut writer = Cursor::new(vec![]);
        creator.finish(&mut writer).await.unwrap();
        writer.into_inner()
    }

    #[tokio::test]
    async fn test_bloom_filter_meta_reader() {
        let bytes = mock_bloom_filter_bytes().await;
        let file_size = bytes.len() as u64;

        // 0 is raised to the minimum prefetch (forcing a second read); the others cover a
        // partial tail, the exact file, and a prefetch larger than the file.
        let prefetch_sizes = [0u64, file_size / 2, file_size, file_size + 10];
        for prefetch in prefetch_sizes {
            let mut meta_reader =
                BloomFilterMetaReader::new(bytes.clone(), file_size, Some(prefetch));
            let meta = meta_reader.metadata().await.unwrap();

            assert_eq!(meta.rows_per_segment, 2);
            assert_eq!(meta.segment_count, 2);
            assert_eq!(meta.row_count, 3);

            let locs = &meta.bloom_filter_locs;
            assert_eq!(locs.len(), 2);
            assert_eq!(locs[0].offset, 0);
            assert_eq!(locs[0].element_count, 4);
            assert_eq!(locs[1].offset, locs[0].size);
            assert_eq!(locs[1].element_count, 2);
        }
    }

    #[tokio::test]
    async fn test_bloom_filter_reader() {
        let reader = BloomFilterReaderImpl::new(mock_bloom_filter_bytes().await);
        let meta = reader.metadata().await.unwrap();
        assert_eq!(meta.bloom_filter_locs.len(), 2);

        // Segment 0 holds rows 1-2, segment 1 holds row 3.
        let expected: [&[&[u8; 1]]; 2] = [&[b"a", b"b", b"c", b"d"], &[b"e", b"f"]];
        for (loc, elems) in meta.bloom_filter_locs.iter().zip(expected) {
            let bf = reader.bloom_filter(loc).await.unwrap();
            for elem in elems {
                assert!(bf.contains(elem));
            }
        }
    }
}
334}