mito2/sst/index/bloom_filter/applier.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod builder;

use std::collections::BTreeMap;
use std::ops::Range;
use std::sync::Arc;

use common_base::range_read::RangeReader;
use common_telemetry::warn;
use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
use object_store::ObjectStore;
use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
use puffin::puffin_manager::{PuffinManager, PuffinReader};
use snafu::ResultExt;
use store_api::storage::{ColumnId, RegionId};

use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::cache::index::bloom_filter_index::{
    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
};
use crate::cache::index::result_cache::PredicateKey;
use crate::error::{
    ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
    Result,
};
use crate::metrics::INDEX_APPLY_ELAPSED;
use crate::sst::file::FileId;
pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;

pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;

/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
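///
/// # Example
///
/// A minimal construction sketch (illustrative only; `region_dir`, `region_id`,
/// `object_store`, `puffin_manager_factory`, `predicates`, and the caches are
/// assumed to be in scope):
///
/// ```ignore
/// let applier = BloomFilterIndexApplier::new(
///     region_dir,
///     region_id,
///     object_store,
///     puffin_manager_factory,
///     predicates,
/// )
/// .with_file_cache(Some(file_cache))
/// .with_puffin_metadata_cache(Some(puffin_metadata_cache))
/// .with_bloom_filter_cache(Some(bloom_filter_index_cache));
/// ```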
pub struct BloomFilterIndexApplier {
    /// Directory of the region.
    region_dir: String,

    /// ID of the region.
    region_id: RegionId,

    /// Object store to read the index file.
    object_store: ObjectStore,

    /// File cache to read the index file.
    file_cache: Option<FileCacheRef>,

    /// Factory to create puffin manager.
    puffin_manager_factory: PuffinManagerFactory,

    /// Cache for puffin metadata.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,

    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Bloom filter predicates.
    /// For each column, the value will be retained only if it contains __all__ predicates.
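    /// In other words, the predicates for a column are conjunctive: a range is kept
    /// only when the bloom filter reports a possible match for every
    /// `InListPredicate` in the list.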
    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,

    /// Predicate key. Used to identify the predicate and fetch result from cache.
    predicate_key: PredicateKey,
}

impl BloomFilterIndexApplier {
    /// Creates a new `BloomFilterIndexApplier`.
    ///
    /// For each column, the value will be retained only if it contains __all__ predicates.
    pub fn new(
        region_dir: String,
        region_id: RegionId,
        object_store: ObjectStore,
        puffin_manager_factory: PuffinManagerFactory,
        predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
    ) -> Self {
        let predicates = Arc::new(predicates);
        Self {
            region_dir,
            region_id,
            object_store,
            file_cache: None,
            puffin_manager_factory,
            puffin_metadata_cache: None,
            bloom_filter_index_cache: None,
            predicate_key: PredicateKey::new_bloom(predicates.clone()),
            predicates,
        }
    }

    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
        self.file_cache = file_cache;
        self
    }

    pub fn with_puffin_metadata_cache(
        mut self,
        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    ) -> Self {
        self.puffin_metadata_cache = puffin_metadata_cache;
        self
    }

    pub fn with_bloom_filter_cache(
        mut self,
        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    ) -> Self {
        self.bloom_filter_index_cache = bloom_filter_index_cache;
        self
    }

    /// Applies bloom filter predicates to the provided SST file and returns,
    /// for each row group that still has candidate rows, its index together with
    /// the in-group row ranges that may match the predicates.
    ///
    /// The `row_groups` iterator yields each row group's length and whether it
    /// should be searched.
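    ///
    /// # Example
    ///
    /// An illustrative sketch (not compiled as a doc test); `applier` and `file_id`
    /// are assumed to already exist:
    ///
    /// ```ignore
    /// // Four row groups of 5 rows each; only the first and the third are searched.
    /// let ranges = applier
    ///     .apply(file_id, None, [(5, true), (5, false), (5, true), (5, false)].into_iter())
    ///     .await?;
    /// // An entry like `(2, vec![0..2])` means rows 0..2 of the third row group may
    /// // still match; fully pruned row groups are omitted from the result.
    /// ```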
    pub async fn apply(
        &self,
        file_id: FileId,
        file_size_hint: Option<u64>,
        row_groups: impl Iterator<Item = (usize, bool)>,
    ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
        let _timer = INDEX_APPLY_ELAPSED
            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
            .start_timer();

        // Calculates row groups' ranges relative to the start of the file.
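        // For example, row group lengths `[5, 5, 5]` with the second row group
        // skipped yield `input = [(0, 0..5), (2, 10..15)]`.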
        let mut input = Vec::with_capacity(row_groups.size_hint().0);
        let mut start = 0;
        for (i, (len, to_search)) in row_groups.enumerate() {
            let end = start + len;
            if to_search {
                input.push((i, start..end));
            }
            start = end;
        }

        // Initializes output with the input ranges. These ranges are relative to the
        // start of the file, not the row group, so they are adjusted later.
        let mut output = input
            .iter()
            .map(|(i, range)| (*i, vec![range.clone()]))
            .collect::<Vec<_>>();

        for (column_id, predicates) in self.predicates.iter() {
            let blob = match self
                .blob_reader(file_id, *column_id, file_size_hint)
                .await?
            {
                Some(blob) => blob,
                None => continue,
            };

            // Creates the appropriate reader based on whether the bloom filter index cache is enabled.
            if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
                let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
                let reader = CachedBloomFilterIndexBlobReader::new(
                    file_id,
                    *column_id,
                    Tag::Skipping,
                    blob_size,
                    BloomFilterReaderImpl::new(blob),
                    bloom_filter_cache.clone(),
                );
                self.apply_predicates(reader, predicates, &mut output)
                    .await
                    .context(ApplyBloomFilterIndexSnafu)?;
            } else {
                let reader = BloomFilterReaderImpl::new(blob);
                self.apply_predicates(reader, predicates, &mut output)
                    .await
                    .context(ApplyBloomFilterIndexSnafu)?;
            }
        }

        // Adjusts the ranges to be relative to the start of each row group.
        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
            let start = input.start;
            for range in output.iter_mut() {
                range.start -= start;
                range.end -= start;
            }
        }
        output.retain(|(_, ranges)| !ranges.is_empty());

        Ok(output)
    }

    /// Creates a blob reader from the cached or remote index file.
    ///
    /// Returns `None` if the column does not have an index.
    async fn blob_reader(
        &self,
        file_id: FileId,
        column_id: ColumnId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<BlobReader>> {
        let reader = match self
            .cached_blob_reader(file_id, column_id, file_size_hint)
            .await
        {
            Ok(Some(puffin_reader)) => puffin_reader,
            other => {
                if let Err(err) = other {
                    // Blob not found means no index for this column
                    if is_blob_not_found(&err) {
                        return Ok(None);
                    }
                    warn!(err; "An unexpected error occurred while reading the cached index file. Falling back to the remote index file.")
                }
                let res = self
                    .remote_blob_reader(file_id, column_id, file_size_hint)
                    .await;
                if let Err(err) = res {
                    // Blob not found means no index for this column
                    if is_blob_not_found(&err) {
                        return Ok(None);
                    }
                    return Err(err);
                }

                res?
            }
        };

        Ok(Some(reader))
    }

    /// Creates a blob reader from the cached index file
    async fn cached_blob_reader(
        &self,
        file_id: FileId,
        column_id: ColumnId,
        file_size_hint: Option<u64>,
    ) -> Result<Option<BlobReader>> {
        let Some(file_cache) = &self.file_cache else {
            return Ok(None);
        };

        let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
        if file_cache.get(index_key).await.is_none() {
            return Ok(None);
        };

        let puffin_manager = self.puffin_manager_factory.build(
            file_cache.local_store(),
            WriteCachePathProvider::new(self.region_id, file_cache.clone()),
        );
        let reader = puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint)
            .blob(&Self::column_blob_name(column_id))
            .await
            .context(PuffinReadBlobSnafu)?
            .reader()
            .await
            .context(PuffinBuildReaderSnafu)?;
        Ok(Some(reader))
    }

    // TODO(ruihang): use the same util with the code in creator
    fn column_blob_name(column_id: ColumnId) -> String {
        format!("{INDEX_BLOB_TYPE}-{column_id}")
    }

    /// Creates a blob reader from the remote index file
    async fn remote_blob_reader(
        &self,
        file_id: FileId,
        column_id: ColumnId,
        file_size_hint: Option<u64>,
    ) -> Result<BlobReader> {
        let puffin_manager = self
            .puffin_manager_factory
            .build(
                self.object_store.clone(),
                RegionFilePathFactory::new(self.region_dir.clone()),
            )
            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());

        puffin_manager
            .reader(&file_id)
            .await
            .context(PuffinBuildReaderSnafu)?
            .with_file_size_hint(file_size_hint)
            .blob(&Self::column_blob_name(column_id))
            .await
            .context(PuffinReadBlobSnafu)?
            .reader()
            .await
            .context(PuffinBuildReaderSnafu)
    }

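    /// Applies `predicates` against a single column's bloom filter `reader`,
    /// narrowing the candidate ranges of each row group in `output` in place.
    /// Row groups whose ranges are already empty are skipped.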
    async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
        &self,
        reader: R,
        predicates: &[InListPredicate],
        output: &mut [(usize, Vec<Range<usize>>)],
    ) -> std::result::Result<(), index::bloom_filter::error::Error> {
        let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;

        for (_, row_group_output) in output.iter_mut() {
            // All rows are filtered out, skip the search
            if row_group_output.is_empty() {
                continue;
            }

            *row_group_output = applier.search(predicates, row_group_output).await?;
        }

        Ok(())
    }

    /// Returns the predicate key.
    pub fn predicate_key(&self) -> &PredicateKey {
        &self.predicate_key
    }
}

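/// Returns whether the error indicates that the requested blob is missing from the
/// puffin file, i.e. the column has no bloom filter index.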
fn is_blob_not_found(err: &Error) -> bool {
    matches!(
        err,
        Error::PuffinReadBlob {
            source: puffin::error::Error::BlobNotFound { .. },
            ..
        }
    )
}

#[cfg(test)]
mod tests {

    use datafusion_expr::{col, lit, Expr};
    use futures::future::BoxFuture;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::metadata::RegionMetadata;

    use super::*;
    use crate::sst::index::bloom_filter::creator::tests::{
        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
    };
    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;

    #[allow(clippy::type_complexity)]
    fn tester(
        region_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: FileId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
           + use<'_> {
        move |exprs, row_groups| {
            let region_dir = region_dir.clone();
            let object_store = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    region_dir,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                applier
                    .apply(file_id, None, row_groups.into_iter())
                    .await
                    .unwrap()
            })
        }
    }

    #[tokio::test]
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        // tag_str:
        //   - type: string
        //   - index: bloom filter
        //   - granularity: 2
        //   - column_id: 1
        //
        // ts:
        //   - type: timestamp
        //   - index: time index
        //   - column_id: 2
        //
        // field_u64:
        //   - type: uint64
        //   - index: bloom filter
        //   - granularity: 4
        //   - column_id: 3
        let region_metadata = mock_region_metadata();
        let prefix = "test_bloom_filter_applier_";
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_usage_threshold = Some(1024);
        let file_id = FileId::random();
        let region_dir = "region_dir".to_string();

        let mut indexer =
            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
                .unwrap()
                .unwrap();

        // push 20 rows
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();
        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(region_dir.clone()),
        );

        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let tester = tester(
            region_dir.clone(),
            object_store.clone(),
            &region_metadata,
            factory.clone(),
            file_id,
        );

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  x row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        // field_u64: | o pred   | x pred    |  x pred     |  x pred       | x pred       |
        let res = tester(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | x  row group     |  o row group     |
        // field_u64: | o pred   | x pred    |  o pred     |  x pred       | x pred       |
        let res = tester(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
}