1mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::Instant;
21
22use common_base::range_read::RangeReader;
23use common_telemetry::{tracing, warn};
24use datatypes::data_type::ConcreteDataType;
25use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
26use index::bloom_filter::reader::{
27 BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
28};
29use index::target::IndexTarget;
30use object_store::ObjectStore;
31use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
32use puffin::puffin_manager::{PuffinManager, PuffinReader};
33use snafu::ResultExt;
34use store_api::metadata::RegionMetadataRef;
35use store_api::region_request::PathType;
36use store_api::storage::ColumnId;
37
38use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
39use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
40use crate::cache::index::bloom_filter_index::{
41 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
42};
43use crate::error::{
44 ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
45 Result,
46};
47use crate::metrics::INDEX_APPLY_ELAPSED;
48use crate::sst::file::RegionIndexId;
49use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
50pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
51use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
52use crate::sst::index::{TYPE_BLOOM_FILTER_INDEX, trigger_index_background_download};
53
54#[derive(Default, Clone)]
56pub struct BloomFilterIndexApplyMetrics {
57 pub apply_elapsed: std::time::Duration,
59 pub blob_cache_miss: usize,
61 pub blob_read_bytes: u64,
63 pub read_metrics: BloomFilterReadMetrics,
65}
66
67impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 let Self {
70 apply_elapsed,
71 blob_cache_miss,
72 blob_read_bytes,
73 read_metrics,
74 } = self;
75
76 if self.is_empty() {
77 return write!(f, "{{}}");
78 }
79 write!(f, "{{")?;
80
81 write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
82
83 if *blob_cache_miss > 0 {
84 write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
85 }
86 if *blob_read_bytes > 0 {
87 write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
88 }
89 write!(f, ", \"read_metrics\":{:?}", read_metrics)?;
90
91 write!(f, "}}")
92 }
93}
94
95impl BloomFilterIndexApplyMetrics {
96 pub fn is_empty(&self) -> bool {
98 self.apply_elapsed.is_zero()
99 }
100
101 pub fn merge_from(&mut self, other: &Self) {
103 self.apply_elapsed += other.apply_elapsed;
104 self.blob_cache_miss += other.blob_cache_miss;
105 self.blob_read_bytes += other.blob_read_bytes;
106 self.read_metrics.merge_from(&other.read_metrics);
107 }
108}
109
110pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
111
112pub struct BloomFilterIndexApplier {
114 table_dir: String,
116
117 path_type: PathType,
119
120 object_store: ObjectStore,
122
123 file_cache: Option<FileCacheRef>,
125
126 puffin_manager_factory: PuffinManagerFactory,
128
129 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
131
132 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
134
135 default_predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
138
139 expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
141}
142
143impl BloomFilterIndexApplier {
144 pub fn new(
148 table_dir: String,
149 path_type: PathType,
150 object_store: ObjectStore,
151 puffin_manager_factory: PuffinManagerFactory,
152 predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
153 expected_predicate_col_types: BTreeMap<ColumnId, ConcreteDataType>,
154 ) -> Self {
155 let default_predicates = Arc::new(predicates);
156 Self {
157 table_dir,
158 path_type,
159 object_store,
160 file_cache: None,
161 puffin_manager_factory,
162 puffin_metadata_cache: None,
163 bloom_filter_index_cache: None,
164 default_predicates,
165 expected_predicate_col_types,
166 }
167 }
168
169 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
170 self.file_cache = file_cache;
171 self
172 }
173
174 pub fn with_puffin_metadata_cache(
175 mut self,
176 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
177 ) -> Self {
178 self.puffin_metadata_cache = puffin_metadata_cache;
179 self
180 }
181
182 pub fn with_bloom_filter_cache(
183 mut self,
184 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
185 ) -> Self {
186 self.bloom_filter_index_cache = bloom_filter_index_cache;
187 self
188 }
189
190 #[tracing::instrument(
205 skip_all,
206 fields(file_id = %file_id)
207 )]
208 pub async fn apply(
209 &self,
210 file_id: RegionIndexId,
211 file_size_hint: Option<u64>,
212 predicates: &BTreeMap<ColumnId, Vec<InListPredicate>>,
213 row_groups: impl Iterator<Item = (usize, bool)>,
214 mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
215 ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
216 let apply_start = Instant::now();
217
218 let mut input = Vec::with_capacity(row_groups.size_hint().0);
220 let mut start = 0;
221 for (i, (len, to_search)) in row_groups.enumerate() {
222 let end = start + len;
223 if to_search {
224 input.push((i, start..end));
225 }
226 start = end;
227 }
228
229 let mut output = input
232 .iter()
233 .map(|(i, range)| (*i, vec![range.clone()]))
234 .collect::<Vec<_>>();
235
236 for (column_id, predicates) in predicates {
237 let blob = match self
238 .blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
239 .await?
240 {
241 Some(blob) => blob,
242 None => continue,
243 };
244
245 if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
247 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
248 if let Some(m) = &mut metrics {
249 m.blob_read_bytes += blob_size;
250 }
251 let reader = CachedBloomFilterIndexBlobReader::new(
252 file_id.file_id(),
253 file_id.version,
254 *column_id,
255 Tag::Skipping,
256 blob_size,
257 BloomFilterReaderImpl::new(blob),
258 bloom_filter_cache.clone(),
259 );
260 self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
261 .await
262 .context(ApplyBloomFilterIndexSnafu)?;
263 } else {
264 let reader = BloomFilterReaderImpl::new(blob);
265 self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
266 .await
267 .context(ApplyBloomFilterIndexSnafu)?;
268 }
269 }
270
271 for ((_, output), (_, input)) in output.iter_mut().zip(input) {
273 let start = input.start;
274 for range in output.iter_mut() {
275 range.start -= start;
276 range.end -= start;
277 }
278 }
279
280 let elapsed = apply_start.elapsed();
282 INDEX_APPLY_ELAPSED
283 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
284 .observe(elapsed.as_secs_f64());
285
286 if let Some(m) = metrics {
287 m.apply_elapsed += elapsed;
288 }
289
290 Ok(output)
291 }
292
293 async fn blob_reader(
297 &self,
298 file_id: RegionIndexId,
299 column_id: ColumnId,
300 file_size_hint: Option<u64>,
301 metrics: Option<&mut BloomFilterIndexApplyMetrics>,
302 ) -> Result<Option<BlobReader>> {
303 let reader = match self
304 .cached_blob_reader(file_id, column_id, file_size_hint)
305 .await
306 {
307 Ok(Some(puffin_reader)) => puffin_reader,
308 other => {
309 if let Some(m) = metrics {
310 m.blob_cache_miss += 1;
311 }
312 if let Err(err) = other {
313 if is_blob_not_found(&err) {
315 return Ok(None);
316 }
317 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
318 }
319 let res = self
320 .remote_blob_reader(file_id, column_id, file_size_hint)
321 .await;
322 if let Err(err) = res {
323 if is_blob_not_found(&err) {
325 return Ok(None);
326 }
327 return Err(err);
328 }
329
330 res?
331 }
332 };
333
334 Ok(Some(reader))
335 }
336
337 async fn cached_blob_reader(
339 &self,
340 file_id: RegionIndexId,
341 column_id: ColumnId,
342 file_size_hint: Option<u64>,
343 ) -> Result<Option<BlobReader>> {
344 let Some(file_cache) = &self.file_cache else {
345 return Ok(None);
346 };
347
348 let index_key = IndexKey::new(
349 file_id.region_id(),
350 file_id.file_id(),
351 FileType::Puffin(file_id.version),
352 );
353 if file_cache.get(index_key).await.is_none() {
354 return Ok(None);
355 };
356
357 let puffin_manager = self.puffin_manager_factory.build(
358 file_cache.local_store(),
359 WriteCachePathProvider::new(file_cache.clone()),
360 );
361 let blob_name = Self::column_blob_name(column_id);
362
363 let reader = puffin_manager
364 .reader(&file_id)
365 .await
366 .context(PuffinBuildReaderSnafu)?
367 .with_file_size_hint(file_size_hint)
368 .blob(&blob_name)
369 .await
370 .context(PuffinReadBlobSnafu)?
371 .reader()
372 .await
373 .context(PuffinBuildReaderSnafu)?;
374 Ok(Some(reader))
375 }
376
377 fn column_blob_name(column_id: ColumnId) -> String {
379 format!("{INDEX_BLOB_TYPE}-{}", IndexTarget::ColumnId(column_id))
380 }
381
382 async fn remote_blob_reader(
384 &self,
385 file_id: RegionIndexId,
386 column_id: ColumnId,
387 file_size_hint: Option<u64>,
388 ) -> Result<BlobReader> {
389 let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
390
391 trigger_index_background_download(
393 self.file_cache.as_ref(),
394 &file_id,
395 file_size_hint,
396 &path_factory,
397 &self.object_store,
398 );
399
400 let puffin_manager = self
401 .puffin_manager_factory
402 .build(self.object_store.clone(), path_factory)
403 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
404
405 let blob_name = Self::column_blob_name(column_id);
406
407 puffin_manager
408 .reader(&file_id)
409 .await
410 .context(PuffinBuildReaderSnafu)?
411 .with_file_size_hint(file_size_hint)
412 .blob(&blob_name)
413 .await
414 .context(PuffinReadBlobSnafu)?
415 .reader()
416 .await
417 .context(PuffinBuildReaderSnafu)
418 }
419
420 async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
421 &self,
422 reader: R,
423 predicates: &[InListPredicate],
424 output: &mut [(usize, Vec<Range<usize>>)],
425 mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
426 ) -> std::result::Result<(), index::bloom_filter::error::Error> {
427 let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
428
429 for (_, row_group_output) in output.iter_mut() {
430 if row_group_output.is_empty() {
432 continue;
433 }
434
435 let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics);
436 *row_group_output = applier
437 .search(predicates, row_group_output, read_metrics)
438 .await?;
439 }
440
441 Ok(())
442 }
443
444 pub fn compatible_predicate_for_sst(
448 &self,
449 sst_metadata: &RegionMetadataRef,
450 ) -> Option<Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>> {
451 let mut has_type_mismatch = false;
452 let mut compatible_col_ids = Vec::new();
453
454 for (col_id, expected) in &self.expected_predicate_col_types {
455 let Some(sst_col) = sst_metadata.column_by_id(*col_id) else {
456 has_type_mismatch = true;
457 continue;
458 };
459
460 if sst_col.column_schema.data_type != *expected {
461 has_type_mismatch = true;
462 continue;
463 }
464
465 compatible_col_ids.push(*col_id);
466 }
467
468 if compatible_col_ids.is_empty() {
469 return None;
470 }
471
472 if !has_type_mismatch {
473 return Some(self.default_predicates.clone());
474 }
475
476 let mut compatible_predicates = BTreeMap::new();
477 for col_id in compatible_col_ids {
478 if let Some(predicates) = self.default_predicates.get(&col_id) {
479 compatible_predicates.insert(col_id, predicates.clone());
480 }
481 }
482
483 Some(Arc::new(compatible_predicates))
484 }
485}
486
487fn is_blob_not_found(err: &Error) -> bool {
488 matches!(
489 err,
490 Error::PuffinReadBlob {
491 source: puffin::error::Error::BlobNotFound { .. },
492 ..
493 }
494 )
495}
496
#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;

    use datafusion_expr::{Expr, col, lit};
    use futures::future::BoxFuture;
    use index::Bytes;
    use object_store::services::Memory;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::metadata::RegionMetadata;
    use store_api::storage::FileId;

    use super::*;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
    use crate::sst::index::bloom_filter::creator::tests::{
        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
    };

    /// Predicates whose column type matches the SST are kept.
    #[tokio::test]
    async fn test_compatible_predicate_for_sst() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_plan_for_sst_basic_").await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let table_dir = "table_dir".to_string();

        let predicates = BTreeMap::from_iter([(
            1,
            vec![InListPredicate {
                list: BTreeSet::from_iter([Bytes::from("foo")]),
            }],
        )]);
        let expected_predicate_col_types =
            BTreeMap::from_iter([(1, ConcreteDataType::string_datatype())]);

        let applier = BloomFilterIndexApplier::new(
            table_dir,
            PathType::Bare,
            object_store,
            puffin_manager_factory,
            predicates,
            expected_predicate_col_types,
        );
        let predicates = applier.compatible_predicate_for_sst(&mock_region_metadata());
        assert!(predicates.is_some());
    }

    /// When the only predicate column has a different type in the SST, nothing applies.
    #[tokio::test]
    async fn test_compatible_predicate_for_sst_type_mismatch() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_plan_for_sst_type_mismatch_").await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let table_dir = "table_dir".to_string();

        let predicates = BTreeMap::from_iter([(
            1,
            vec![InListPredicate {
                list: BTreeSet::from_iter([Bytes::from("foo")]),
            }],
        )]);
        let expected_predicate_col_types =
            BTreeMap::from_iter([(1, ConcreteDataType::int64_datatype())]);

        let applier = BloomFilterIndexApplier::new(
            table_dir,
            PathType::Bare,
            object_store,
            puffin_manager_factory,
            predicates,
            expected_predicate_col_types,
        );
        let predicates = applier.compatible_predicate_for_sst(&mock_region_metadata());
        assert!(predicates.is_none());
    }

    /// With a partial mismatch, only the compatible predicates are returned.
    #[tokio::test]
    async fn test_compatible_predicate_for_sst_partial_type_mismatch() {
        let (_d, puffin_manager_factory) =
            PuffinManagerFactory::new_for_test_async("test_plan_for_sst_partial_mismatch_").await;
        let object_store = ObjectStore::new(Memory::default()).unwrap().finish();
        let table_dir = "table_dir".to_string();

        let predicates = BTreeMap::from_iter([
            (
                1,
                vec![InListPredicate {
                    list: BTreeSet::from_iter([Bytes::from("foo")]),
                }],
            ),
            (
                3,
                vec![InListPredicate {
                    list: BTreeSet::from_iter([Bytes::from("bar")]),
                }],
            ),
        ]);
        let expected_predicate_col_types = BTreeMap::from_iter([
            (1, ConcreteDataType::string_datatype()),
            (3, ConcreteDataType::int64_datatype()),
        ]);

        let applier = BloomFilterIndexApplier::new(
            table_dir,
            PathType::Bare,
            object_store,
            puffin_manager_factory,
            predicates,
            expected_predicate_col_types,
        );
        let result = applier.compatible_predicate_for_sst(&mock_region_metadata());

        let result = result.expect("expected Some with compatible subset");
        assert!(
            result.contains_key(&1),
            "compatible column 1 must be present"
        );
        assert!(
            !result.contains_key(&3),
            "mismatched column 3 must be absent"
        );
        assert_eq!(result.len(), 1, "only the compatible predicate must remain");
    }

    /// Builds a closure that constructs an applier from `exprs` and applies it to
    /// `file_id`, returning only the row groups that still have candidate ranges.
    // The returned closure type is inherently verbose; a type alias would not be clearer.
    #[allow(clippy::type_complexity)]
    fn tester(
        table_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: RegionIndexId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
    + use<'_> {
        move |exprs, row_groups| {
            let table_dir = table_dir.clone();
            let object_store: ObjectStore = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    table_dir,
                    PathType::Bare,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                let predicates = applier
                    .compatible_predicate_for_sst(&Arc::new(metadata.clone()))
                    .unwrap();
                applier
                    .apply(file_id, None, &predicates, row_groups.into_iter(), None)
                    .await
                    .unwrap()
                    .into_iter()
                    .filter(|(_, ranges)| !ranges.is_empty())
                    .collect()
            })
        }
    }

    #[tokio::test]
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        let region_metadata = mock_region_metadata();
        let prefix = "test_bloom_filter_applier_";
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_usage_threshold = Some(1024);
        let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
        let file_id = RegionIndexId::new(file_id, 0);
        let table_dir = "table_dir".to_string();

        let mut indexer = BloomFilterIndexer::new(
            file_id.file_id(),
            &region_metadata,
            intm_mgr,
            memory_usage_threshold,
        )
        .unwrap()
        .unwrap();

        // Index 20 rows: rows 0..10 tagged "tag1", rows 10..20 tagged "tag2".
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();
        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let tester = tester(
            table_dir.clone(),
            object_store.clone(),
            &region_metadata,
            factory.clone(),
            file_id,
        );

        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // Row group 1 is excluded from the search, so it is never returned.
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5])]);

        let res = tester(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4])]);

        let res = tester(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
}