mito2/sst/index/bloom_filter/
applier.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod builder;
16
17use std::collections::HashMap;
18use std::ops::Range;
19use std::sync::Arc;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
24use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
25use object_store::ObjectStore;
26use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
27use puffin::puffin_manager::{PuffinManager, PuffinReader};
28use snafu::ResultExt;
29use store_api::storage::{ColumnId, RegionId};
30
31use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
32use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
33use crate::cache::index::bloom_filter_index::{
34    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
35};
36use crate::error::{
37    ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
38    Result,
39};
40use crate::metrics::INDEX_APPLY_ELAPSED;
41use crate::sst::file::FileId;
42pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
43use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
44use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
45use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
46
47pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
48
49/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
50pub struct BloomFilterIndexApplier {
51    /// Directory of the region.
52    region_dir: String,
53
54    /// ID of the region.
55    region_id: RegionId,
56
57    /// Object store to read the index file.
58    object_store: ObjectStore,
59
60    /// File cache to read the index file.
61    file_cache: Option<FileCacheRef>,
62
63    /// Factory to create puffin manager.
64    puffin_manager_factory: PuffinManagerFactory,
65
66    /// Cache for puffin metadata.
67    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
68
69    /// Cache for bloom filter index.
70    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
71
72    /// Bloom filter predicates.
73    /// For each column, the value will be retained only if it contains __all__ predicates.
74    predicates: HashMap<ColumnId, Vec<InListPredicate>>,
75}
76
77impl BloomFilterIndexApplier {
78    /// Creates a new `BloomFilterIndexApplier`.
79    ///
80    /// For each column, the value will be retained only if it contains __all__ predicates.
81    pub fn new(
82        region_dir: String,
83        region_id: RegionId,
84        object_store: ObjectStore,
85        puffin_manager_factory: PuffinManagerFactory,
86        predicates: HashMap<ColumnId, Vec<InListPredicate>>,
87    ) -> Self {
88        Self {
89            region_dir,
90            region_id,
91            object_store,
92            file_cache: None,
93            puffin_manager_factory,
94            puffin_metadata_cache: None,
95            bloom_filter_index_cache: None,
96            predicates,
97        }
98    }
99
100    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
101        self.file_cache = file_cache;
102        self
103    }
104
105    pub fn with_puffin_metadata_cache(
106        mut self,
107        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
108    ) -> Self {
109        self.puffin_metadata_cache = puffin_metadata_cache;
110        self
111    }
112
113    pub fn with_bloom_filter_cache(
114        mut self,
115        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
116    ) -> Self {
117        self.bloom_filter_index_cache = bloom_filter_index_cache;
118        self
119    }
120
121    /// Applies bloom filter predicates to the provided SST file and returns a
122    /// list of row group ranges that match the predicates.
123    ///
124    /// The `row_groups` iterator provides the row group lengths and whether to search in the row group.
125    pub async fn apply(
126        &self,
127        file_id: FileId,
128        file_size_hint: Option<u64>,
129        row_groups: impl Iterator<Item = (usize, bool)>,
130    ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
131        let _timer = INDEX_APPLY_ELAPSED
132            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
133            .start_timer();
134
135        // Calculates row groups' ranges based on start of the file.
136        let mut input = Vec::with_capacity(row_groups.size_hint().0);
137        let mut start = 0;
138        for (i, (len, to_search)) in row_groups.enumerate() {
139            let end = start + len;
140            if to_search {
141                input.push((i, start..end));
142            }
143            start = end;
144        }
145
146        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
147        // so we need to adjust them later.
148        let mut output = input
149            .iter()
150            .map(|(i, range)| (*i, vec![range.clone()]))
151            .collect::<Vec<_>>();
152
153        for (column_id, predicates) in &self.predicates {
154            let blob = match self
155                .blob_reader(file_id, *column_id, file_size_hint)
156                .await?
157            {
158                Some(blob) => blob,
159                None => continue,
160            };
161
162            // Create appropriate reader based on whether we have caching enabled
163            if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
164                let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
165                let reader = CachedBloomFilterIndexBlobReader::new(
166                    file_id,
167                    *column_id,
168                    Tag::Skipping,
169                    blob_size,
170                    BloomFilterReaderImpl::new(blob),
171                    bloom_filter_cache.clone(),
172                );
173                self.apply_predicates(reader, predicates, &mut output)
174                    .await
175                    .context(ApplyBloomFilterIndexSnafu)?;
176            } else {
177                let reader = BloomFilterReaderImpl::new(blob);
178                self.apply_predicates(reader, predicates, &mut output)
179                    .await
180                    .context(ApplyBloomFilterIndexSnafu)?;
181            }
182        }
183
184        // adjust ranges to be based on row group
185        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
186            let start = input.start;
187            for range in output.iter_mut() {
188                range.start -= start;
189                range.end -= start;
190            }
191        }
192        output.retain(|(_, ranges)| !ranges.is_empty());
193
194        Ok(output)
195    }
196
197    /// Creates a blob reader from the cached or remote index file.
198    ///
199    /// Returus `None` if the column does not have an index.
200    async fn blob_reader(
201        &self,
202        file_id: FileId,
203        column_id: ColumnId,
204        file_size_hint: Option<u64>,
205    ) -> Result<Option<BlobReader>> {
206        let reader = match self
207            .cached_blob_reader(file_id, column_id, file_size_hint)
208            .await
209        {
210            Ok(Some(puffin_reader)) => puffin_reader,
211            other => {
212                if let Err(err) = other {
213                    // Blob not found means no index for this column
214                    if is_blob_not_found(&err) {
215                        return Ok(None);
216                    }
217                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
218                }
219                let res = self
220                    .remote_blob_reader(file_id, column_id, file_size_hint)
221                    .await;
222                if let Err(err) = res {
223                    // Blob not found means no index for this column
224                    if is_blob_not_found(&err) {
225                        return Ok(None);
226                    }
227                    return Err(err);
228                }
229
230                res?
231            }
232        };
233
234        Ok(Some(reader))
235    }
236
237    /// Creates a blob reader from the cached index file
238    async fn cached_blob_reader(
239        &self,
240        file_id: FileId,
241        column_id: ColumnId,
242        file_size_hint: Option<u64>,
243    ) -> Result<Option<BlobReader>> {
244        let Some(file_cache) = &self.file_cache else {
245            return Ok(None);
246        };
247
248        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
249        if file_cache.get(index_key).await.is_none() {
250            return Ok(None);
251        };
252
253        let puffin_manager = self.puffin_manager_factory.build(
254            file_cache.local_store(),
255            WriteCachePathProvider::new(self.region_id, file_cache.clone()),
256        );
257        let reader = puffin_manager
258            .reader(&file_id)
259            .await
260            .context(PuffinBuildReaderSnafu)?
261            .with_file_size_hint(file_size_hint)
262            .blob(&Self::column_blob_name(column_id))
263            .await
264            .context(PuffinReadBlobSnafu)?
265            .reader()
266            .await
267            .context(PuffinBuildReaderSnafu)?;
268        Ok(Some(reader))
269    }
270
271    // TODO(ruihang): use the same util with the code in creator
272    fn column_blob_name(column_id: ColumnId) -> String {
273        format!("{INDEX_BLOB_TYPE}-{column_id}")
274    }
275
276    /// Creates a blob reader from the remote index file
277    async fn remote_blob_reader(
278        &self,
279        file_id: FileId,
280        column_id: ColumnId,
281        file_size_hint: Option<u64>,
282    ) -> Result<BlobReader> {
283        let puffin_manager = self
284            .puffin_manager_factory
285            .build(
286                self.object_store.clone(),
287                RegionFilePathFactory::new(self.region_dir.clone()),
288            )
289            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
290
291        puffin_manager
292            .reader(&file_id)
293            .await
294            .context(PuffinBuildReaderSnafu)?
295            .with_file_size_hint(file_size_hint)
296            .blob(&Self::column_blob_name(column_id))
297            .await
298            .context(PuffinReadBlobSnafu)?
299            .reader()
300            .await
301            .context(PuffinBuildReaderSnafu)
302    }
303
304    async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
305        &self,
306        reader: R,
307        predicates: &[InListPredicate],
308        output: &mut [(usize, Vec<Range<usize>>)],
309    ) -> std::result::Result<(), index::bloom_filter::error::Error> {
310        let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
311
312        for (_, row_group_output) in output.iter_mut() {
313            // All rows are filtered out, skip the search
314            if row_group_output.is_empty() {
315                continue;
316            }
317
318            *row_group_output = applier.search(predicates, row_group_output).await?;
319        }
320
321        Ok(())
322    }
323}
324
325fn is_blob_not_found(err: &Error) -> bool {
326    matches!(
327        err,
328        Error::PuffinReadBlob {
329            source: puffin::error::Error::BlobNotFound { .. },
330            ..
331        }
332    )
333}
334
#[cfg(test)]
mod tests {

    use datafusion_expr::{col, lit, Expr};
    use futures::future::BoxFuture;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::metadata::RegionMetadata;

    use super::*;
    use crate::sst::index::bloom_filter::creator::tests::{
        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
    };
    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;

    /// Returns a closure that builds an applier from the given expressions,
    /// applies it to `file_id` with the given row groups and yields the
    /// resulting row group ranges.
    #[allow(clippy::type_complexity)]
    fn tester(
        region_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: FileId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
           + use<'_> {
        move |exprs, row_groups| {
            // Take owned copies so the returned future does not borrow from the closure.
            let dir = region_dir.clone();
            let store = object_store.clone();
            let region_metadata = metadata.clone();
            let factory = puffin_manager_factory.clone();
            let owned_exprs = exprs.to_vec();

            Box::pin(async move {
                let builder =
                    BloomFilterIndexApplierBuilder::new(dir, store, &region_metadata, factory);
                let applier = builder.build(&owned_exprs).unwrap().unwrap();

                let applied = applier.apply(file_id, None, row_groups.into_iter()).await;
                applied.unwrap()
            })
        }
    }

    #[tokio::test]
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        // tag_str:
        //   - type: string
        //   - index: bloom filter
        //   - granularity: 2
        //   - column_id: 1
        //
        // ts:
        //   - type: timestamp
        //   - index: time index
        //   - column_id: 2
        //
        // field_u64:
        //   - type: uint64
        //   - index: bloom filter
        //   - granularity: 4
        //   - column_id: 3
        let region_metadata = mock_region_metadata();
        let prefix = "test_bloom_filter_applier_";
        let (tmp_dir, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(tmp_dir.path().to_string_lossy()).await;
        let memory_usage_threshold = Some(1024);
        let file_id = FileId::random();
        let region_dir = "region_dir".to_string();

        let mut indexer =
            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
                .unwrap()
                .unwrap();

        // Index 20 rows: rows 0..10 carry "tag1", rows 10..20 carry "tag2".
        let mut first_batch = new_batch("tag1", 0..10);
        indexer.update(&mut first_batch).await.unwrap();
        let mut second_batch = new_batch("tag2", 10..20);
        indexer.update(&mut second_batch).await.unwrap();

        // Write the index into a puffin file under the region directory.
        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(region_dir.clone()),
        );
        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let run = tester(
            region_dir.clone(),
            object_store.clone(),
            &region_metadata,
            factory.clone(),
            file_id,
        );

        // Four row groups of five rows each, all of them searched.
        let all_searched = || -> Vec<(usize, bool)> { vec![(5, true); 4] };

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        let matched = run(&[col("tag_str").eq(lit("tag1"))], all_searched()).await;
        assert_eq!(matched, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  x row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        let matched = run(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(matched, vec![(0, vec![0..5])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        // field_u64: | o pred   | x pred    |  x pred     |  x pred       | x pred       |
        let matched = run(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            all_searched(),
        )
        .await;
        assert_eq!(matched, vec![(0, vec![0..4])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | x  row group     |  o row group     |
        // field_u64: | o pred   | x pred    |  o pred     |  x pred       | x pred       |
        let matched = run(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        assert_eq!(matched, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
}