mito2/sst/index/bloom_filter/
applier.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
24use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
25use index::target::IndexTarget;
26use object_store::ObjectStore;
27use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
28use puffin::puffin_manager::{PuffinManager, PuffinReader};
29use snafu::ResultExt;
30use store_api::region_request::PathType;
31use store_api::storage::ColumnId;
32
33use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
34use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
35use crate::cache::index::bloom_filter_index::{
36    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
37};
38use crate::cache::index::result_cache::PredicateKey;
39use crate::error::{
40    ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
41    Result,
42};
43use crate::metrics::INDEX_APPLY_ELAPSED;
44use crate::sst::file::RegionFileId;
45use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
46use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
47pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
48use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
49
/// Shared, reference-counted handle to a [`BloomFilterIndexApplier`].
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
51
/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
///
/// Index blobs are read from the local file cache when one is configured and
/// already holds the index file; otherwise they are read from the object store
/// (see `blob_reader`).
pub struct BloomFilterIndexApplier {
    /// Directory of the table.
    ///
    /// Combined with `path_type` to locate the remote index file.
    table_dir: String,

    /// Path type for generating file paths.
    path_type: PathType,

    /// Object store to read the index file.
    object_store: ObjectStore,

    /// File cache to read the index file.
    ///
    /// `None` disables the cached read path, so every read goes to `object_store`.
    file_cache: Option<FileCacheRef>,

    /// Factory to create puffin manager.
    puffin_manager_factory: PuffinManagerFactory,

    /// Cache for puffin metadata.
    ///
    /// Handed to the puffin manager that reads the remote index file.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,

    /// Cache for bloom filter index.
    ///
    /// When set, blob readers are wrapped in a `CachedBloomFilterIndexBlobReader`.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Bloom filter predicates.
    /// For each column, the value will be retained only if it contains __all__ predicates.
    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,

    /// Predicate key. Used to identify the predicate and fetch result from cache.
    ///
    /// Built from `predicates` when the applier is constructed.
    predicate_key: PredicateKey,
}
82
83impl BloomFilterIndexApplier {
84    /// Creates a new `BloomFilterIndexApplier`.
85    ///
86    /// For each column, the value will be retained only if it contains __all__ predicates.
87    pub fn new(
88        table_dir: String,
89        path_type: PathType,
90        object_store: ObjectStore,
91        puffin_manager_factory: PuffinManagerFactory,
92        predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
93    ) -> Self {
94        let predicates = Arc::new(predicates);
95        Self {
96            table_dir,
97            path_type,
98            object_store,
99            file_cache: None,
100            puffin_manager_factory,
101            puffin_metadata_cache: None,
102            bloom_filter_index_cache: None,
103            predicate_key: PredicateKey::new_bloom(predicates.clone()),
104            predicates,
105        }
106    }
107
108    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
109        self.file_cache = file_cache;
110        self
111    }
112
113    pub fn with_puffin_metadata_cache(
114        mut self,
115        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
116    ) -> Self {
117        self.puffin_metadata_cache = puffin_metadata_cache;
118        self
119    }
120
121    pub fn with_bloom_filter_cache(
122        mut self,
123        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
124    ) -> Self {
125        self.bloom_filter_index_cache = bloom_filter_index_cache;
126        self
127    }
128
129    /// Applies bloom filter predicates to the provided SST file and returns a
130    /// list of row group ranges that match the predicates.
131    ///
132    /// The `row_groups` iterator provides the row group lengths and whether to search in the row group.
133    ///
134    /// Row group id existing in the returned result means that the row group is searched.
135    /// Empty ranges means that the row group is searched but no rows are found.
136    pub async fn apply(
137        &self,
138        file_id: RegionFileId,
139        file_size_hint: Option<u64>,
140        row_groups: impl Iterator<Item = (usize, bool)>,
141    ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
142        let _timer = INDEX_APPLY_ELAPSED
143            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
144            .start_timer();
145
146        // Calculates row groups' ranges based on start of the file.
147        let mut input = Vec::with_capacity(row_groups.size_hint().0);
148        let mut start = 0;
149        for (i, (len, to_search)) in row_groups.enumerate() {
150            let end = start + len;
151            if to_search {
152                input.push((i, start..end));
153            }
154            start = end;
155        }
156
157        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
158        // so we need to adjust them later.
159        let mut output = input
160            .iter()
161            .map(|(i, range)| (*i, vec![range.clone()]))
162            .collect::<Vec<_>>();
163
164        for (column_id, predicates) in self.predicates.iter() {
165            let blob = match self
166                .blob_reader(file_id, *column_id, file_size_hint)
167                .await?
168            {
169                Some(blob) => blob,
170                None => continue,
171            };
172
173            // Create appropriate reader based on whether we have caching enabled
174            if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
175                let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
176                let reader = CachedBloomFilterIndexBlobReader::new(
177                    file_id.file_id(),
178                    *column_id,
179                    Tag::Skipping,
180                    blob_size,
181                    BloomFilterReaderImpl::new(blob),
182                    bloom_filter_cache.clone(),
183                );
184                self.apply_predicates(reader, predicates, &mut output)
185                    .await
186                    .context(ApplyBloomFilterIndexSnafu)?;
187            } else {
188                let reader = BloomFilterReaderImpl::new(blob);
189                self.apply_predicates(reader, predicates, &mut output)
190                    .await
191                    .context(ApplyBloomFilterIndexSnafu)?;
192            }
193        }
194
195        // adjust ranges to be based on row group
196        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
197            let start = input.start;
198            for range in output.iter_mut() {
199                range.start -= start;
200                range.end -= start;
201            }
202        }
203
204        Ok(output)
205    }
206
207    /// Creates a blob reader from the cached or remote index file.
208    ///
209    /// Returus `None` if the column does not have an index.
210    async fn blob_reader(
211        &self,
212        file_id: RegionFileId,
213        column_id: ColumnId,
214        file_size_hint: Option<u64>,
215    ) -> Result<Option<BlobReader>> {
216        let reader = match self
217            .cached_blob_reader(file_id, column_id, file_size_hint)
218            .await
219        {
220            Ok(Some(puffin_reader)) => puffin_reader,
221            other => {
222                if let Err(err) = other {
223                    // Blob not found means no index for this column
224                    if is_blob_not_found(&err) {
225                        return Ok(None);
226                    }
227                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
228                }
229                let res = self
230                    .remote_blob_reader(file_id, column_id, file_size_hint)
231                    .await;
232                if let Err(err) = res {
233                    // Blob not found means no index for this column
234                    if is_blob_not_found(&err) {
235                        return Ok(None);
236                    }
237                    return Err(err);
238                }
239
240                res?
241            }
242        };
243
244        Ok(Some(reader))
245    }
246
247    /// Creates a blob reader from the cached index file
248    async fn cached_blob_reader(
249        &self,
250        file_id: RegionFileId,
251        column_id: ColumnId,
252        file_size_hint: Option<u64>,
253    ) -> Result<Option<BlobReader>> {
254        let Some(file_cache) = &self.file_cache else {
255            return Ok(None);
256        };
257
258        let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
259        if file_cache.get(index_key).await.is_none() {
260            return Ok(None);
261        };
262
263        let puffin_manager = self.puffin_manager_factory.build(
264            file_cache.local_store(),
265            WriteCachePathProvider::new(file_cache.clone()),
266        );
267        let blob_name = Self::column_blob_name(column_id);
268
269        let reader = puffin_manager
270            .reader(&file_id)
271            .await
272            .context(PuffinBuildReaderSnafu)?
273            .with_file_size_hint(file_size_hint)
274            .blob(&blob_name)
275            .await
276            .context(PuffinReadBlobSnafu)?
277            .reader()
278            .await
279            .context(PuffinBuildReaderSnafu)?;
280        Ok(Some(reader))
281    }
282
283    // TODO(ruihang): use the same util with the code in creator
284    fn column_blob_name(column_id: ColumnId) -> String {
285        format!("{INDEX_BLOB_TYPE}-{}", IndexTarget::ColumnId(column_id))
286    }
287
288    /// Creates a blob reader from the remote index file
289    async fn remote_blob_reader(
290        &self,
291        file_id: RegionFileId,
292        column_id: ColumnId,
293        file_size_hint: Option<u64>,
294    ) -> Result<BlobReader> {
295        let puffin_manager = self
296            .puffin_manager_factory
297            .build(
298                self.object_store.clone(),
299                RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
300            )
301            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
302
303        let blob_name = Self::column_blob_name(column_id);
304
305        puffin_manager
306            .reader(&file_id)
307            .await
308            .context(PuffinBuildReaderSnafu)?
309            .with_file_size_hint(file_size_hint)
310            .blob(&blob_name)
311            .await
312            .context(PuffinReadBlobSnafu)?
313            .reader()
314            .await
315            .context(PuffinBuildReaderSnafu)
316    }
317
318    async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
319        &self,
320        reader: R,
321        predicates: &[InListPredicate],
322        output: &mut [(usize, Vec<Range<usize>>)],
323    ) -> std::result::Result<(), index::bloom_filter::error::Error> {
324        let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
325
326        for (_, row_group_output) in output.iter_mut() {
327            // All rows are filtered out, skip the search
328            if row_group_output.is_empty() {
329                continue;
330            }
331
332            *row_group_output = applier.search(predicates, row_group_output).await?;
333        }
334
335        Ok(())
336    }
337
    /// Returns the predicate key.
    ///
    /// The key is derived from this applier's predicates at construction and is
    /// used to identify the predicate when fetching results from the cache.
    pub fn predicate_key(&self) -> &PredicateKey {
        &self.predicate_key
    }
342}
343
344fn is_blob_not_found(err: &Error) -> bool {
345    matches!(
346        err,
347        Error::PuffinReadBlob {
348            source: puffin::error::Error::BlobNotFound { .. },
349            ..
350        }
351    )
352}
353
354#[cfg(test)]
355mod tests {
356
357    use datafusion_expr::{Expr, col, lit};
358    use futures::future::BoxFuture;
359    use puffin::puffin_manager::PuffinWriter;
360    use store_api::metadata::RegionMetadata;
361    use store_api::storage::FileId;
362
363    use super::*;
364    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
365    use crate::sst::index::bloom_filter::creator::tests::{
366        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
367    };
368
369    #[allow(clippy::type_complexity)]
370    fn tester(
371        table_dir: String,
372        object_store: ObjectStore,
373        metadata: &RegionMetadata,
374        puffin_manager_factory: PuffinManagerFactory,
375        file_id: RegionFileId,
376    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
377    + use<'_> {
378        move |exprs, row_groups| {
379            let table_dir = table_dir.clone();
380            let object_store: ObjectStore = object_store.clone();
381            let metadata = metadata.clone();
382            let puffin_manager_factory = puffin_manager_factory.clone();
383            let exprs = exprs.to_vec();
384
385            Box::pin(async move {
386                let builder = BloomFilterIndexApplierBuilder::new(
387                    table_dir,
388                    PathType::Bare,
389                    object_store,
390                    &metadata,
391                    puffin_manager_factory,
392                );
393
394                let applier = builder.build(&exprs).unwrap().unwrap();
395                applier
396                    .apply(file_id, None, row_groups.into_iter())
397                    .await
398                    .unwrap()
399                    .into_iter()
400                    .filter(|(_, ranges)| !ranges.is_empty())
401                    .collect()
402            })
403        }
404    }
405
406    #[tokio::test]
407    #[allow(clippy::single_range_in_vec_init)]
408    async fn test_bloom_filter_applier() {
409        // tag_str:
410        //   - type: string
411        //   - index: bloom filter
412        //   - granularity: 2
413        //   - column_id: 1
414        //
415        // ts:
416        //   - type: timestamp
417        //   - index: time index
418        //   - column_id: 2
419        //
420        // field_u64:
421        //   - type: uint64
422        //   - index: bloom filter
423        //   - granularity: 4
424        //   - column_id: 3
425        let region_metadata = mock_region_metadata();
426        let prefix = "test_bloom_filter_applier_";
427        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
428        let object_store = mock_object_store();
429        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
430        let memory_usage_threshold = Some(1024);
431        let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
432        let table_dir = "table_dir".to_string();
433
434        let mut indexer = BloomFilterIndexer::new(
435            file_id.file_id(),
436            &region_metadata,
437            intm_mgr,
438            memory_usage_threshold,
439        )
440        .unwrap()
441        .unwrap();
442
443        // push 20 rows
444        let mut batch = new_batch("tag1", 0..10);
445        indexer.update(&mut batch).await.unwrap();
446        let mut batch = new_batch("tag2", 10..20);
447        indexer.update(&mut batch).await.unwrap();
448
449        let puffin_manager = factory.build(
450            object_store.clone(),
451            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
452        );
453
454        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
455        indexer.finish(&mut puffin_writer).await.unwrap();
456        puffin_writer.finish().await.unwrap();
457
458        let tester = tester(
459            table_dir.clone(),
460            object_store.clone(),
461            &region_metadata,
462            factory.clone(),
463            file_id,
464        );
465
466        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
467        // row group: | o  row group |  o row group | o  row group     |  o row group     |
468        // tag_str:   |      o pred                 |   x pred                            |
469        let res = tester(
470            &[col("tag_str").eq(lit("tag1"))],
471            vec![(5, true), (5, true), (5, true), (5, true)],
472        )
473        .await;
474        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);
475
476        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
477        // row group: | o  row group |  x row group | o  row group     |  o row group     |
478        // tag_str:   |      o pred                 |   x pred                            |
479        let res = tester(
480            &[col("tag_str").eq(lit("tag1"))],
481            vec![(5, true), (5, false), (5, true), (5, true)],
482        )
483        .await;
484        assert_eq!(res, vec![(0, vec![0..5])]);
485
486        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
487        // row group: | o  row group |  o row group | o  row group     |  o row group     |
488        // tag_str:   |      o pred                 |   x pred                            |
489        // field_u64: | o pred   | x pred    |  x pred     |  x pred       | x pred       |
490        let res = tester(
491            &[
492                col("tag_str").eq(lit("tag1")),
493                col("field_u64").eq(lit(1u64)),
494            ],
495            vec![(5, true), (5, true), (5, true), (5, true)],
496        )
497        .await;
498        assert_eq!(res, vec![(0, vec![0..4])]);
499
500        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
501        // row group: | o  row group |  o row group | x  row group     |  o row group     |
502        // field_u64: | o pred   | x pred    |  o pred     |  x pred       | x pred       |
503        let res = tester(
504            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
505            vec![(5, true), (5, true), (5, false), (5, true)],
506        )
507        .await;
508        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
509    }
510}