index/bloom_filter/
reader.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Range;
16
17use async_trait::async_trait;
18use bytes::Bytes;
19use common_base::range_read::RangeReader;
20use fastbloom::BloomFilter;
21use greptime_proto::v1::index::{BloomFilterLoc, BloomFilterMeta};
22use prost::Message;
23use snafu::{ensure, ResultExt};
24
25use crate::bloom_filter::error::{
26    DecodeProtoSnafu, FileSizeTooSmallSnafu, IoSnafu, Result, UnexpectedMetaSizeSnafu,
27};
28use crate::bloom_filter::SEED;
29
/// Byte width of the little-endian `u32` length field stored in the last bytes of the file.
///
/// It is also the smallest file size the metadata reader accepts.
const BLOOM_META_LEN_SIZE: u64 = std::mem::size_of::<u32>() as u64;

/// Number of trailing bytes fetched by default when reading the bloom filter meta (8 KiB).
pub const DEFAULT_PREFETCH_SIZE: u64 = 8 * 1024;
35
36/// `BloomFilterReader` reads the bloom filter from the file.
37#[async_trait]
38pub trait BloomFilterReader: Sync {
39    /// Reads range of bytes from the file.
40    async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes>;
41
42    /// Reads bunch of ranges from the file.
43    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
44        let mut results = Vec::with_capacity(ranges.len());
45        for range in ranges {
46            let size = (range.end - range.start) as u32;
47            let data = self.range_read(range.start, size).await?;
48            results.push(data);
49        }
50        Ok(results)
51    }
52
53    /// Reads the meta information of the bloom filter.
54    async fn metadata(&self) -> Result<BloomFilterMeta>;
55
56    /// Reads a bloom filter with the given location.
57    async fn bloom_filter(&self, loc: &BloomFilterLoc) -> Result<BloomFilter> {
58        let bytes = self.range_read(loc.offset, loc.size as _).await?;
59        let vec = bytes
60            .chunks_exact(std::mem::size_of::<u64>())
61            .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
62            .collect();
63        let bm = BloomFilter::from_vec(vec)
64            .seed(&SEED)
65            .expected_items(loc.element_count as _);
66        Ok(bm)
67    }
68
69    async fn bloom_filter_vec(&self, locs: &[BloomFilterLoc]) -> Result<Vec<BloomFilter>> {
70        let ranges = locs
71            .iter()
72            .map(|l| l.offset..l.offset + l.size)
73            .collect::<Vec<_>>();
74        let bss = self.read_vec(&ranges).await?;
75
76        let mut result = Vec::with_capacity(bss.len());
77        for (bs, loc) in bss.into_iter().zip(locs.iter()) {
78            let vec = bs
79                .chunks_exact(std::mem::size_of::<u64>())
80                .map(|chunk| u64::from_le_bytes(chunk.try_into().unwrap()))
81                .collect();
82            let bm = BloomFilter::from_vec(vec)
83                .seed(&SEED)
84                .expected_items(loc.element_count as _);
85            result.push(bm);
86        }
87
88        Ok(result)
89    }
90}
91
92/// `BloomFilterReaderImpl` reads the bloom filter from the file.
93pub struct BloomFilterReaderImpl<R: RangeReader> {
94    /// The underlying reader.
95    reader: R,
96}
97
98impl<R: RangeReader> BloomFilterReaderImpl<R> {
99    /// Creates a new `BloomFilterReaderImpl` with the given reader.
100    pub fn new(reader: R) -> Self {
101        Self { reader }
102    }
103}
104
105#[async_trait]
106impl<R: RangeReader> BloomFilterReader for BloomFilterReaderImpl<R> {
107    async fn range_read(&self, offset: u64, size: u32) -> Result<Bytes> {
108        self.reader
109            .read(offset..offset + size as u64)
110            .await
111            .context(IoSnafu)
112    }
113
114    async fn read_vec(&self, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
115        self.reader.read_vec(ranges).await.context(IoSnafu)
116    }
117
118    async fn metadata(&self) -> Result<BloomFilterMeta> {
119        let metadata = self.reader.metadata().await.context(IoSnafu)?;
120        let file_size = metadata.content_length;
121
122        let mut meta_reader =
123            BloomFilterMetaReader::new(&self.reader, file_size, Some(DEFAULT_PREFETCH_SIZE));
124        meta_reader.metadata().await
125    }
126}
127
128/// `BloomFilterMetaReader` reads the metadata of the bloom filter.
129struct BloomFilterMetaReader<R: RangeReader> {
130    reader: R,
131    file_size: u64,
132    prefetch_size: u64,
133}
134
135impl<R: RangeReader> BloomFilterMetaReader<R> {
136    pub fn new(reader: R, file_size: u64, prefetch_size: Option<u64>) -> Self {
137        Self {
138            reader,
139            file_size,
140            prefetch_size: prefetch_size
141                .unwrap_or(BLOOM_META_LEN_SIZE)
142                .max(BLOOM_META_LEN_SIZE),
143        }
144    }
145
146    /// Reads the metadata of the bloom filter.
147    ///
148    /// It will first prefetch some bytes from the end of the file,
149    /// then parse the metadata from the prefetch bytes.
150    pub async fn metadata(&mut self) -> Result<BloomFilterMeta> {
151        ensure!(
152            self.file_size >= BLOOM_META_LEN_SIZE,
153            FileSizeTooSmallSnafu {
154                size: self.file_size,
155            }
156        );
157
158        let meta_start = self.file_size.saturating_sub(self.prefetch_size);
159        let suffix = self
160            .reader
161            .read(meta_start..self.file_size)
162            .await
163            .context(IoSnafu)?;
164        let suffix_len = suffix.len();
165        let length = u32::from_le_bytes(Self::read_tailing_four_bytes(&suffix)?) as u64;
166        self.validate_meta_size(length)?;
167
168        if length > suffix_len as u64 - BLOOM_META_LEN_SIZE {
169            let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE;
170            let meta = self
171                .reader
172                .read(metadata_start..self.file_size - BLOOM_META_LEN_SIZE)
173                .await
174                .context(IoSnafu)?;
175            BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
176        } else {
177            let metadata_start = self.file_size - length - BLOOM_META_LEN_SIZE - meta_start;
178            let meta = &suffix[metadata_start as usize..suffix_len - BLOOM_META_LEN_SIZE as usize];
179            BloomFilterMeta::decode(meta).context(DecodeProtoSnafu)
180        }
181    }
182
183    fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> {
184        let suffix_len = suffix.len();
185        ensure!(
186            suffix_len >= 4,
187            FileSizeTooSmallSnafu {
188                size: suffix_len as u64
189            }
190        );
191        let mut bytes = [0; 4];
192        bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]);
193
194        Ok(bytes)
195    }
196
197    fn validate_meta_size(&self, length: u64) -> Result<()> {
198        let max_meta_size = self.file_size - BLOOM_META_LEN_SIZE;
199        ensure!(
200            length <= max_meta_size,
201            UnexpectedMetaSizeSnafu {
202                max_meta_size,
203                actual_meta_size: length,
204            }
205        );
206        Ok(())
207    }
208}
209
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicUsize;
    use std::sync::Arc;

    use futures::io::Cursor;

    use super::*;
    use crate::bloom_filter::creator::BloomFilterCreator;
    use crate::external_provider::MockExternalTempFileProvider;

    /// Builds the serialized bytes of a bloom filter file from three rows
    /// (`a,b`, `c,d`, `e,f`), using a creator configured with `2` as its first
    /// argument.
    async fn mock_bloom_filter_bytes() -> Vec<u8> {
        let mut creator = BloomFilterCreator::new(
            2,
            Arc::new(MockExternalTempFileProvider::new()),
            Arc::new(AtomicUsize::new(0)),
            None,
        );

        for row in [[b"a", b"b"], [b"c", b"d"], [b"e", b"f"]] {
            let elems: Vec<Vec<u8>> = row.iter().map(|elem| elem.to_vec()).collect();
            creator.push_row_elems(elems).await.unwrap();
        }

        let mut sink = Cursor::new(vec![]);
        creator.finish(&mut sink).await.unwrap();
        sink.into_inner()
    }

    /// The meta must decode identically whether the prefetch window is
    /// smaller than, equal to, or larger than the file.
    #[tokio::test]
    async fn test_bloom_filter_meta_reader() {
        let bytes = mock_bloom_filter_bytes().await;
        let file_size = bytes.len() as u64;

        let prefetch_sizes = [0u64, file_size / 2, file_size, file_size + 10];
        for prefetch in prefetch_sizes {
            let mut meta_reader =
                BloomFilterMetaReader::new(bytes.clone(), file_size, Some(prefetch));
            let meta = meta_reader.metadata().await.unwrap();

            assert_eq!(meta.rows_per_segment, 2);
            assert_eq!(meta.segment_count, 2);
            assert_eq!(meta.row_count, 3);
            assert_eq!(meta.bloom_filter_locs.len(), 2);

            let first = &meta.bloom_filter_locs[0];
            let second = &meta.bloom_filter_locs[1];

            assert_eq!(first.offset, 0);
            assert_eq!(first.element_count, 4);
            // The second filter starts right where the first one ends.
            assert_eq!(second.offset, first.size);
            assert_eq!(second.element_count, 2);
        }
    }

    /// Each decoded filter must contain the elements pushed into its segment.
    #[tokio::test]
    async fn test_bloom_filter_reader() {
        let bytes = mock_bloom_filter_bytes().await;

        let reader = BloomFilterReaderImpl::new(bytes);
        let meta = reader.metadata().await.unwrap();
        assert_eq!(meta.bloom_filter_locs.len(), 2);

        let first = reader
            .bloom_filter(&meta.bloom_filter_locs[0])
            .await
            .unwrap();
        for elem in [b"a", b"b", b"c", b"d"] {
            assert!(first.contains(&elem));
        }

        let second = reader
            .bloom_filter(&meta.bloom_filter_locs[1])
            .await
            .unwrap();
        for elem in [b"e", b"f"] {
            assert!(second.contains(&elem));
        }
    }
}