mito2/sst/index/bloom_filter/
applier.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::Instant;
21
22use common_base::range_read::RangeReader;
23use common_telemetry::warn;
24use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
25use index::bloom_filter::reader::{
26    BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
27};
28use index::target::IndexTarget;
29use object_store::ObjectStore;
30use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
31use puffin::puffin_manager::{PuffinManager, PuffinReader};
32use snafu::ResultExt;
33use store_api::region_request::PathType;
34use store_api::storage::ColumnId;
35
36use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
37use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
38use crate::cache::index::bloom_filter_index::{
39    BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
40};
41use crate::cache::index::result_cache::PredicateKey;
42use crate::error::{
43    ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
44    Result,
45};
46use crate::metrics::INDEX_APPLY_ELAPSED;
47use crate::sst::file::RegionIndexId;
48use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
49use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
50pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
51use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
52
/// Metrics for tracking bloom filter index apply operations.
///
/// Instances are filled in by [`BloomFilterIndexApplier::apply`] when the caller passes one,
/// and several instances can be combined with [`Self::merge_from`].
#[derive(Default, Clone)]
pub struct BloomFilterIndexApplyMetrics {
    /// Total time spent applying the index.
    ///
    /// This is the only field [`Self::is_empty`] inspects.
    pub apply_elapsed: std::time::Duration,
    /// Number of blob cache misses.
    ///
    /// Incremented each time a blob reader could not be obtained from the local file cache.
    pub blob_cache_miss: usize,
    /// Total size of blobs read (in bytes).
    ///
    /// Only accumulated when a bloom filter index cache is configured; each blob contributes
    /// its full content length.
    pub blob_read_bytes: u64,
    /// Metrics for bloom filter read operations.
    pub read_metrics: BloomFilterReadMetrics,
}
65
66impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        let Self {
69            apply_elapsed,
70            blob_cache_miss,
71            blob_read_bytes,
72            read_metrics,
73        } = self;
74
75        if self.is_empty() {
76            return write!(f, "{{}}");
77        }
78        write!(f, "{{")?;
79
80        write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
81
82        if *blob_cache_miss > 0 {
83            write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
84        }
85        if *blob_read_bytes > 0 {
86            write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
87        }
88        write!(f, ", \"read_metrics\":{:?}", read_metrics)?;
89
90        write!(f, "}}")
91    }
92}
93
94impl BloomFilterIndexApplyMetrics {
95    /// Returns true if the metrics are empty (contain no meaningful data).
96    pub fn is_empty(&self) -> bool {
97        self.apply_elapsed.is_zero()
98    }
99
100    /// Merges another metrics into this one.
101    pub fn merge_from(&mut self, other: &Self) {
102        self.apply_elapsed += other.apply_elapsed;
103        self.blob_cache_miss += other.blob_cache_miss;
104        self.blob_read_bytes += other.blob_read_bytes;
105        self.read_metrics.merge_from(&other.read_metrics);
106    }
107}
108
/// Shared, reference-counted handle to a [`BloomFilterIndexApplier`].
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
110
/// `BloomFilterIndexApplier` applies bloom filter predicates to the SST file.
///
/// Index blobs are looked up in the local file cache first (when one is configured) and read
/// from the object store otherwise.
pub struct BloomFilterIndexApplier {
    /// Directory of the table.
    table_dir: String,

    /// Path type for generating file paths.
    path_type: PathType,

    /// Object store to read the index file.
    object_store: ObjectStore,

    /// File cache to read the index file.
    /// `None` means index files are always read from the object store.
    file_cache: Option<FileCacheRef>,

    /// Factory to create puffin manager.
    puffin_manager_factory: PuffinManagerFactory,

    /// Cache for puffin metadata.
    /// Only handed to the puffin manager that reads the remote index file.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,

    /// Cache for bloom filter index.
    /// When set, blobs are read through a `CachedBloomFilterIndexBlobReader`.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Bloom filter predicates.
    /// For each column, the value will be retained only if it contains __all__ predicates.
    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,

    /// Predicate key. Used to identify the predicate and fetch result from cache.
    predicate_key: PredicateKey,
}
141
142impl BloomFilterIndexApplier {
143    /// Creates a new `BloomFilterIndexApplier`.
144    ///
145    /// For each column, the value will be retained only if it contains __all__ predicates.
146    pub fn new(
147        table_dir: String,
148        path_type: PathType,
149        object_store: ObjectStore,
150        puffin_manager_factory: PuffinManagerFactory,
151        predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
152    ) -> Self {
153        let predicates = Arc::new(predicates);
154        Self {
155            table_dir,
156            path_type,
157            object_store,
158            file_cache: None,
159            puffin_manager_factory,
160            puffin_metadata_cache: None,
161            bloom_filter_index_cache: None,
162            predicate_key: PredicateKey::new_bloom(predicates.clone()),
163            predicates,
164        }
165    }
166
167    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
168        self.file_cache = file_cache;
169        self
170    }
171
172    pub fn with_puffin_metadata_cache(
173        mut self,
174        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
175    ) -> Self {
176        self.puffin_metadata_cache = puffin_metadata_cache;
177        self
178    }
179
180    pub fn with_bloom_filter_cache(
181        mut self,
182        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
183    ) -> Self {
184        self.bloom_filter_index_cache = bloom_filter_index_cache;
185        self
186    }
187
188    /// Applies bloom filter predicates to the provided SST file and returns a
189    /// list of row group ranges that match the predicates.
190    ///
191    /// The `row_groups` iterator provides the row group lengths and whether to search in the row group.
192    ///
193    /// Row group id existing in the returned result means that the row group is searched.
194    /// Empty ranges means that the row group is searched but no rows are found.
195    ///
196    /// # Arguments
197    /// * `file_id` - The region file ID to apply predicates to
198    /// * `file_size_hint` - Optional hint for file size to avoid extra metadata reads
199    /// * `row_groups` - Iterator of row group lengths and whether to search in the row group
200    /// * `metrics` - Optional mutable reference to collect metrics on demand
201    pub async fn apply(
202        &self,
203        file_id: RegionIndexId,
204        file_size_hint: Option<u64>,
205        row_groups: impl Iterator<Item = (usize, bool)>,
206        mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
207    ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
208        let apply_start = Instant::now();
209
210        // Calculates row groups' ranges based on start of the file.
211        let mut input = Vec::with_capacity(row_groups.size_hint().0);
212        let mut start = 0;
213        for (i, (len, to_search)) in row_groups.enumerate() {
214            let end = start + len;
215            if to_search {
216                input.push((i, start..end));
217            }
218            start = end;
219        }
220
221        // Initializes output with input ranges, but ranges are based on start of the file not the row group,
222        // so we need to adjust them later.
223        let mut output = input
224            .iter()
225            .map(|(i, range)| (*i, vec![range.clone()]))
226            .collect::<Vec<_>>();
227
228        for (column_id, predicates) in self.predicates.iter() {
229            let blob = match self
230                .blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
231                .await?
232            {
233                Some(blob) => blob,
234                None => continue,
235            };
236
237            // Create appropriate reader based on whether we have caching enabled
238            if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
239                let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
240                if let Some(m) = &mut metrics {
241                    m.blob_read_bytes += blob_size;
242                }
243                let reader = CachedBloomFilterIndexBlobReader::new(
244                    file_id.file_id(),
245                    file_id.version,
246                    *column_id,
247                    Tag::Skipping,
248                    blob_size,
249                    BloomFilterReaderImpl::new(blob),
250                    bloom_filter_cache.clone(),
251                );
252                self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
253                    .await
254                    .context(ApplyBloomFilterIndexSnafu)?;
255            } else {
256                let reader = BloomFilterReaderImpl::new(blob);
257                self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
258                    .await
259                    .context(ApplyBloomFilterIndexSnafu)?;
260            }
261        }
262
263        // adjust ranges to be based on row group
264        for ((_, output), (_, input)) in output.iter_mut().zip(input) {
265            let start = input.start;
266            for range in output.iter_mut() {
267                range.start -= start;
268                range.end -= start;
269            }
270        }
271
272        // Record elapsed time to histogram and collect metrics if requested
273        let elapsed = apply_start.elapsed();
274        INDEX_APPLY_ELAPSED
275            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
276            .observe(elapsed.as_secs_f64());
277
278        if let Some(m) = metrics {
279            m.apply_elapsed += elapsed;
280        }
281
282        Ok(output)
283    }
284
285    /// Creates a blob reader from the cached or remote index file.
286    ///
287    /// Returus `None` if the column does not have an index.
288    async fn blob_reader(
289        &self,
290        file_id: RegionIndexId,
291        column_id: ColumnId,
292        file_size_hint: Option<u64>,
293        metrics: Option<&mut BloomFilterIndexApplyMetrics>,
294    ) -> Result<Option<BlobReader>> {
295        let reader = match self
296            .cached_blob_reader(file_id, column_id, file_size_hint)
297            .await
298        {
299            Ok(Some(puffin_reader)) => puffin_reader,
300            other => {
301                if let Some(m) = metrics {
302                    m.blob_cache_miss += 1;
303                }
304                if let Err(err) = other {
305                    // Blob not found means no index for this column
306                    if is_blob_not_found(&err) {
307                        return Ok(None);
308                    }
309                    warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
310                }
311                let res = self
312                    .remote_blob_reader(file_id, column_id, file_size_hint)
313                    .await;
314                if let Err(err) = res {
315                    // Blob not found means no index for this column
316                    if is_blob_not_found(&err) {
317                        return Ok(None);
318                    }
319                    return Err(err);
320                }
321
322                res?
323            }
324        };
325
326        Ok(Some(reader))
327    }
328
329    /// Creates a blob reader from the cached index file
330    async fn cached_blob_reader(
331        &self,
332        file_id: RegionIndexId,
333        column_id: ColumnId,
334        file_size_hint: Option<u64>,
335    ) -> Result<Option<BlobReader>> {
336        let Some(file_cache) = &self.file_cache else {
337            return Ok(None);
338        };
339
340        let index_key = IndexKey::new(
341            file_id.region_id(),
342            file_id.file_id(),
343            FileType::Puffin(file_id.version),
344        );
345        if file_cache.get(index_key).await.is_none() {
346            return Ok(None);
347        };
348
349        let puffin_manager = self.puffin_manager_factory.build(
350            file_cache.local_store(),
351            WriteCachePathProvider::new(file_cache.clone()),
352        );
353        let blob_name = Self::column_blob_name(column_id);
354
355        let reader = puffin_manager
356            .reader(&file_id)
357            .await
358            .context(PuffinBuildReaderSnafu)?
359            .with_file_size_hint(file_size_hint)
360            .blob(&blob_name)
361            .await
362            .context(PuffinReadBlobSnafu)?
363            .reader()
364            .await
365            .context(PuffinBuildReaderSnafu)?;
366        Ok(Some(reader))
367    }
368
369    // TODO(ruihang): use the same util with the code in creator
370    fn column_blob_name(column_id: ColumnId) -> String {
371        format!("{INDEX_BLOB_TYPE}-{}", IndexTarget::ColumnId(column_id))
372    }
373
374    /// Creates a blob reader from the remote index file
375    async fn remote_blob_reader(
376        &self,
377        file_id: RegionIndexId,
378        column_id: ColumnId,
379        file_size_hint: Option<u64>,
380    ) -> Result<BlobReader> {
381        let puffin_manager = self
382            .puffin_manager_factory
383            .build(
384                self.object_store.clone(),
385                RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
386            )
387            .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
388
389        let blob_name = Self::column_blob_name(column_id);
390
391        puffin_manager
392            .reader(&file_id)
393            .await
394            .context(PuffinBuildReaderSnafu)?
395            .with_file_size_hint(file_size_hint)
396            .blob(&blob_name)
397            .await
398            .context(PuffinReadBlobSnafu)?
399            .reader()
400            .await
401            .context(PuffinBuildReaderSnafu)
402    }
403
404    async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
405        &self,
406        reader: R,
407        predicates: &[InListPredicate],
408        output: &mut [(usize, Vec<Range<usize>>)],
409        mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
410    ) -> std::result::Result<(), index::bloom_filter::error::Error> {
411        let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
412
413        for (_, row_group_output) in output.iter_mut() {
414            // All rows are filtered out, skip the search
415            if row_group_output.is_empty() {
416                continue;
417            }
418
419            let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics);
420            *row_group_output = applier
421                .search(predicates, row_group_output, read_metrics)
422                .await?;
423        }
424
425        Ok(())
426    }
427
    /// Returns the predicate key.
    ///
    /// The key identifies this applier's predicates and is used to fetch results from cache.
    pub fn predicate_key(&self) -> &PredicateKey {
        &self.predicate_key
    }
432}
433
/// Returns true if `err` is a `PuffinReadBlob` error whose source is puffin's `BlobNotFound`.
///
/// The applier treats this case as "the column has no index" rather than as a failure.
fn is_blob_not_found(err: &Error) -> bool {
    matches!(
        err,
        Error::PuffinReadBlob {
            source: puffin::error::Error::BlobNotFound { .. },
            ..
        }
    )
}
443
444#[cfg(test)]
445mod tests {
446
447    use datafusion_expr::{Expr, col, lit};
448    use futures::future::BoxFuture;
449    use puffin::puffin_manager::PuffinWriter;
450    use store_api::metadata::RegionMetadata;
451    use store_api::storage::FileId;
452
453    use super::*;
454    use crate::sst::file::RegionFileId;
455    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
456    use crate::sst::index::bloom_filter::creator::tests::{
457        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
458    };
459
    /// Builds a reusable closure that runs an index lookup against `file_id`.
    ///
    /// The closure takes filter expressions and `(row_group_len, to_search)` pairs, builds a
    /// fresh applier through `BloomFilterIndexApplierBuilder`, applies it without a file size
    /// hint or metrics, and returns only the row groups whose resulting ranges are non-empty.
    #[allow(clippy::type_complexity)]
    fn tester(
        table_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: RegionIndexId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
    + use<'_> {
        move |exprs, row_groups| {
            // Clone everything the `'static` future needs so the closure stays callable
            // multiple times.
            let table_dir = table_dir.clone();
            let object_store: ObjectStore = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    table_dir,
                    PathType::Bare,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                applier
                    .apply(file_id, None, row_groups.into_iter(), None)
                    .await
                    .unwrap()
                    .into_iter()
                    // Drop row groups that were searched but ended up with no matching rows.
                    .filter(|(_, ranges)| !ranges.is_empty())
                    .collect()
            })
        }
    }
496
    /// End-to-end check of the applier: writes a bloom filter index for 20 rows into a puffin
    /// file, then applies several predicate / row-group combinations and verifies the
    /// returned row-group-relative ranges.
    #[tokio::test]
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        // tag_str:
        //   - type: string
        //   - index: bloom filter
        //   - granularity: 2
        //   - column_id: 1
        //
        // ts:
        //   - type: timestamp
        //   - index: time index
        //   - column_id: 2
        //
        // field_u64:
        //   - type: uint64
        //   - index: bloom filter
        //   - granularity: 4
        //   - column_id: 3
        let region_metadata = mock_region_metadata();
        let prefix = "test_bloom_filter_applier_";
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_usage_threshold = Some(1024);
        let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
        let file_id = RegionIndexId::new(file_id, 0);
        let table_dir = "table_dir".to_string();

        let mut indexer = BloomFilterIndexer::new(
            file_id.file_id(),
            &region_metadata,
            intm_mgr,
            memory_usage_threshold,
        )
        .unwrap()
        .unwrap();

        // push 20 rows
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();
        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        // Write the index through a puffin manager rooted at the same table dir and path type
        // that the applier under test reads from.
        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let tester = tester(
            table_dir.clone(),
            object_store.clone(),
            &region_metadata,
            factory.clone(),
            file_id,
        );

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  x row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | o  row group     |  o row group     |
        // tag_str:   |      o pred                 |   x pred                            |
        // field_u64: | o pred   | x pred    |  x pred     |  x pred       | x pred       |
        let res = tester(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        // Only the first field_u64 segment (rows 0..4, granularity 4) matches, so row group 0
        // is trimmed to 0..4.
        assert_eq!(res, vec![(0, vec![0..4])]);

        // rows        0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19
        // row group: | o  row group |  o row group | x  row group     |  o row group     |
        // field_u64: | o pred   | x pred    |  o pred     |  x pred       | x pred       |
        let res = tester(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        // Rows 8..10 of the file fall into row group 1 (rows 5..10), i.e. 3..5 relative to it.
        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
602}