1use std::ops::{Range, Rem};
16use std::time::{Duration, Instant};
17
18use async_trait::async_trait;
19use bytemuck::try_cast_slice;
20use bytes::Bytes;
21use common_base::range_read::RangeReader;
22use fastbloom::BloomFilter;
23use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
24use prost::Message;
25use snafu::{ResultExt, ensure};
26
27use crate::bloom_filter::SEED;
28use crate::bloom_filter::error::{
29 DecodeProtoSnafu, FileSizeTooSmallSnafu, IoSnafu, Result, UnexpectedMetaSizeSnafu,
30};
31
/// Size in bytes of the trailing length field of a bloom filter file.
///
/// The last four bytes of the file are decoded as a little-endian `u32` that gives the
/// length of the encoded metadata stored immediately before them.
const BLOOM_META_LEN_SIZE: u64 = 4;
34
/// Prefetch size, in bytes, that `BloomFilterReaderImpl::metadata` passes to the
/// metadata reader.
pub const DEFAULT_PREFETCH_SIZE: u64 = 8192;

/// Counters accumulated while reading bloom filter data.
#[derive(Default, Clone)]
pub struct BloomFilterReadMetrics {
    /// Number of bytes requested from the underlying reader.
    pub total_bytes: u64,
    /// Number of ranges requested from the underlying reader.
    pub total_ranges: usize,
    /// Wall-clock time spent waiting for reads to complete.
    pub fetch_elapsed: Duration,
    /// Cache hit counter. Nothing in this file increments it; presumably a caching
    /// layer elsewhere maintains it — confirm against callers.
    pub cache_hit: usize,
    /// Cache miss counter. Nothing in this file increments it; presumably a caching
    /// layer elsewhere maintains it — confirm against callers.
    pub cache_miss: usize,
}

impl std::fmt::Debug for BloomFilterReadMetrics {
    /// Formats the metrics as a compact JSON-like object that omits zero-valued fields.
    ///
    /// When both `total_bytes` and `cache_hit` are zero the output is always `{}`,
    /// regardless of the other counters.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_bytes = self.total_bytes > 0;
        let has_hits = self.cache_hit > 0;

        if !has_bytes && !has_hits {
            return write!(f, "{{}}");
        }

        write!(f, "{{")?;

        // At least one of the two leading fields is printed, so every later field can
        // unconditionally start with a separator.
        if has_bytes {
            write!(f, "\"total_bytes\":{}", self.total_bytes)?;
            if has_hits {
                write!(f, ", ")?;
            }
        }
        if has_hits {
            write!(f, "\"cache_hit\":{}", self.cache_hit)?;
        }

        if self.total_ranges > 0 {
            write!(f, ", \"total_ranges\":{}", self.total_ranges)?;
        }
        if self.fetch_elapsed != Duration::ZERO {
            write!(f, ", \"fetch_elapsed\":\"{:?}\"", self.fetch_elapsed)?;
        }
        if self.cache_miss > 0 {
            write!(f, ", \"cache_miss\":{}", self.cache_miss)?;
        }

        write!(f, "}}")
    }
}

impl BloomFilterReadMetrics {
    /// Adds every counter of `other` onto the corresponding counter of `self`.
    pub fn merge_from(&mut self, other: &Self) {
        let Self {
            total_bytes,
            total_ranges,
            fetch_elapsed,
            cache_hit,
            cache_miss,
        } = other;

        self.total_bytes += *total_bytes;
        self.total_ranges += *total_ranges;
        self.fetch_elapsed += *fetch_elapsed;
        self.cache_hit += *cache_hit;
        self.cache_miss += *cache_miss;
    }
}
103
104pub fn bytes_to_u64_vec(bytes: &Bytes) -> Vec<u64> {
109 let aligned_length = bytes.len() - bytes.len().rem(std::mem::size_of::<u64>());
111 let byte_slice = &bytes[..aligned_length];
112
113 let u64_vec = if let Ok(u64_slice) = try_cast_slice::<u8, u64>(byte_slice) {
115 u64_slice.to_vec()
116 } else {
117 let u64_count = byte_slice.len() / std::mem::size_of::<u64>();
119 let mut u64_vec = Vec::<u64>::with_capacity(u64_count);
120
121 let dest_slice = unsafe {
124 std::slice::from_raw_parts_mut(u64_vec.as_mut_ptr() as *mut u8, byte_slice.len())
125 };
126 dest_slice.copy_from_slice(byte_slice);
127
128 unsafe { u64_vec.set_len(u64_count) };
130 u64_vec
131 };
132
133 #[cfg(target_endian = "little")]
136 {
137 u64_vec
138 }
139 #[cfg(target_endian = "big")]
140 {
141 u64_vec.into_iter().map(|x| x.swap_bytes()).collect()
142 }
143}
144
145#[async_trait]
147pub trait BloomFilterReader: Sync {
148 async fn range_read(
150 &self,
151 offset: u64,
152 size: u32,
153 metrics: Option<&mut BloomFilterReadMetrics>,
154 ) -> Result<Bytes>;
155
156 async fn read_vec(
158 &self,
159 ranges: &[Range<u64>],
160 metrics: Option<&mut BloomFilterReadMetrics>,
161 ) -> Result<Vec<Bytes>>;
162
163 async fn metadata(
165 &self,
166 metrics: Option<&mut BloomFilterReadMetrics>,
167 ) -> Result<BloomFilterMeta>;
168
169 async fn bloom_filter(
171 &self,
172 loc: &BloomFilterLoc,
173 metrics: Option<&mut BloomFilterReadMetrics>,
174 ) -> Result<BloomFilter> {
175 let bytes = self.range_read(loc.offset, loc.size as _, metrics).await?;
176 let vec = bytes_to_u64_vec(&bytes);
177 let bm = BloomFilter::from_vec(vec)
178 .seed(&SEED)
179 .expected_items(loc.element_count as _);
180 Ok(bm)
181 }
182
183 async fn bloom_filter_vec(
184 &self,
185 locs: &[BloomFilterLoc],
186 metrics: Option<&mut BloomFilterReadMetrics>,
187 ) -> Result<Vec<BloomFilter>> {
188 let ranges = locs
189 .iter()
190 .map(|l| l.offset..l.offset + l.size)
191 .collect::<Vec<_>>();
192 let bss = self.read_vec(&ranges, metrics).await?;
193
194 let mut result = Vec::with_capacity(bss.len());
195 for (bs, loc) in bss.into_iter().zip(locs.iter()) {
196 let vec = bytes_to_u64_vec(&bs);
197 let bm = BloomFilter::from_vec(vec)
198 .seed(&SEED)
199 .expected_items(loc.element_count as _);
200 result.push(bm);
201 }
202
203 Ok(result)
204 }
205}
206
/// [`BloomFilterReader`] implementation that serves every read from a [`RangeReader`].
pub struct BloomFilterReaderImpl<R: RangeReader> {
    /// Underlying reader all reads are issued against.
    reader: R,
}

impl<R: RangeReader> BloomFilterReaderImpl<R> {
    /// Creates a reader that wraps `reader`.
    pub fn new(reader: R) -> Self {
        Self { reader }
    }
}
219
220#[async_trait]
221impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
222 async fn range_read(
223 &self,
224 offset: u64,
225 size: u32,
226 metrics: Option<&mut BloomFilterReadMetrics>,
227 ) -> Result<Bytes> {
228 let start = metrics.as_ref().map(|_| Instant::now());
229 let result = self
230 .reader
231 .read(offset..offset + size as u64)
232 .await
233 .context(IoSnafu)?;
234
235 if let Some(m) = metrics {
236 m.total_ranges += 1;
237 m.total_bytes += size as u64;
238 if let Some(start) = start {
239 m.fetch_elapsed += start.elapsed();
240 }
241 }
242
243 Ok(result)
244 }
245
246 async fn read_vec(
247 &self,
248 ranges: &[Range<u64>],
249 metrics: Option<&mut BloomFilterReadMetrics>,
250 ) -> Result<Vec<Bytes>> {
251 let start = metrics.as_ref().map(|_| Instant::now());
252 let result = self.reader.read_vec(ranges).await.context(IoSnafu)?;
253
254 if let Some(m) = metrics {
255 m.total_ranges += ranges.len();
256 m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
257 if let Some(start) = start {
258 m.fetch_elapsed += start.elapsed();
259 }
260 }
261
262 Ok(result)
263 }
264
265 async fn metadata(
266 &self,
267 metrics: Option<&mut BloomFilterReadMetrics>,
268 ) -> Result<BloomFilterMeta> {
269 let metadata = self.reader.metadata().await.context(IoSnafu)?;
270 let file_size = metadata.content_length;
271
272 let mut meta_reader =
273 BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
274 meta_reader.metadata(metrics).await
275 }
276}
277
278struct BloomFilterMetaReader<R: RangeReader> {
280 reader: R,
281 file_size: u64,
282 prefetch_size: u64,
283}
284
285impl<R: RangeReader> BloomFilterMetaReader<R> {
286 pub fn new(reader: R, file_size: u64, prefetch_size: Option<u64>) -> Self {
287 Self {
288 reader,
289 file_size,
290 prefetch_size: prefetch_size
291 .unwrap_or(BLOOM_META_LEN_SIZE)
292 .max(BLOOM_META_LEN_SIZE),
293 }
294 }
295
296 pub async fn metadata(
301 &mut self,
302 metrics: Option<&mut BloomFilterReadMetrics>,
303 ) -> Result<BloomFilterMeta> {
304 ensure!(
305 self.file_size >= BLOOM_META_LEN_SIZE,
306 FileSizeTooSmallSnafu {
307 size: self.file_size,
308 }
309 );
310
311 let start = metrics.as_ref().map(|_| Instant::now());
312 let meta_start = self.file_size.saturating_sub(self.prefetch_size);
313 let suffix = self
314 .reader
315 .read(meta_start..self.file_size)
316 .await
317 .context(IoSnafu)?;
318 let suffix_len = suffix.len();
319 let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
320 self.validate_meta_size(length)?;
321
322 if length > suffix_len as u64 - BLOOM_META_LEN_SIZE {
323 let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE;
324 let meta = self
325 .reader
326 .read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
327 .await
328 .context(IoSnafu)?;
329
330 if let Some(m) = metrics {
331 m.total_ranges += 2;
333 m.total_bytes += self.file_size.min(self.prefetch_size) + length;
335 if let Some(start) = start {
336 m.fetch_elapsed += start.elapsed();
337 }
338 }
339
340 BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
341 } else {
342 if let Some(m) = metrics {
343 m.total_ranges += 1;
345 m.total_bytes += self.file_size.min(self.prefetch_size);
346 if let Some(start) = start {
347 m.fetch_elapsed += start.elapsed();
348 }
349 }
350
351 let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
352 let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
353 BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
354 }
355 }
356
357 fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
358 let suffix_len = suffix.len();
359 ensure!(
360 suffix_len >= 4,
361 FileSizeTooSmallSnafu {
362 size: suffix_len as u64
363 }
364 );
365 let mut bytes = [0; 4];
366 bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
367
368 Ok(bytes)
369 }
370
371 fn validate_meta_size(&self, length: u64) -> Result<()> {
372 let max_meta_size = self.file_size - BLOOM_META_LEN_SIZE;
373 ensure!(
374 length <= max_meta_size,
375 UnexpectedMetaSizeSnafu {
376 max_meta_size,
377 actual_meta_size: length,
378 }
379 );
380 Ok(())
381 }
382}
383
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;

    use futures::io::Cursor;

    use super::*;
    use crate::bloom_filter::creator::BloomFilterCreator;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Builds an in-memory bloom filter file from three rows of two elements each.
    async fn mock_bloom_filter_bytes() -> Vec<u8> {
        let mut creator = BloomFilterCreator::new(
            2,
            0.01,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        let rows: [[&[u8]; 2]; 3] = [[b"a", b"b"], [b"c", b"d"], [b"e", b"f"]];
        for row in rows {
            let elems: Vec<Vec<u8>> = row.iter().map(|elem| elem.to_vec()).collect();
            creator.push_row_elems(elems).await.unwrap();
        }

        let mut writer = Cursor::new(vec![]);
        creator.finish(&mut writer).await.unwrap();
        writer.into_inner()
    }

    /// The metadata must decode identically whatever the prefetch size is.
    #[tokio::test]
    async fn test_bloom_filter_meta_reader() {
        let bytes = mock_bloom_filter_bytes().await;
        let file_size = bytes.len() as u64;

        let prefetch_sizes = [0u64, file_size / 2, file_size, file_size + 10];
        for prefetch in prefetch_sizes {
            let mut reader = BloomFilterMetaReader::new(bytes.clone(), file_size, Some(prefetch));
            let meta = reader.metadata(None).await.unwrap();

            assert_eq!(meta.rows_per_segment, 2);
            assert_eq!(meta.segment_count, 2);
            assert_eq!(meta.row_count, 3);

            let locs = &meta.bloom_filter_locs;
            assert_eq!(locs.len(), 2);
            let (first, second) = (&locs[0], &locs[1]);

            assert_eq!(first.offset, 0);
            assert_eq!(first.element_count, 4);
            // The second filter is stored right after the first one.
            assert_eq!(second.offset, first.size);
            assert_eq!(second.element_count, 2);
        }
    }

    /// Filters loaded through the reader must contain the elements of their segment.
    #[tokio::test]
    async fn test_bloom_filter_reader() {
        let bytes = mock_bloom_filter_bytes().await;

        let reader = BloomFilterReaderImpl::new(bytes);
        let meta = reader.metadata(None).await.unwrap();
        assert_eq!(meta.bloom_filter_locs.len(), 2);

        let first = reader
            .bloom_filter(&meta.bloom_filter_locs[0], None)
            .await
            .unwrap();
        for elem in [b"a", b"b", b"c", b"d"] {
            assert!(first.contains(&elem));
        }

        let second = reader
            .bloom_filter(&meta.bloom_filter_locs[1], None)
            .await
            .unwrap();
        for elem in [b"e", b"f"] {
            assert!(second.contains(&elem));
        }
    }
}