index/bloom_filter/
reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::{Range, Rem};
16use std::time::{Duration, Instant};
17
18use async_trait::async_trait;
19use bytemuck::try_cast_slice;
20use bytes::Bytes;
21use common_base::range_read::RangeReader;
22use fastbloom::BloomFilter;
23use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
24use prost::Message;
25use snafu::{ResultExt, ensure};
26
27use crate::bloom_filter::SEED;
28use crate::bloom_filter::error::{
29    DecodeProtoSnafu, FileSizeTooSmallSnafu, IoSnafu, Result, UnexpectedMetaSizeSnafu,
30};
31
/// Width in bytes of the trailing metadata-length field (a little-endian `u32`) that ends a
/// bloom filter file. It is therefore also the smallest size a valid file can have.
const BLOOM_META_LEN_SIZE: u64 = std::mem::size_of::<u32>() as u64;

/// Default number of bytes prefetched from the end of the file when reading the bloom
/// filter metadata (8 KiB).
pub const DEFAULT_PREFETCH_SIZE: u64 = 8 * 1024;
37
/// Metrics for bloom filter read operations.
#[derive(Clone, Default)]
pub struct BloomFilterReadMetrics {
    /// Total number of bytes requested by reads.
    pub total_bytes: u64,
    /// Total number of byte ranges requested by reads.
    pub total_ranges: usize,
    /// Time spent fetching data.
    pub fetch_elapsed: Duration,
    /// Number of cache hits.
    pub cache_hit: usize,
    /// Number of cache misses.
    pub cache_miss: usize,
}

/// Renders the metrics as a compact JSON-like object that lists only non-zero fields.
///
/// When neither bytes were read nor cache hits occurred, `{}` is printed regardless of
/// the remaining fields.
impl std::fmt::Debug for BloomFilterReadMetrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if self.total_bytes == 0 && self.cache_hit == 0 {
            // Nothing was read at all.
            return f.write_str("{}");
        }

        f.write_str("{")?;
        // Separator placed before every field except the first one emitted.
        let mut sep = "";
        if self.total_bytes > 0 {
            write!(f, "{sep}\"total_bytes\":{}", self.total_bytes)?;
            sep = ", ";
        }
        if self.cache_hit > 0 {
            write!(f, "{sep}\"cache_hit\":{}", self.cache_hit)?;
            sep = ", ";
        }
        if self.total_ranges > 0 {
            write!(f, "{sep}\"total_ranges\":{}", self.total_ranges)?;
            sep = ", ";
        }
        if !self.fetch_elapsed.is_zero() {
            write!(f, "{sep}\"fetch_elapsed\":\"{:?}\"", self.fetch_elapsed)?;
            sep = ", ";
        }
        if self.cache_miss > 0 {
            write!(f, "{sep}\"cache_miss\":{}", self.cache_miss)?;
        }
        f.write_str("}")
    }
}

impl BloomFilterReadMetrics {
    /// Accumulates every counter of `other` into `self`.
    pub fn merge_from(&mut self, other: &Self) {
        // Destructuring makes adding a field without updating this method a compile error.
        let Self {
            total_bytes,
            total_ranges,
            fetch_elapsed,
            cache_hit,
            cache_miss,
        } = other;
        self.total_bytes += *total_bytes;
        self.total_ranges += *total_ranges;
        self.fetch_elapsed += *fetch_elapsed;
        self.cache_hit += *cache_hit;
        self.cache_miss += *cache_miss;
    }
}
103
104/// Safely converts bytes to Vec<u64> using bytemuck for optimal performance.
105/// Faster than chunking and converting each piece individually.
106///
107/// The input bytes are a sequence of little-endian u64s.
108pub fn bytes_to_u64_vec(bytes: &Bytes) -> Vec<u64> {
109    // drop tailing things, this keeps the same behavior with `chunks_exact`.
110    let aligned_length = bytes.len() - bytes.len().rem(std::mem::size_of::<u64>());
111    let byte_slice = &bytes[..aligned_length];
112
113    // Try fast path first: direct cast if aligned
114    let u64_vec = if let Ok(u64_slice) = try_cast_slice::<u8, u64>(byte_slice) {
115        u64_slice.to_vec()
116    } else {
117        // Slow path: create aligned Vec<u64> and copy data
118        let u64_count = byte_slice.len() / std::mem::size_of::<u64>();
119        let mut u64_vec = Vec::<u64>::with_capacity(u64_count);
120
121        // SAFETY: We're creating a properly sized slice from uninitialized but allocated memory
122        // to copy bytes into. The slice has exactly the right size for the byte data.
123        let dest_slice = unsafe {
124            std::slice::from_raw_parts_mut(u64_vec.as_mut_ptr() as *mut u8, byte_slice.len())
125        };
126        dest_slice.copy_from_slice(byte_slice);
127
128        // SAFETY: We've just initialized exactly u64_count elements worth of bytes
129        unsafe { u64_vec.set_len(u64_count) };
130        u64_vec
131    };
132
133    // Convert from platform endianness to little endian if needed
134    // Just in case.
135    #[cfg(target_endian = "little")]
136    {
137        u64_vec
138    }
139    #[cfg(target_endian = "big")]
140    {
141        u64_vec.into_iter().map(|x| x.swap_bytes()).collect()
142    }
143}
144
145/// `BloomFilterReader` reads the bloom filter from the file.
146#[async_trait]
147pub trait BloomFilterReader: Sync {
148    /// Reads range of bytes from the file.
149    async fn range_read(
150        &self,
151        offset: u64,
152        size: u32,
153        metrics: Option<&mut BloomFilterReadMetrics>,
154    ) -> Result<Bytes>;
155
156    /// Reads bunch of ranges from the file.
157    async fn read_vec(
158        &self,
159        ranges: &[Range<u64>],
160        metrics: Option<&mut BloomFilterReadMetrics>,
161    ) -> Result<Vec<Bytes>>;
162
163    /// Reads the meta information of the bloom filter.
164    async fn metadata(
165        &self,
166        metrics: Option<&mut BloomFilterReadMetrics>,
167    ) -> Result<BloomFilterMeta>;
168
169    /// Reads a bloom filter with the given location.
170    async fn bloom_filter(
171        &self,
172        loc: &BloomFilterLoc,
173        metrics: Option<&mut BloomFilterReadMetrics>,
174    ) -> Result<BloomFilter> {
175        let bytes = self.range_read(loc.offset, loc.size as _, metrics).await?;
176        let vec = bytes_to_u64_vec(&bytes);
177        let bm = BloomFilter::from_vec(vec)
178            .seed(&SEED)
179            .expected_items(loc.element_count as _);
180        Ok(bm)
181    }
182
183    async fn bloom_filter_vec(
184        &self,
185        locs: &[BloomFilterLoc],
186        metrics: Option<&mut BloomFilterReadMetrics>,
187    ) -> Result<Vec<BloomFilter>> {
188        let ranges = locs
189            .iter()
190            .map(|l| l.offset..l.offset + l.size)
191            .collect::<Vec<_>>();
192        let bss = self.read_vec(&ranges, metrics).await?;
193
194        let mut result = Vec::with_capacity(bss.len());
195        for (bs, loc) in bss.into_iter().zip(locs.iter()) {
196            let vec = bytes_to_u64_vec(&bs);
197            let bm = BloomFilter::from_vec(vec)
198                .seed(&SEED)
199                .expected_items(loc.element_count as _);
200            result.push(bm);
201        }
202
203        Ok(result)
204    }
205}
206
207/// `BloomFilterReaderImpl` reads the bloom filter from the file.
208pub struct BloomFilterReaderImpl<R: RangeReader> {
209    /// The underlying reader.
210    reader: R,
211}
212
213impl<R: RangeReader> BloomFilterReaderImpl<R> {
214    /// Creates a new `BloomFilterReaderImpl` with the given reader.
215    pub fn new(reader: R) -> Self {
216        Self { reader }
217    }
218}
219
220#[async_trait]
221impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
222    async fn range_read(
223        &self,
224        offset: u64,
225        size: u32,
226        metrics: Option<&mut BloomFilterReadMetrics>,
227    ) -> Result<Bytes> {
228        let start = metrics.as_ref().map(|_| Instant::now());
229        let result = self
230            .reader
231            .read(offset..offset + size as u64)
232            .await
233            .context(IoSnafu)?;
234
235        if let Some(m) = metrics {
236            m.total_ranges += 1;
237            m.total_bytes += size as u64;
238            if let Some(start) = start {
239                m.fetch_elapsed += start.elapsed();
240            }
241        }
242
243        Ok(result)
244    }
245
246    async fn read_vec(
247        &self,
248        ranges: &[Range<u64>],
249        metrics: Option<&mut BloomFilterReadMetrics>,
250    ) -> Result<Vec<Bytes>> {
251        let start = metrics.as_ref().map(|_| Instant::now());
252        let result = self.reader.read_vec(ranges).await.context(IoSnafu)?;
253
254        if let Some(m) = metrics {
255            m.total_ranges += ranges.len();
256            m.total_bytes += ranges.iter().map(|r| r.end - r.start).sum::<u64>();
257            if let Some(start) = start {
258                m.fetch_elapsed += start.elapsed();
259            }
260        }
261
262        Ok(result)
263    }
264
265    async fn metadata(
266        &self,
267        metrics: Option<&mut BloomFilterReadMetrics>,
268    ) -> Result<BloomFilterMeta> {
269        let metadata = self.reader.metadata().await.context(IoSnafu)?;
270        let file_size = metadata.content_length;
271
272        let mut meta_reader =
273            BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
274        meta_reader.metadata(metrics).await
275    }
276}
277
278/// `BloomFilterMetaReader` reads the metadata of the bloom filter.
279struct BloomFilterMetaReader<R: RangeReader> {
280    reader: R,
281    file_size: u64,
282    prefetch_size: u64,
283}
284
285impl<R: RangeReader> BloomFilterMetaReader<R> {
286    pub fn new(reader: R, file_size: u64, prefetch_size: Option<u64>) -> Self {
287        Self {
288            reader,
289            file_size,
290            prefetch_size: prefetch_size
291                .unwrap_or(BLOOM_META_LEN_SIZE)
292                .max(BLOOM_META_LEN_SIZE),
293        }
294    }
295
296    /// Reads the metadata of the bloom filter.
297    ///
298    /// It will first prefetch some bytes from the end of the file,
299    /// then parse the metadata from the prefetch bytes.
300    pub async fn metadata(
301        &mut self,
302        metrics: Option<&mut BloomFilterReadMetrics>,
303    ) -> Result<BloomFilterMeta> {
304        ensure!(
305            self.file_size >= BLOOM_META_LEN_SIZE,
306            FileSizeTooSmallSnafu {
307                size: self.file_size,
308            }
309        );
310
311        let start = metrics.as_ref().map(|_| Instant::now());
312        let meta_start = self.file_size.saturating_sub(self.prefetch_size);
313        let suffix = self
314            .reader
315            .read(meta_start..self.file_size)
316            .await
317            .context(IoSnafu)?;
318        let suffix_len = suffix.len();
319        let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
320        self.validate_meta_size(length)?;
321
322        if length > suffix_len as u64 - BLOOM_META_LEN_SIZE {
323            let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE;
324            let meta = self
325                .reader
326                .read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
327                .await
328                .context(IoSnafu)?;
329
330            if let Some(m) = metrics {
331                // suffix read + meta read
332                m.total_ranges += 2;
333                // Ignores the meta length size to simplify the calculation.
334                m.total_bytes += self.file_size.min(self.prefetch_size) + length;
335                if let Some(start) = start {
336                    m.fetch_elapsed += start.elapsed();
337                }
338            }
339
340            BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
341        } else {
342            if let Some(m) = metrics {
343                // suffix read only
344                m.total_ranges += 1;
345                m.total_bytes += self.file_size.min(self.prefetch_size);
346                if let Some(start) = start {
347                    m.fetch_elapsed += start.elapsed();
348                }
349            }
350
351            let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
352            let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
353            BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
354        }
355    }
356
357    fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
358        let suffix_len = suffix.len();
359        ensure!(
360            suffix_len >= 4,
361            FileSizeTooSmallSnafu {
362                size: suffix_len as u64
363            }
364        );
365        let mut bytes = [0; 4];
366        bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
367
368        Ok(bytes)
369    }
370
371    fn validate_meta_size(&self, length: u64) -> Result<()> {
372        let max_meta_size = self.file_size - BLOOM_META_LEN_SIZE;
373        ensure!(
374            length <= max_meta_size,
375            UnexpectedMetaSizeSnafu {
376                max_meta_size,
377                actual_meta_size: length,
378            }
379        );
380        Ok(())
381    }
382}
383
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;

    use futures::io::Cursor;

    use super::*;
    use crate::bloom_filter::creator::BloomFilterCreator;
    use crate::external_provider::MockExternalTempFileProvider;

    async fn mock_bloom_filter_bytes() -> Vec<u8> {
        let mut writer = Cursor::new(vec![]);
        let mut creator = BloomFilterCreator::new(
            2,
            0.01,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        creator
            .push_row_elems(vec![b"a".to_vec(), b"b".to_vec()])
            .await
            .unwrap();
        creator
            .push_row_elems(vec![b"c".to_vec(), b"d".to_vec()])
            .await
            .unwrap();
        creator
            .push_row_elems(vec![b"e".to_vec(), b"f".to_vec()])
            .await
            .unwrap();

        creator.finish(&mut writer).await.unwrap();

        writer.into_inner()
    }

    #[tokio::test]
    async fn test_bloom_filter_meta_reader() {
        let bytes = mock_bloom_filter_bytes().await;
        let file_size = bytes.len() as u64;

        for prefetch in [0u64, file_size / 2, file_size, file_size + 10] {
            let mut reader =
                BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch));
            let meta = reader.metadata(None).await.unwrap();

            assert_eq!(meta.rows_per_segment, 2);
            assert_eq!(meta.segment_count, 2);
            assert_eq!(meta.row_count, 3);
            assert_eq!(meta.bloom_filter_locs.len(), 2);

            assert_eq!(meta.bloom_filter_locs[0].offset, 0);
            assert_eq!(meta.bloom_filter_locs[0].element_count, 4);
            assert_eq!(
                meta.bloom_filter_locs[1].offset,
                meta.bloom_filter_locs[0].size
            );
            assert_eq!(meta.bloom_filter_locs[1].element_count, 2);
        }
    }

    #[tokio::test]
    async fn test_bloom_filter_reader() {
        let bytes = mock_bloom_filter_bytes().await;

        let reader = BloomFilterReaderImpl::new(bytes);
        let meta = reader.metadata(None).await.unwrap();

        assert_eq!(meta.bloom_filter_locs.len(), 2);
        let bf = reader
            .bloom_filter(&meta.bloom_filter_locs[0], None)
            .await
            .unwrap();
        assert!(bf.contains(&b"a"));
        assert!(bf.contains(&b"b"));
        assert!(bf.contains(&b"c"));
        assert!(bf.contains(&b"d"));

        let bf = reader
            .bloom_filter(&meta.bloom_filter_locs[1], None)
            .await
            .unwrap();
        assert!(bf.contains(&b"e"));
        assert!(bf.contains(&b"f"));
    }

    #[tokio::test]
    async fn test_bloom_filter_vec_with_metrics() {
        let bytes = mock_bloom_filter_bytes().await;
        let file_size = bytes.len() as u64;
        let reader = BloomFilterReaderImpl::new(bytes);

        // The mock file is smaller than the default prefetch size, so the metadata is
        // served by a single suffix read covering the whole file.
        let mut metrics = BloomFilterReadMetrics::default();
        let meta = reader.metadata(Some(&mut metrics)).await.unwrap();
        assert_eq!(metrics.total_ranges, 1);
        assert_eq!(metrics.total_bytes, file_size);

        let mut metrics = BloomFilterReadMetrics::default();
        let bfs = reader
            .bloom_filter_vec(&meta.bloom_filter_locs, Some(&mut metrics))
            .await
            .unwrap();
        assert_eq!(bfs.len(), 2);
        assert_eq!(metrics.total_ranges, 2);
        assert_eq!(
            metrics.total_bytes,
            meta.bloom_filter_locs
                .iter()
                .map(|loc| loc.size)
                .sum::<u64>()
        );

        assert!(bfs[0].contains(&b"a"));
        assert!(bfs[0].contains(&b"b"));
        assert!(bfs[0].contains(&b"c"));
        assert!(bfs[0].contains(&b"d"));
        assert!(bfs[1].contains(&b"e"));
        assert!(bfs[1].contains(&b"f"));
    }

    #[test]
    fn test_bytes_to_u64_vec() {
        let values: Vec<u64> = vec![1, u64::MAX, 0x0102_0304_0506_0708, 42];
        let encoded: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();

        // Shift the payload by every offset in 0..8 so that both the aligned fast path and
        // the misaligned slow path run, whatever alignment the allocator provides.
        for offset in 0..8 {
            let mut buf = vec![0xAAu8; offset];
            buf.extend_from_slice(&encoded);
            // Trailing bytes that do not form a whole u64 must be ignored.
            buf.extend_from_slice(&[0xFF, 0xFF, 0xFF]);
            let bytes = Bytes::from(buf).slice(offset..);
            assert_eq!(bytes_to_u64_vec(&bytes), values);
        }

        assert!(bytes_to_u64_vec(&Bytes::new()).is_empty());
        assert!(bytes_to_u64_vec(&Bytes::from_static(&[1, 2, 3])).is_empty());
    }

    #[test]
    fn test_read_metrics_debug_and_merge() {
        assert_eq!(format!("{:?}", BloomFilterReadMetrics::default()), "{}");

        let mut metrics = BloomFilterReadMetrics {
            cache_hit: 3,
            cache_miss: 1,
            ..Default::default()
        };
        assert_eq!(
            format!("{:?}", metrics),
            "{\"cache_hit\":3, \"cache_miss\":1}"
        );

        metrics.merge_from(&BloomFilterReadMetrics {
            total_bytes: 16,
            total_ranges: 2,
            cache_hit: 1,
            ..Default::default()
        });
        assert_eq!(
            format!("{:?}", metrics),
            "{\"total_bytes\":16, \"cache_hit\":4, \"total_ranges\":2, \"cache_miss\":1}"
        );
    }
}
470        assert!(bf.contains(&b"f"));
471    }
472}