mito2/sst/index/bloom_filter/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
24use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
25use object_store::ObjectStore;
26use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
27use puffin::puffin_manager::{PuffinManager, PuffinReader};
28use snafu::ResultExt;
29use store_api::storage::{ColumnId, RegionId};
30
31use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
32use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
33use crate::cache::index::bloom_filter_index::{
34    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
35};
36use crate::cache::index::result_cache::PredicateKey;
37use crate::error::{
38    ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
39    Result,
40};
41use crate::metrics::INDEX_APPLY_ELAPSED;
42use crate::sst::file::FileId;
43pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
44use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
45use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
46use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
47
/// Reference-counted (`Arc`) handle to a [`BloomFilterIndexApplier`].
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
49
/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
///
/// The index blobs are read from the file cache when possible and from the
/// object store otherwise.
pub struct BloomFilterIndexApplier {
    /// Directory of the region. Used to locate the remote index file.
    region_dir: String,

    /// ID of the region. Used to build file cache keys and cache paths.
    region_id: RegionId,

    /// Object store to read the index file.
    object_store: ObjectStore,

    /// File cache to read the index file.
    /// When `None`, the index file is always read from `object_store`.
    file_cache: Option<FileCacheRef>,

    /// Factory to create puffin manager.
    puffin_manager_factory: PuffinManagerFactory,

    /// Cache for puffin metadata. Handed to the puffin manager that reads the remote index file.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,

    /// Cache for bloom filter index.
    /// When set, index blobs are read through a `CachedBloomFilterIndexBlobReader`.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Bloom filter predicates.
    /// For each column, the value will be retained only if it contains __all__ predicates.
    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,

    /// Predicate key. Used to identify the predicate and fetch result from cache.
    predicate_key: PredicateKey,
}
80
81impl BloomFilterIndexApplier {
82    /// Creates a new `BloomFilterIndexApplier`.
83    ///
84    /// For each column, the value will be retained only if it contains __all__ predicates.
85    pub fn new(
86        region_dir: String,
87        region_id: RegionId,
88        object_store: ObjectStore,
89        puffin_manager_factory: PuffinManagerFactory,
90        predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
91    ) -> Self {
92        let predicates = Arc::new(predicates);
93        Self {
94            region_dir,
95            region_id,
96            object_store,
97            file_cache: None,
98            puffin_manager_factory,
99            puffin_metadata_cache: None,
100            bloom_filter_index_cache: None,
101            predicate_key: PredicateKey::new_bloom(predicates.clone()),
102            predicates,
103        }
104    }
105
106    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
107        self.file_cache = file_cache;
108        self
109    }
110
111    pub fn with_puffin_metadata_cache(
112        mut self,
113        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
114    ) -> Self {
115        self.puffin_metadata_cache = puffin_metadata_cache;
116        self
117    }
118
119    pub fn with_bloom_filter_cache(
120        mut self,
121        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
122    ) -> Self {
123        self.bloom_filter_index_cache = bloom_filter_index_cache;
124        self
125    }
126
127    /// Applies bloom filter predicates to the provided SST file and returns a
128    /// list of row group ranges that match the predicates.
129    ///
130    /// The `row_groups` iterator provides the row group lengths and whether to search in the row group.
131    ///
132    /// Row group id existing in the returned result means that the row group is searched.
133    /// Empty ranges means that the row group is searched but no rows are found.
134    pub async fn apply(
135        &self,
136        file_id: FileId,
137        file_size_hint: Option<u64>,
138        row_groups: impl Iterator<Item = (usize, bool)>,
139    ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
140        let _timer = INDEX_APPLY_ELAPSED
141            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
142            .start_timer();
143
144        // Calculates row groups' ranges based on start of the file.
145        let mut input = Vec::with_capacity(row_groups.size_hint().0);
146        let mut start = 0;
147        for (i, (len, to_search)) in row_groups.enumerate() {
148            let end = start + len;
149            if to_search {
150                input.push((i, start..end));
151            }
152            start = end;
153        }
154
155        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
156        // so we need to adjust them later.
157        let mut output = input
158            .iter()
159            .map(|(i, range)| (*i, vec![range.clone()]))
160            .collect::<Vec<_>>();
161
162        for (column_id, predicates) in self.predicates.iter() {
163            let blob = match self
164                .blob_reader(file_id, *column_id, file_size_hint)
165                .await?
166            {
167                Some(blob) => blob,
168                None => continue,
169            };
170
171            // Create appropriate reader based on whether we have caching enabled
172            if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
173                let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
174                let reader = CachedBloomFilterIndexBlobReader::new(
175                    file_id,
176                    *column_id,
177                    Tag::Skipping,
178                    blob_size,
179                    BloomFilterReaderImpl::new(blob),
180                    bloom_filter_cache.clone(),
181                );
182                self.apply_predicates(reader, predicates, &mut output)
183                    .await
184                    .context(ApplyBloomFilterIndexSnafu)?;
185            } else {
186                let reader = BloomFilterReaderImpl::new(blob);
187                self.apply_predicates(reader, predicates, &mut output)
188                    .await
189                    .context(ApplyBloomFilterIndexSnafu)?;
190            }
191        }
192
193        // adjust ranges to be based on row group
194        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
195            let start = input.start;
196            for range in output.iter_mut() {
197                range.start -= start;
198                range.end -= start;
199            }
200        }
201
202        Ok(output)
203    }
204
205    /// Creates a blob reader from the cached or remote index file.
206    ///
207    /// Returus `None` if the column does not have an index.
208    async fn blob_reader(
209        &self,
210        file_id: FileId,
211        column_id: ColumnId,
212        file_size_hint: Option<u64>,
213    ) -> Result<Option<BlobReader>> {
214        let reader = match self
215            .cached_blob_reader(file_id, column_id, file_size_hint)
216            .await
217        {
218            Ok(Some(puffin_reader)) => puffin_reader,
219            other => {
220                if let Err(err) = other {
221                    // Blob not found means no index for this column
222                    if is_blob_not_found(&err) {
223                        return Ok(None);
224                    }
225                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
226                }
227                let res = self
228                    .remote_blob_reader(file_id, column_id, file_size_hint)
229                    .await;
230                if let Err(err) = res {
231                    // Blob not found means no index for this column
232                    if is_blob_not_found(&err) {
233                        return Ok(None);
234                    }
235                    return Err(err);
236                }
237
238                res?
239            }
240        };
241
242        Ok(Some(reader))
243    }
244
245    /// Creates a blob reader from the cached index file
246    async fn cached_blob_reader(
247        &self,
248        file_id: FileId,
249        column_id: ColumnId,
250        file_size_hint: Option<u64>,
251    ) -> Result<Option<BlobReader>> {
252        let Some(file_cache) = &self.file_cache else {
253            return Ok(None);
254        };
255
256        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
257        if file_cache.get(index_key).await.is_none() {
258            return Ok(None);
259        };
260
261        let puffin_manager = self.puffin_manager_factory.build(
262            file_cache.local_store(),
263            WriteCachePathProvider::new(self.region_id, file_cache.clone()),
264        );
265        let reader = puffin_manager
266            .reader(&file_id)
267            .await
268            .context(PuffinBuildReaderSnafu)?
269            .with_file_size_hint(file_size_hint)
270            .blob(&Self::column_blob_name(column_id))
271            .await
272            .context(PuffinReadBlobSnafu)?
273            .reader()
274            .await
275            .context(PuffinBuildReaderSnafu)?;
276        Ok(Some(reader))
277    }
278
    // TODO(ruihang): use the same util with the code in creator
    /// Returns the name of the puffin blob that is looked up for the bloom filter
    /// index of `column_id`, formatted as `{INDEX_BLOB_TYPE}-{column_id}`.
    fn column_blob_name(column_id: ColumnId) -> String {
        format!("{INDEX_BLOB_TYPE}-{column_id}")
    }
283
284    /// Creates a blob reader from the remote index file
285    async fn remote_blob_reader(
286        &self,
287        file_id: FileId,
288        column_id: ColumnId,
289        file_size_hint: Option<u64>,
290    ) -> Result<BlobReader> {
291        let puffin_manager = self
292            .puffin_manager_factory
293            .build(
294                self.object_store.clone(),
295                RegionFilePathFactory::new(self.region_dir.clone()),
296            )
297            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
298
299        puffin_manager
300            .reader(&file_id)
301            .await
302            .context(PuffinBuildReaderSnafu)?
303            .with_file_size_hint(file_size_hint)
304            .blob(&Self::column_blob_name(column_id))
305            .await
306            .context(PuffinReadBlobSnafu)?
307            .reader()
308            .await
309            .context(PuffinBuildReaderSnafu)
310    }
311
312    async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
313        &self,
314        reader: R,
315        predicates: &[InListPredicate],
316        output: &mut [(usize, Vec<Range<usize>>)],
317    ) -> std::result::Result<(), index::bloom_filter::error::Error> {
318        let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
319
320        for (_, row_group_output) in output.iter_mut() {
321            // All rows are filtered out, skip the search
322            if row_group_output.is_empty() {
323                continue;
324            }
325
326            *row_group_output = applier.search(predicates, row_group_output).await?;
327        }
328
329        Ok(())
330    }
331
    /// Returns the predicate key, which is created from the predicates in [`Self::new`].
    pub fn predicate_key(&self) -> &PredicateKey {
        &self.predicate_key
    }
336}
337
/// Returns `true` if `err` is an [`Error::PuffinReadBlob`] whose source is the
/// puffin `BlobNotFound` error.
///
/// Callers in this file treat that case as "the column has no index".
fn is_blob_not_found(err: &Error) -> bool {
    matches!(
        err,
        Error::PuffinReadBlob {
            source: puffin::error::Error::BlobNotFound { .. },
            ..
        }
    )
}
347
348#[cfg(test)]
349mod tests {
350
351    use datafusion_expr::{col, lit, Expr};
352    use futures::future::BoxFuture;
353    use puffin::puffin_manager::PuffinWriter;
354    use store_api::metadata::RegionMetadata;
355
356    use super::*;
357    use crate::sst::index::bloom_filter::creator::tests::{
358        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
359    };
360    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
361
362    #[allow(clippy::type_complexity)]
363    fn tester(
364        region_dir: String,
365        object_store: ObjectStore,
366        metadata: &RegionMetadata,
367        puffin_manager_factory: PuffinManagerFactory,
368        file_id: FileId,
369    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
370           + use<'_> {
371        move |exprs, row_groups| {
372            let region_dir = region_dir.clone();
373            let object_store = object_store.clone();
374            let metadata = metadata.clone();
375            let puffin_manager_factory = puffin_manager_factory.clone();
376            let exprs = exprs.to_vec();
377
378            Box::pin(async move {
379                let builder = BloomFilterIndexApplierBuilder::new(
380                    region_dir,
381                    object_store,
382                    &metadata,
383                    puffin_manager_factory,
384                );
385
386                let applier = builder.build(&exprs).unwrap().unwrap();
387                applier
388                    .apply(file_id, None, row_groups.into_iter())
389                    .await
390                    .unwrap()
391                    .into_iter()
392                    .filter(|(_, ranges)| !ranges.is_empty())
393                    .collect()
394            })
395        }
396    }
397
398    #[tokio::test]
399    #[allow(clippy::single_range_in_vec_init)]
400    async fn test_bloom_filter_applier() {
401        // tag_str:
402        //   - type: string
403        //   - index: bloom filter
404        //   - granularity: 2
405        //   - column_id: 1
406        //
407        // ts:
408        //   - type: timestamp
409        //   - index: time index
410        //   - column_id: 2
411        //
412        // field_u64:
413        //   - type: uint64
414        //   - index: bloom filter
415        //   - granularity: 4
416        //   - column_id: 3
417        let region_metadata = mock_region_metadata();
418        let prefix = "test_bloom_filter_applier_";
419        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
420        let object_store = mock_object_store();
421        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
422        let memory_usage_threshold = Some(1024);
423        let file_id = FileId::random();
424        let region_dir = "region_dir".to_string();
425
426        let mut indexer =
427            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
428                .unwrap()
429                .unwrap();
430
431        // push 20 rows
432        let mut batch = new_batch("tag1", 0..10);
433        indexer.update(&mut batch).await.unwrap();
434        let mut batch = new_batch("tag2", 10..20);
435        indexer.update(&mut batch).await.unwrap();
436
437        let puffin_manager = factory.build(
438            object_store.clone(),
439            RegionFilePathFactory::new(region_dir.clone()),
440        );
441
442        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
443        indexer.finish(&mut puffin_writer).await.unwrap();
444        puffin_writer.finish().await.unwrap();
445
446        let tester = tester(
447            region_dir.clone(),
448            object_store.clone(),
449            &region_metadata,
450            factory.clone(),
451            file_id,
452        );
453
454        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
455        // row group: | o  row group |  o row group | o  row group     |  o row group     |
456        // tag_str:   |      o pred                 |   x pred                            |
457        let res = tester(
458            &[col("tag_str").eq(lit("tag1"))],
459            vec![(5, true), (5, true), (5, true), (5, true)],
460        )
461        .await;
462        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);
463
464        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
465        // row group: | o  row group |  x row group | o  row group     |  o row group     |
466        // tag_str:   |      o pred                 |   x pred                            |
467        let res = tester(
468            &[col("tag_str").eq(lit("tag1"))],
469            vec![(5, true), (5, false), (5, true), (5, true)],
470        )
471        .await;
472        assert_eq!(res, vec![(0, vec![0..5])]);
473
474        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
475        // row group: | o  row group |  o row group | o  row group     |  o row group     |
476        // tag_str:   |      o pred                 |   x pred                            |
477        // field_u64: | o pred   | x pred    |  x pred     |  x pred       | x pred       |
478        let res = tester(
479            &[
480                col("tag_str").eq(lit("tag1")),
481                col("field_u64").eq(lit(1u64)),
482            ],
483            vec![(5, true), (5, true), (5, true), (5, true)],
484        )
485        .await;
486        assert_eq!(res, vec![(0, vec![0..4])]);
487
488        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
489        // row group: | o  row group |  o row group | x  row group     |  o row group     |
490        // field_u64: | o pred   | x pred    |  o pred     |  x pred       | x pred       |
491        let res = tester(
492            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
493            vec![(5, true), (5, true), (5, false), (5, true)],
494        )
495        .await;
496        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
497    }
498}