mito2/sst/index/bloom_filter/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::Instant;
21
22use common_base::range_read::RangeReader;
23use common_telemetry::{tracing, warn};
24use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
25use index::bloom_filter::reader::{
26    BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
27};
28use index::target::IndexTarget;
29use object_store::ObjectStore;
30use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
31use puffin::puffin_manager::{PuffinManager, PuffinReader};
32use snafu::ResultExt;
33use store_api::region_request::PathType;
34use store_api::storage::ColumnId;
35
36use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
37use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
38use crate::cache::index::bloom_filter_index::{
39    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
40};
41use crate::cache::index::result_cache::PredicateKey;
42use crate::error::{
43    ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
44    Result,
45};
46use crate::metrics::INDEX_APPLY_ELAPSED;
47use crate::sst::file::RegionIndexId;
48use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
49pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
50use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
51use crate::sst::index::{TYPE_BLOOM_FILTER_INDEX, trigger_index_background_download};
52
/// Metrics for tracking bloom filter index apply operations.
///
/// The fields are accumulators: `BloomFilterIndexApplier::apply` adds to the instance it is
/// given, and [`merge_from`](Self::merge_from) sums another instance into this one.
#[derive(Default, Clone)]
pub struct BloomFilterIndexApplyMetrics {
    /// Total time spent applying the index.
    pub apply_elapsed: std::time::Duration,
    /// Number of blob cache misses, i.e. how many times a blob reader could not be
    /// obtained from the local file cache.
    pub blob_cache_miss: usize,
    /// Total size of blobs read (in bytes).
    ///
    /// Currently only recorded on the code path that uses the bloom filter index cache.
    pub blob_read_bytes: u64,
    /// Metrics for bloom filter read operations.
    pub read_metrics: BloomFilterReadMetrics,
}
65
66impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        let Self {
69            apply_elapsed,
70            blob_cache_miss,
71            blob_read_bytes,
72            read_metrics,
73        } = self;
74
75        if self.is_empty() {
76            return write!(f, "{{}}");
77        }
78        write!(f, "{{")?;
79
80        write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
81
82        if *blob_cache_miss > 0 {
83            write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
84        }
85        if *blob_read_bytes > 0 {
86            write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
87        }
88        write!(f, ", \"read_metrics\":{:?}", read_metrics)?;
89
90        write!(f, "}}")
91    }
92}
93
94impl BloomFilterIndexApplyMetrics {
95    /// Returns true if the metrics are empty (contain no meaningful data).
96    pub fn is_empty(&self) -> bool {
97        self.apply_elapsed.is_zero()
98    }
99
100    /// Merges another metrics into this one.
101    pub fn merge_from(&mut self, other: &Self) {
102        self.apply_elapsed += other.apply_elapsed;
103        self.blob_cache_miss += other.blob_cache_miss;
104        self.blob_read_bytes += other.blob_read_bytes;
105        self.read_metrics.merge_from(&other.read_metrics);
106    }
107}
108
/// Shared, reference-counted handle to a [`BloomFilterIndexApplier`].
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
110
/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
///
/// The index blob of each column is read from the local file cache when the index file is
/// present there, and from the object store otherwise.
pub struct BloomFilterIndexApplier {
    /// Directory of the table. Used to build the paths of remote index files.
    table_dir: String,

    /// Path type for generating file paths.
    path_type: PathType,

    /// Object store to read the index file.
    object_store: ObjectStore,

    /// File cache to read the index file.
    /// When `None`, the index file is always read from `object_store`.
    file_cache: Option<FileCacheRef>,

    /// Factory to create puffin manager.
    puffin_manager_factory: PuffinManagerFactory,

    /// Cache for puffin metadata. Attached to the puffin manager used for remote reads.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,

    /// Cache for bloom filter index.
    /// When set, blobs are read through a `CachedBloomFilterIndexBlobReader`.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Bloom filter predicates, keyed by column id.
    /// For each column, the value will be retained only if it contains __all__ predicates.
    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,

    /// Predicate key. Used to identify the predicate and fetch result from cache.
    predicate_key: PredicateKey,
}
141
142impl BloomFilterIndexApplier {
143    /// Creates a new `BloomFilterIndexApplier`.
144    ///
145    /// For each column, the value will be retained only if it contains __all__ predicates.
146    pub fn new(
147        table_dir: String,
148        path_type: PathType,
149        object_store: ObjectStore,
150        puffin_manager_factory: PuffinManagerFactory,
151        predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
152    ) -> Self {
153        let predicates = Arc::new(predicates);
154        Self {
155            table_dir,
156            path_type,
157            object_store,
158            file_cache: None,
159            puffin_manager_factory,
160            puffin_metadata_cache: None,
161            bloom_filter_index_cache: None,
162            predicate_key: PredicateKey::new_bloom(predicates.clone()),
163            predicates,
164        }
165    }
166
167    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
168        self.file_cache = file_cache;
169        self
170    }
171
172    pub fn with_puffin_metadata_cache(
173        mut self,
174        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
175    ) -> Self {
176        self.puffin_metadata_cache = puffin_metadata_cache;
177        self
178    }
179
180    pub fn with_bloom_filter_cache(
181        mut self,
182        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
183    ) -> Self {
184        self.bloom_filter_index_cache = bloom_filter_index_cache;
185        self
186    }
187
188    /// Applies bloom filter predicates to the provided SST file and returns a
189    /// list of row group ranges that match the predicates.
190    ///
191    /// The `row_groups` iterator provides the row group lengths and whether to search in the row group.
192    ///
193    /// Row group id existing in the returned result means that the row group is searched.
194    /// Empty ranges means that the row group is searched but no rows are found.
195    ///
196    ///
197    /// # Arguments
198    /// * `file_id` - The region file ID to apply predicates to
199    /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
200    /// * `row_groups` - Iterator of row group lengths and whether to search in the row group
201    /// * `metrics` - Optional mutable reference to collect metrics on demand
202    #[tracing::instrument(
203        skip_all,
204        fields(file_id = %file_id)
205    )]
206    pub async fn apply(
207        &self,
208        file_id: RegionIndexId,
209        file_size_hint: Option<u64>,
210        row_groups: impl Iterator<Item = (usize, bool)>,
211        mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
212    ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
213        let apply_start = Instant::now();
214
215        // Calculates row groups' ranges based on start of the file.
216        let mut input = Vec::with_capacity(row_groups.size_hint().0);
217        let mut start = 0;
218        for (i, (len, to_search)) in row_groups.enumerate() {
219            let end = start + len;
220            if to_search {
221                input.push((i, start..end));
222            }
223            start = end;
224        }
225
226        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
227        // so we need to adjust them later.
228        let mut output = input
229            .iter()
230            .map(|(i, range)| (*i, vec![range.clone()]))
231            .collect::<Vec<_>>();
232
233        for (column_id, predicates) in self.predicates.iter() {
234            let blob = match self
235                .blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
236                .await?
237            {
238                Some(blob) => blob,
239                None => continue,
240            };
241
242            // Create appropriate reader based on whether we have caching enabled
243            if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
244                let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
245                if let Some(m) = &mut metrics {
246                    m.blob_read_bytes += blob_size;
247                }
248                let reader = CachedBloomFilterIndexBlobReader::new(
249                    file_id.file_id(),
250                    file_id.version,
251                    *column_id,
252                    Tag::Skipping,
253                    blob_size,
254                    BloomFilterReaderImpl::new(blob),
255                    bloom_filter_cache.clone(),
256                );
257                self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
258                    .await
259                    .context(ApplyBloomFilterIndexSnafu)?;
260            } else {
261                let reader = BloomFilterReaderImpl::new(blob);
262                self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
263                    .await
264                    .context(ApplyBloomFilterIndexSnafu)?;
265            }
266        }
267
268        // adjust ranges to be based on row group
269        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
270            let start = input.start;
271            for range in output.iter_mut() {
272                range.start -= start;
273                range.end -= start;
274            }
275        }
276
277        // Record elapsed time to histogram and collect metrics if requested
278        let elapsed = apply_start.elapsed();
279        INDEX_APPLY_ELAPSED
280            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
281            .observe(elapsed.as_secs_f64());
282
283        if let Some(m) = metrics {
284            m.apply_elapsed += elapsed;
285        }
286
287        Ok(output)
288    }
289
290    /// Creates a blob reader from the cached or remote index file.
291    ///
292    /// Returus `None` if the column does not have an index.
293    async fn blob_reader(
294        &self,
295        file_id: RegionIndexId,
296        column_id: ColumnId,
297        file_size_hint: Option<u64>,
298        metrics: Option<&mut BloomFilterIndexApplyMetrics>,
299    ) -> Result<Option<BlobReader>> {
300        let reader = match self
301            .cached_blob_reader(file_id, column_id, file_size_hint)
302            .await
303        {
304            Ok(Some(puffin_reader)) => puffin_reader,
305            other => {
306                if let Some(m) = metrics {
307                    m.blob_cache_miss += 1;
308                }
309                if let Err(err) = other {
310                    // Blob not found means no index for this column
311                    if is_blob_not_found(&err) {
312                        return Ok(None);
313                    }
314                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
315                }
316                let res = self
317                    .remote_blob_reader(file_id, column_id, file_size_hint)
318                    .await;
319                if let Err(err) = res {
320                    // Blob not found means no index for this column
321                    if is_blob_not_found(&err) {
322                        return Ok(None);
323                    }
324                    return Err(err);
325                }
326
327                res?
328            }
329        };
330
331        Ok(Some(reader))
332    }
333
334    /// Creates a blob reader from the cached index file
335    async fn cached_blob_reader(
336        &self,
337        file_id: RegionIndexId,
338        column_id: ColumnId,
339        file_size_hint: Option<u64>,
340    ) -> Result<Option<BlobReader>> {
341        let Some(file_cache) = &self.file_cache else {
342            return Ok(None);
343        };
344
345        let index_key = IndexKey::new(
346            file_id.region_id(),
347            file_id.file_id(),
348            FileType::Puffin(file_id.version),
349        );
350        if file_cache.get(index_key).await.is_none() {
351            return Ok(None);
352        };
353
354        let puffin_manager = self.puffin_manager_factory.build(
355            file_cache.local_store(),
356            WriteCachePathProvider::new(file_cache.clone()),
357        );
358        let blob_name = Self::column_blob_name(column_id);
359
360        let reader = puffin_manager
361            .reader(&file_id)
362            .await
363            .context(PuffinBuildReaderSnafu)?
364            .with_file_size_hint(file_size_hint)
365            .blob(&blob_name)
366            .await
367            .context(PuffinReadBlobSnafu)?
368            .reader()
369            .await
370            .context(PuffinBuildReaderSnafu)?;
371        Ok(Some(reader))
372    }
373
374    // TODO(ruihang): use the same util with the code in creator
375    fn column_blob_name(column_id: ColumnId) -> String {
376        format!("{INDEX_BLOB_TYPE}-{}", IndexTarget::ColumnId(column_id))
377    }
378
379    /// Creates a blob reader from the remote index file
380    async fn remote_blob_reader(
381        &self,
382        file_id: RegionIndexId,
383        column_id: ColumnId,
384        file_size_hint: Option<u64>,
385    ) -> Result<BlobReader> {
386        let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
387
388        // Trigger background download if file cache and file size are available
389        trigger_index_background_download(
390            self.file_cache.as_ref(),
391            &file_id,
392            file_size_hint,
393            &path_factory,
394            &self.object_store,
395        );
396
397        let puffin_manager = self
398            .puffin_manager_factory
399            .build(self.object_store.clone(), path_factory)
400            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
401
402        let blob_name = Self::column_blob_name(column_id);
403
404        puffin_manager
405            .reader(&file_id)
406            .await
407            .context(PuffinBuildReaderSnafu)?
408            .with_file_size_hint(file_size_hint)
409            .blob(&blob_name)
410            .await
411            .context(PuffinReadBlobSnafu)?
412            .reader()
413            .await
414            .context(PuffinBuildReaderSnafu)
415    }
416
417    async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
418        &self,
419        reader: R,
420        predicates: &[InListPredicate],
421        output: &mut [(usize, Vec<Range<usize>>)],
422        mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
423    ) -> std::result::Result<(), index::bloom_filter::error::Error> {
424        let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
425
426        for (_, row_group_output) in output.iter_mut() {
427            // All rows are filtered out, skip the search
428            if row_group_output.is_empty() {
429                continue;
430            }
431
432            let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics);
433            *row_group_output = applier
434                .search(predicates, row_group_output, read_metrics)
435                .await?;
436        }
437
438        Ok(())
439    }
440
    /// Returns the predicate key, which identifies this applier's predicates and is used
    /// to fetch results from cache.
    pub fn predicate_key(&self) -> &PredicateKey {
        &self.predicate_key
    }
445}
446
447fn is_blob_not_found(err: &Error) -> bool {
448    matches!(
449        err,
450        Error::PuffinReadBlob {
451            source: puffin::error::Error::BlobNotFound { .. },
452            ..
453        }
454    )
455}
456
457#[cfg(test)]
458mod tests {
459
460    use datafusion_expr::{Expr, col, lit};
461    use futures::future::BoxFuture;
462    use puffin::puffin_manager::PuffinWriter;
463    use store_api::metadata::RegionMetadata;
464    use store_api::storage::FileId;
465
466    use super::*;
467    use crate::sst::file::RegionFileId;
468    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
469    use crate::sst::index::bloom_filter::creator::tests::{
470        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
471    };
472
    /// Builds a reusable test driver for one index file.
    ///
    /// The returned closure takes filter expressions plus the row group layout
    /// (`(length, to_search)` pairs), builds an applier from the expressions, applies it to
    /// `file_id` and returns only the row groups that still have matching ranges.
    ///
    /// It panics if the applier cannot be built from the expressions or if applying fails.
    // The returned closure type is verbose by nature; a type alias would not make it clearer.
    #[allow(clippy::type_complexity)]
    fn tester(
        table_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: RegionIndexId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
    + use<'_> {
        move |exprs, row_groups| {
            // Clone everything the future needs so that it owns its inputs.
            let table_dir = table_dir.clone();
            let object_store: ObjectStore = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    table_dir,
                    PathType::Bare,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                applier
                    .apply(file_id, None, row_groups.into_iter(), None)
                    .await
                    .unwrap()
                    .into_iter()
                    // Drop row groups that were searched but have no matching rows.
                    .filter(|(_, ranges)| !ranges.is_empty())
                    .collect()
            })
        }
    }
509
    /// End-to-end check of the applier: writes a bloom filter index for 20 rows, then
    /// verifies the row ranges returned for several predicate and row group combinations.
    #[tokio::test]
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        // tag_str:
        //   - type: string
        //   - index: bloom filter
        //   - granularity: 2
        //   - column_id: 1
        //
        // ts:
        //   - type: timestamp
        //   - index: time index
        //   - column_id: 2
        //
        // field_u64:
        //   - type: uint64
        //   - index: bloom filter
        //   - granularity: 4
        //   - column_id: 3
        let region_metadata = mock_region_metadata();
        let prefix = "test_bloom_filter_applier_";
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_usage_threshold = Some(1024);
        let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
        let file_id = RegionIndexId::new(file_id, 0);
        let table_dir = "table_dir".to_string();

        let mut indexer = BloomFilterIndexer::new(
            file_id.file_id(),
            &region_metadata,
            intm_mgr,
            memory_usage_threshold,
        )
        .unwrap()
        .unwrap();

        // push 20 rows
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();
        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        // Persist the index into the puffin file that the applier reads back below.
        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let tester = tester(
            table_dir.clone(),
            object_store.clone(),
            &region_metadata,
            factory.clone(),
            file_id,
        );

        // Case 1: every row group is searched; only the first two match `tag1`.
        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // Case 2: the second row group is not searched, so it is absent from the result.
        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  x row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5])]);

        // Case 3: predicates on two columns are combined; only rows 0..4 pass both.
        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        // field_u64: | o pred   | x pred    |  x pred     |  x pred       | x pred       |
        let res = tester(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4])]);

        // Case 4: an IN list matches two segments; the second one (rows 8..12) is clipped
        // to the searched row group 1 and reported relative to its start as 3..5.
        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | x  row group     |  o row group     |
        // field_u64: | o pred   | x pred    |  o pred     |  x pred       | x pred       |
        let res = tester(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
615}