index/bloom_filter/
reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::{Range, Rem};
16
17use async_trait::async_trait;
18use bytemuck::try_cast_slice;
19use bytes::Bytes;
20use common_base::range_read::RangeReader;
21use fastbloom::BloomFilter;
22use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
23use prost::Message;
24use snafu::{ensure, ResultExt};
25
26use crate::bloom_filter::error::{
27    DecodeProtoSnafu, FileSizeTooSmallSnafu, IoSnafu, Result, UnexpectedMetaSizeSnafu,
28};
29use crate::bloom_filter::SEED;
30
/// Size in bytes of the little-endian `u32` length field that ends the file.
///
/// The metadata reader rejects any file shorter than this, so it is also the
/// smallest file size the reader accepts.
const BLOOM_META_LEN_SIZE: u64 = std::mem::size_of::<u32>() as u64;

/// Default number of trailing bytes fetched when reading the bloom filter meta (8 KiB).
pub const DEFAULT_PREFETCH_SIZE: u64 = 8 * 1024;
36
37/// Safely converts bytes to Vec<u64> using bytemuck for optimal performance.
38/// Faster than chunking and converting each piece individually.
39///
40/// The input bytes are a sequence of little-endian u64s.
41pub fn bytes_to_u64_vec(bytes: &Bytes) -> Vec<u64> {
42    // drop tailing things, this keeps the same behavior with `chunks_exact`.
43    let aligned_length = bytes.len() - bytes.len().rem(std::mem::size_of::<u64>());
44    let byte_slice = &bytes[..aligned_length];
45
46    // Try fast path first: direct cast if aligned
47    let u64_vec = if let Ok(u64_slice) = try_cast_slice::<u8, u64>(byte_slice) {
48        u64_slice.to_vec()
49    } else {
50        // Slow path: create aligned Vec<u64> and copy data
51        let u64_count = byte_slice.len() / std::mem::size_of::<u64>();
52        let mut u64_vec = Vec::<u64>::with_capacity(u64_count);
53
54        // SAFETY: We're creating a properly sized slice from uninitialized but allocated memory
55        // to copy bytes into. The slice has exactly the right size for the byte data.
56        let dest_slice = unsafe {
57            std::slice::from_raw_parts_mut(u64_vec.as_mut_ptr() as *mut u8, byte_slice.len())
58        };
59        dest_slice.copy_from_slice(byte_slice);
60
61        // SAFETY: We've just initialized exactly u64_count elements worth of bytes
62        unsafe { u64_vec.set_len(u64_count) };
63        u64_vec
64    };
65
66    // Convert from platform endianness to little endian if needed
67    // Just in case.
68    #[cfg(target_endian = "little")]
69    {
70        u64_vec
71    }
72    #[cfg(target_endian = "big")]
73    {
74        u64_vec.into_iter().map(|x| x.swap_bytes()).collect()
75    }
76}
77
78/// `BloomFilterReader` reads the bloom filter from the file.
79#[async_trait]
80pub trait BloomFilterReader: Sync {
81    /// Reads range of bytes from the file.
82    async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes>;
83
84    /// Reads bunch of ranges from the file.
85    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
86        let mut results = Vec::with_capacity(ranges.len());
87        for range in ranges {
88            let size = (range.end - range.start) as u32;
89            let data = self.range_read(range.start, size).await?;
90            results.push(data);
91        }
92        Ok(results)
93    }
94
95    /// Reads the meta information of the bloom filter.
96    async fn metadata(&self) -> Result<BloomFilterMeta>;
97
98    /// Reads a bloom filter with the given location.
99    async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result<BloomFilter> {
100        let bytes = self.range_read(loc.offset, loc.size as _).await?;
101        let vec = bytes_to_u64_vec(&bytes);
102        let bm = BloomFilter::from_vec(vec)
103            .seed(&SEED)
104            .expected_items(loc.element_count as _);
105        Ok(bm)
106    }
107
108    async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result<Vec<BloomFilter>> {
109        let ranges = locs
110            .iter()
111            .map(|l| l.offset..l.offset + l.size)
112            .collect::<Vec<_>>();
113        let bss = self.read_vec(&ranges).await?;
114
115        let mut result = Vec::with_capacity(bss.len());
116        for (bs, loc) in bss.into_iter().zip(locs.iter()) {
117            let vec = bytes_to_u64_vec(&bs);
118            let bm = BloomFilter::from_vec(vec)
119                .seed(&SEED)
120                .expected_items(loc.element_count as _);
121            result.push(bm);
122        }
123
124        Ok(result)
125    }
126}
127
128/// `BloomFilterReaderImpl` reads the bloom filter from the file.
129pub struct BloomFilterReaderImpl<R: RangeReader> {
130    /// The underlying reader.
131    reader: R,
132}
133
134impl<R: RangeReader> BloomFilterReaderImpl<R> {
135    /// Creates a new `BloomFilterReaderImpl` with the given reader.
136    pub fn new(reader: R) -> Self {
137        Self { reader }
138    }
139}
140
141#[async_trait]
142impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
143    async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
144        self.reader
145            .read(offset..offset + size as u64)
146            .await
147            .context(IoSnafu)
148    }
149
150    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
151        self.reader.read_vec(ranges).await.context(IoSnafu)
152    }
153
154    async fn metadata(&self) -> Result<BloomFilterMeta> {
155        let metadata = self.reader.metadata().await.context(IoSnafu)?;
156        let file_size = metadata.content_length;
157
158        let mut meta_reader =
159            BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
160        meta_reader.metadata().await
161    }
162}
163
164/// `BloomFilterMetaReader` reads the metadata of the bloom filter.
165struct BloomFilterMetaReader<R: RangeReader> {
166    reader: R,
167    file_size: u64,
168    prefetch_size: u64,
169}
170
171impl<R: RangeReader> BloomFilterMetaReader<R> {
172    pub fn new(reader: R, file_size: u64, prefetch_size: Option<u64>) -> Self {
173        Self {
174            reader,
175            file_size,
176            prefetch_size: prefetch_size
177                .unwrap_or(BLOOM_META_LEN_SIZE)
178                .max(BLOOM_META_LEN_SIZE),
179        }
180    }
181
182    /// Reads the metadata of the bloom filter.
183    ///
184    /// It will first prefetch some bytes from the end of the file,
185    /// then parse the metadata from the prefetch bytes.
186    pub async fn metadata(&mut self) -> Result<BloomFilterMeta> {
187        ensure!(
188            self.file_size >= BLOOM_META_LEN_SIZE,
189            FileSizeTooSmallSnafu {
190                size: self.file_size,
191            }
192        );
193
194        let meta_start = self.file_size.saturating_sub(self.prefetch_size);
195        let suffix = self
196            .reader
197            .read(meta_start..self.file_size)
198            .await
199            .context(IoSnafu)?;
200        let suffix_len = suffix.len();
201        let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
202        self.validate_meta_size(length)?;
203
204        if length > suffix_len as u64 - BLOOM_META_LEN_SIZE {
205            let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE;
206            let meta = self
207                .reader
208                .read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
209                .await
210                .context(IoSnafu)?;
211            BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
212        } else {
213            let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
214            let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
215            BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
216        }
217    }
218
219    fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
220        let suffix_len = suffix.len();
221        ensure!(
222            suffix_len >= 4,
223            FileSizeTooSmallSnafu {
224                size: suffix_len as u64
225            }
226        );
227        let mut bytes = [0; 4];
228        bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
229
230        Ok(bytes)
231    }
232
233    fn validate_meta_size(&self, length: u64) -> Result<()> {
234        let max_meta_size = self.file_size - BLOOM_META_LEN_SIZE;
235        ensure!(
236            length <= max_meta_size,
237            UnexpectedMetaSizeSnafu {
238                max_meta_size,
239                actual_meta_size: length,
240            }
241        );
242        Ok(())
243    }
244}
245
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    use futures::io::Cursor;

    use super::*;
    use crate::bloom_filter::creator::BloomFilterCreator;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Builds an in-memory bloom filter file from three rows of two elements
    /// each: (a, b), (c, d) and (e, f).
    async fn mock_bloom_filter_bytes() -> Vec<u8> {
        let mut writer = Cursor::new(vec![]);
        let mut creator = BloomFilterCreator::new(
            2,
            0.01,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        for (first, second) in [(b"a", b"b"), (b"c", b"d"), (b"e", b"f")] {
            creator
                .push_row_elems(vec![first.to_vec(), second.to_vec()])
                .await
                .unwrap();
        }

        creator.finish(&mut writer).await.unwrap();

        writer.into_inner()
    }

    /// The meta must decode identically for every prefetch size: below the
    /// minimum, partial, exactly the file size, and beyond the file size.
    #[tokio::test]
    async fn test_bloom_filter_meta_reader() {
        let bytes = mock_bloom_filter_bytes().await;
        let file_size = bytes.len() as u64;

        let prefetch_sizes = [0u64, file_size / 2, file_size, file_size + 10];
        for prefetch in prefetch_sizes {
            let mut reader =
                BloomFilterMetaReader::new(bytes.clone(), file_size as _, Some(prefetch));
            let meta = reader.metadata().await.unwrap();

            assert_eq!(meta.rows_per_segment, 2);
            assert_eq!(meta.segment_count, 2);
            assert_eq!(meta.row_count, 3);
            assert_eq!(meta.bloom_filter_locs.len(), 2);

            let (first, second) = (&meta.bloom_filter_locs[0], &meta.bloom_filter_locs[1]);

            assert_eq!(first.offset, 0);
            assert_eq!(first.element_count, 4);
            // The second filter starts right where the first one ends.
            assert_eq!(second.offset, first.size);
            assert_eq!(second.element_count, 2);
        }
    }

    /// Every pushed element must be reported as present by the filter of the
    /// segment it was written to.
    #[tokio::test]
    async fn test_bloom_filter_reader() {
        let bytes = mock_bloom_filter_bytes().await;

        let reader = BloomFilterReaderImpl::new(bytes);
        let meta = reader.metadata().await.unwrap();

        assert_eq!(meta.bloom_filter_locs.len(), 2);

        let bf = reader
            .bloom_filter(&meta.bloom_filter_locs[0])
            .await
            .unwrap();
        for key in [b"a", b"b", b"c", b"d"] {
            assert!(bf.contains(&key));
        }

        let bf = reader
            .bloom_filter(&meta.bloom_filter_locs[1])
            .await
            .unwrap();
        for key in [b"e", b"f"] {
            assert!(bf.contains(&key));
        }
    }
}
332        assert!(bf.contains(&b"f"));
333    }
334}