mito2/sst/index/bloom_filter/applier.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod builder;

use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;

use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::region_request::PathType;
use store_api::storage::ColumnId;

use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
    Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::RegionFileId;
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;

pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;

/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
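///
/// A minimal usage sketch (illustrative only; `table_dir`, `object_store`,
/// `puffin_manager_factory`, `predicates`, the caches, `file_id`, and
/// `row_groups` are assumed to be built elsewhere):
///
/// ```ignore
/// let applier = BloomFilterIndexApplier::new(
///     table_dir,
///     PathType::Bare,
///     object_store,
///     puffin_manager_factory,
///     predicates,
/// )
/// .with_file_cache(Some(file_cache))
/// .with_bloom_filter_cache(Some(bloom_filter_index_cache));
///
/// // Row group lengths paired with whether each row group should be searched.
/// let matched = applier
///     .apply(file_id, None, row_groups.into_iter())
///     .await?;
/// ```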
pub struct BloomFilterIndexApplier {
    /// Directory of the table.
    table_dir: String,

    /// Path type for generating file paths.
    path_type: PathType,

    /// Object store to read the index file.
    object_store: ObjectStore,

    /// File cache to read the index file.
    file_cache: Option<FileCacheRef>,

    /// Factory to create puffin manager.
    puffin_manager_factory: PuffinManagerFactory,

    /// Cache for puffin metadata.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,

    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Bloom filter predicates.
    /// For each column, a value is retained only if it matches __all__ of that column's predicates.
    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,

    /// Predicate key, used to identify the predicates and fetch results from the cache.
    predicate_key: PredicateKey,
}

impl BloomFilterIndexApplier {
    /// Creates a new `BloomFilterIndexApplier`.
    ///
    /// For each column, a value is retained only if it matches __all__ of that column's predicates.
    pub fn new(
        table_dir: String,
        path_type: PathType,
        object_store: ObjectStore,
        puffin_manager_factory: PuffinManagerFactory,
        predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
    ) -> Self {
        let predicates = Arc::new(predicates);
        Self {
            table_dir,
            path_type,
            object_store,
            file_cache: None,
            puffin_manager_factory,
            puffin_metadata_cache: None,
            bloom_filter_index_cache: None,
            predicate_key: PredicateKey::new_bloom(predicates.clone()),
            predicates,
        }
    }

    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
        self.file_cache = file_cache;
        self
    }

    pub fn with_puffin_metadata_cache(
        mut self,
        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    ) -> Self {
        self.puffin_metadata_cache = puffin_metadata_cache;
        self
    }

    pub fn with_bloom_filter_cache(
        mut self,
        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    ) -> Self {
        self.bloom_filter_index_cache = bloom_filter_index_cache;
        self
    }

    /// Applies bloom filter predicates to the provided SST file and returns a
    /// list of row group ranges that match the predicates.
    ///
    /// The `row_groups` iterator yields each row group's length and whether the row group should be searched.
    ///
    /// A row group id present in the returned result means that the row group was searched.
    /// An empty range list means that the row group was searched but no rows matched.
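    ///
    /// Illustrative example (not part of the original docs): with row groups
    /// `[(5, true), (5, false), (5, true)]`, only groups 0 and 2 are searched;
    /// a result of `[(0, vec![0..5]), (2, vec![])]` means every row of group 0
    /// may match, while group 2 was searched and fully pruned.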
    pub async fn apply(
        &self,
        file_id: RegionFileId,
        file_size_hint: Option<u64>,
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
        let _timer = INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
            .start_timer();

        // Calculates row groups' ranges relative to the start of the file.
        let mut input = Vec::with_capacity(row_groups.size_hint().0);
        let mut start = 0;
        for (i, (len, to_search)) in row_groups.enumerate() {
            let end = start + len;
            if to_search {
                input.push((i, start..end));
            }
            start = end;
        }
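
        // At this point `input` holds absolute row ranges; e.g. (illustrative)
        // lengths `[5, 5, 5]` with every group searchable give
        // `input = [(0, 0..5), (1, 5..10), (2, 10..15)]`.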

        // Initializes output with the input ranges, but these ranges are based on the
        // start of the file, not the row group, so we need to adjust them later.
        let mut output = input
            .iter()
            .map(|(i, range)| (*i, vec![range.clone()]))
            .collect::<Vec<_>>();

        for (column_id, predicates) in self.predicates.iter() {
            let blob = match self
                .blob_reader(file_id, *column_id, file_size_hint)
                .await?
            {
                Some(blob) => blob,
                None => continue,
            };

            // Create appropriate reader based on whether we have caching enabled
            if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
                let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
                let reader = CachedBloomFilterIndexBlobReader::new(
                    file_id.file_id(),
                    *column_id,
                    Tag::Skipping,
                    blob_size,
                    BloomFilterReaderImpl::new(blob),
                    bloom_filter_cache.clone(),
                );
                self.apply_predicates(reader, predicates, &mut output)
                    .await
                    .context(ApplyBloomFilterIndexSnafu)?;
            } else {
                let reader = BloomFilterReaderImpl::new(blob);
                self.apply_predicates(reader, predicates, &mut output)
                    .await
                    .context(ApplyBloomFilterIndexSnafu)?;
            }
        }

        // Adjusts ranges to be relative to the start of each row group.
        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
            let start = input.start;
            for range in output.iter_mut() {
                range.start -= start;
                range.end -= start;
            }
        }

        Ok(output)
    }

    /// Creates a blob reader from the cached or remote index file.
    ///
    /// Returns `None` if the column does not have an index.
    async fn blob_reader(
        &self,
        file_id: RegionFileId,
        column_id: ColumnId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<BlobReader>> {
        let reader = match self
            .cached_blob_reader(file_id, column_id, file_size_hint)
            .await
        {
            Ok(Some(puffin_reader)) => puffin_reader,
            other => {
                if let Err(err) = other {
                    // Blob not found means no index for this column
                    if is_blob_not_found(&err) {
                        return Ok(None);
                    }
                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
                }
                let res = self
                    .remote_blob_reader(file_id, column_id, file_size_hint)
                    .await;
                if let Err(err) = res {
                    // Blob not found means no index for this column
                    if is_blob_not_found(&err) {
                        return Ok(None);
                    }
                    return Err(err);
                }

                res?
            }
        };

        Ok(Some(reader))
    }

    /// Creates a blob reader from the cached index file.
    async fn cached_blob_reader(
        &self,
        file_id: RegionFileId,
        column_id: ColumnId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<BlobReader>> {
        let Some(file_cache) = &self.file_cache else {
            return Ok(None);
        };

        let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
        if file_cache.get(index_key).await.is_none() {
            return Ok(None);
        };

        let puffin_manager = self.puffin_manager_factory.build(
            file_cache.local_store(),
            WriteCachePathProvider::new(file_cache.clone()),
        );
        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint)
            .blob(&Self::column_blob_name(column_id))
            .await
            .context(PuffinReadBlobSnafu)?
            .reader()
            .await
            .context(PuffinBuildReaderSnafu)?;
        Ok(Some(reader))
    }

    // TODO(ruihang): use the same util with the code in creator
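    /// Returns the name of the bloom filter blob for `column_id`; e.g.
    /// (illustrative) column id `3` maps to a blob named `{INDEX_BLOB_TYPE}-3`.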
    fn column_blob_name(column_id: ColumnId) -> String {
        format!("{INDEX_BLOB_TYPE}-{column_id}")
    }

    /// Creates a blob reader from the remote index file.
    async fn remote_blob_reader(
        &self,
        file_id: RegionFileId,
        column_id: ColumnId,
        file_size_hint: Option<u64>,
    ) -> Result<BlobReader> {
        let puffin_manager = self
            .puffin_manager_factory
            .build(
                self.object_store.clone(),
                RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());

        puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint)
            .blob(&Self::column_blob_name(column_id))
            .await
            .context(PuffinReadBlobSnafu)?
            .reader()
            .await
            .context(PuffinBuildReaderSnafu)
    }

    /// Narrows each row group's ranges in `output` by searching the given bloom
    /// filter with `predicates`.
    async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
        &self,
        reader: R,
        predicates: &[InListPredicate],
        output: &mut [(usize, Vec<Range<usize>>)],
    ) -> std::result::Result<(), index::bloom_filter::error::Error> {
        let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;

        for (_, row_group_output) in output.iter_mut() {
            // All rows are filtered out, skip the search
            if row_group_output.is_empty() {
                continue;
            }

            *row_group_output = applier.search(predicates, row_group_output).await?;
        }

        Ok(())
    }

    /// Returns the predicate key.
    pub fn predicate_key(&self) -> &PredicateKey {
        &self.predicate_key
    }
}

/// Returns `true` if the error indicates that the blob is missing from the puffin
/// file, i.e. the column has no bloom filter index.
fn is_blob_not_found(err: &Error) -> bool {
    matches!(
        err,
        Error::PuffinReadBlob {
            source: puffin::error::Error::BlobNotFound { .. },
            ..
        }
    )
}

#[cfg(test)]
mod tests {

    use datafusion_expr::{col, lit, Expr};
    use futures::future::BoxFuture;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::metadata::RegionMetadata;

    use super::*;
    use crate::sst::file::FileId;
    use crate::sst::index::bloom_filter::creator::tests::{
        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
    };
    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;

    #[allow(clippy::type_complexity)]
    fn tester(
        table_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: RegionFileId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
           + use<'_> {
        move |exprs, row_groups| {
            let table_dir = table_dir.clone();
            let object_store: ObjectStore = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    table_dir,
                    PathType::Bare,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                applier
                    .apply(file_id, None, row_groups.into_iter())
                    .await
                    .unwrap()
                    .into_iter()
                    .filter(|(_, ranges)| !ranges.is_empty())
                    .collect()
            })
        }
    }

    #[tokio::test]
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        // tag_str:
        //   - type: string
        //   - index: bloom filter
        //   - granularity: 2
        //   - column_id: 1
        //
        // ts:
        //   - type: timestamp
        //   - index: time index
        //   - column_id: 2
        //
        // field_u64:
        //   - type: uint64
        //   - index: bloom filter
        //   - granularity: 4
        //   - column_id: 3
        let region_metadata = mock_region_metadata();
        let prefix = "test_bloom_filter_applier_";
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_usage_threshold = Some(1024);
        let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
        let table_dir = "table_dir".to_string();

        let mut indexer = BloomFilterIndexer::new(
            file_id.file_id(),
            &region_metadata,
            intm_mgr,
            memory_usage_threshold,
        )
        .unwrap()
        .unwrap();

        // push 20 rows
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();
        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let tester = tester(
            table_dir.clone(),
            object_store.clone(),
            &region_metadata,
            factory.clone(),
            file_id,
        );

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  x row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        // field_u64: | o pred   | x pred    |  x pred     |  x pred       | x pred       |
        let res = tester(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | x  row group     |  o row group     |
        // field_u64: | o pred   | x pred    |  o pred     |  x pred       | x pred       |
        let res = tester(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
}