1mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::Instant;
21
22use common_base::range_read::RangeReader;
23use common_telemetry::warn;
24use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
25use index::bloom_filter::reader::{
26 BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
27};
28use index::target::IndexTarget;
29use object_store::ObjectStore;
30use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
31use puffin::puffin_manager::{PuffinManager, PuffinReader};
32use snafu::ResultExt;
33use store_api::region_request::PathType;
34use store_api::storage::ColumnId;
35
36use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
37use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
38use crate::cache::index::bloom_filter_index::{
39 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
40};
41use crate::cache::index::result_cache::PredicateKey;
42use crate::error::{
43 ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
44 Result,
45};
46use crate::metrics::INDEX_APPLY_ELAPSED;
47use crate::sst::file::RegionIndexId;
48use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
49pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
50use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
51use crate::sst::index::{TYPE_BLOOM_FILTER_INDEX, trigger_index_background_download};
52
53#[derive(Default, Clone)]
55pub struct BloomFilterIndexApplyMetrics {
56 pub apply_elapsed: std::time::Duration,
58 pub blob_cache_miss: usize,
60 pub blob_read_bytes: u64,
62 pub read_metrics: BloomFilterReadMetrics,
64}
65
66impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 let Self {
69 apply_elapsed,
70 blob_cache_miss,
71 blob_read_bytes,
72 read_metrics,
73 } = self;
74
75 if self.is_empty() {
76 return write!(f, "{{}}");
77 }
78 write!(f, "{{")?;
79
80 write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
81
82 if *blob_cache_miss > 0 {
83 write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
84 }
85 if *blob_read_bytes > 0 {
86 write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
87 }
88 write!(f, ", \"read_metrics\":{:?}", read_metrics)?;
89
90 write!(f, "}}")
91 }
92}
93
94impl BloomFilterIndexApplyMetrics {
95 pub fn is_empty(&self) -> bool {
97 self.apply_elapsed.is_zero()
98 }
99
100 pub fn merge_from(&mut self, other: &Self) {
102 self.apply_elapsed += other.apply_elapsed;
103 self.blob_cache_miss += other.blob_cache_miss;
104 self.blob_read_bytes += other.blob_read_bytes;
105 self.read_metrics.merge_from(&other.read_metrics);
106 }
107}
108
109pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
110
111pub struct BloomFilterIndexApplier {
113 table_dir: String,
115
116 path_type: PathType,
118
119 object_store: ObjectStore,
121
122 file_cache: Option<FileCacheRef>,
124
125 puffin_manager_factory: PuffinManagerFactory,
127
128 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
130
131 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
133
134 predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
137
138 predicate_key: PredicateKey,
140}
141
142impl BloomFilterIndexApplier {
143 pub fn new(
147 table_dir: String,
148 path_type: PathType,
149 object_store: ObjectStore,
150 puffin_manager_factory: PuffinManagerFactory,
151 predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
152 ) -> Self {
153 let predicates = Arc::new(predicates);
154 Self {
155 table_dir,
156 path_type,
157 object_store,
158 file_cache: None,
159 puffin_manager_factory,
160 puffin_metadata_cache: None,
161 bloom_filter_index_cache: None,
162 predicate_key: PredicateKey::new_bloom(predicates.clone()),
163 predicates,
164 }
165 }
166
167 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
168 self.file_cache = file_cache;
169 self
170 }
171
172 pub fn with_puffin_metadata_cache(
173 mut self,
174 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
175 ) -> Self {
176 self.puffin_metadata_cache = puffin_metadata_cache;
177 self
178 }
179
180 pub fn with_bloom_filter_cache(
181 mut self,
182 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
183 ) -> Self {
184 self.bloom_filter_index_cache = bloom_filter_index_cache;
185 self
186 }
187
188 pub async fn apply(
202 &self,
203 file_id: RegionIndexId,
204 file_size_hint: Option<u64>,
205 row_groups: impl Iterator<Item = (usize, bool)>,
206 mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
207 ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
208 let apply_start = Instant::now();
209
210 let mut input = Vec::with_capacity(row_groups.size_hint().0);
212 let mut start = 0;
213 for (i, (len, to_search)) in row_groups.enumerate() {
214 let end = start + len;
215 if to_search {
216 input.push((i, start..end));
217 }
218 start = end;
219 }
220
221 let mut output = input
224 .iter()
225 .map(|(i, range)| (*i, vec![range.clone()]))
226 .collect::<Vec<_>>();
227
228 for (column_id, predicates) in self.predicates.iter() {
229 let blob = match self
230 .blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
231 .await?
232 {
233 Some(blob) => blob,
234 None => continue,
235 };
236
237 if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
239 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
240 if let Some(m) = &mut metrics {
241 m.blob_read_bytes += blob_size;
242 }
243 let reader = CachedBloomFilterIndexBlobReader::new(
244 file_id.file_id(),
245 file_id.version,
246 *column_id,
247 Tag::Skipping,
248 blob_size,
249 BloomFilterReaderImpl::new(blob),
250 bloom_filter_cache.clone(),
251 );
252 self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
253 .await
254 .context(ApplyBloomFilterIndexSnafu)?;
255 } else {
256 let reader = BloomFilterReaderImpl::new(blob);
257 self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
258 .await
259 .context(ApplyBloomFilterIndexSnafu)?;
260 }
261 }
262
263 for ((_, output), (_, input)) in output.iter_mut().zip(input) {
265 let start = input.start;
266 for range in output.iter_mut() {
267 range.start -= start;
268 range.end -= start;
269 }
270 }
271
272 let elapsed = apply_start.elapsed();
274 INDEX_APPLY_ELAPSED
275 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
276 .observe(elapsed.as_secs_f64());
277
278 if let Some(m) = metrics {
279 m.apply_elapsed += elapsed;
280 }
281
282 Ok(output)
283 }
284
285 async fn blob_reader(
289 &self,
290 file_id: RegionIndexId,
291 column_id: ColumnId,
292 file_size_hint: Option<u64>,
293 metrics: Option<&mut BloomFilterIndexApplyMetrics>,
294 ) -> Result<Option<BlobReader>> {
295 let reader = match self
296 .cached_blob_reader(file_id, column_id, file_size_hint)
297 .await
298 {
299 Ok(Some(puffin_reader)) => puffin_reader,
300 other => {
301 if let Some(m) = metrics {
302 m.blob_cache_miss += 1;
303 }
304 if let Err(err) = other {
305 if is_blob_not_found(&err) {
307 return Ok(None);
308 }
309 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
310 }
311 let res = self
312 .remote_blob_reader(file_id, column_id, file_size_hint)
313 .await;
314 if let Err(err) = res {
315 if is_blob_not_found(&err) {
317 return Ok(None);
318 }
319 return Err(err);
320 }
321
322 res?
323 }
324 };
325
326 Ok(Some(reader))
327 }
328
329 async fn cached_blob_reader(
331 &self,
332 file_id: RegionIndexId,
333 column_id: ColumnId,
334 file_size_hint: Option<u64>,
335 ) -> Result<Option<BlobReader>> {
336 let Some(file_cache) = &self.file_cache else {
337 return Ok(None);
338 };
339
340 let index_key = IndexKey::new(
341 file_id.region_id(),
342 file_id.file_id(),
343 FileType::Puffin(file_id.version),
344 );
345 if file_cache.get(index_key).await.is_none() {
346 return Ok(None);
347 };
348
349 let puffin_manager = self.puffin_manager_factory.build(
350 file_cache.local_store(),
351 WriteCachePathProvider::new(file_cache.clone()),
352 );
353 let blob_name = Self::column_blob_name(column_id);
354
355 let reader = puffin_manager
356 .reader(&file_id)
357 .await
358 .context(PuffinBuildReaderSnafu)?
359 .with_file_size_hint(file_size_hint)
360 .blob(&blob_name)
361 .await
362 .context(PuffinReadBlobSnafu)?
363 .reader()
364 .await
365 .context(PuffinBuildReaderSnafu)?;
366 Ok(Some(reader))
367 }
368
369 fn column_blob_name(column_id: ColumnId) -> String {
371 format!("{INDEX_BLOB_TYPE}-{}", IndexTarget::ColumnId(column_id))
372 }
373
374 async fn remote_blob_reader(
376 &self,
377 file_id: RegionIndexId,
378 column_id: ColumnId,
379 file_size_hint: Option<u64>,
380 ) -> Result<BlobReader> {
381 let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
382
383 trigger_index_background_download(
385 self.file_cache.as_ref(),
386 &file_id,
387 file_size_hint,
388 &path_factory,
389 &self.object_store,
390 );
391
392 let puffin_manager = self
393 .puffin_manager_factory
394 .build(self.object_store.clone(), path_factory)
395 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
396
397 let blob_name = Self::column_blob_name(column_id);
398
399 puffin_manager
400 .reader(&file_id)
401 .await
402 .context(PuffinBuildReaderSnafu)?
403 .with_file_size_hint(file_size_hint)
404 .blob(&blob_name)
405 .await
406 .context(PuffinReadBlobSnafu)?
407 .reader()
408 .await
409 .context(PuffinBuildReaderSnafu)
410 }
411
412 async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
413 &self,
414 reader: R,
415 predicates: &[InListPredicate],
416 output: &mut [(usize, Vec<Range<usize>>)],
417 mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
418 ) -> std::result::Result<(), index::bloom_filter::error::Error> {
419 let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
420
421 for (_, row_group_output) in output.iter_mut() {
422 if row_group_output.is_empty() {
424 continue;
425 }
426
427 let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics);
428 *row_group_output = applier
429 .search(predicates, row_group_output, read_metrics)
430 .await?;
431 }
432
433 Ok(())
434 }
435
436 pub fn predicate_key(&self) -> &PredicateKey {
438 &self.predicate_key
439 }
440}
441
442fn is_blob_not_found(err: &Error) -> bool {
443 matches!(
444 err,
445 Error::PuffinReadBlob {
446 source: puffin::error::Error::BlobNotFound { .. },
447 ..
448 }
449 )
450}
451
#[cfg(test)]
mod tests {

    use datafusion_expr::{Expr, col, lit};
    use futures::future::BoxFuture;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::metadata::RegionMetadata;
    use store_api::storage::FileId;

    use super::*;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
    use crate::sst::index::bloom_filter::creator::tests::{
        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
    };

    /// Returns a closure that builds an applier from the given expressions,
    /// applies it to `file_id` with the given row groups, and keeps only the
    /// row groups that still have at least one range.
    #[allow(clippy::type_complexity)]
    fn tester(
        table_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: RegionIndexId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
    + use<'_> {
        move |exprs, row_groups| {
            // Clone everything so that the returned future is `'static`.
            let table_dir = table_dir.clone();
            let object_store: ObjectStore = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    table_dir,
                    PathType::Bare,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                applier
                    .apply(file_id, None, row_groups.into_iter(), None)
                    .await
                    .unwrap()
                    .into_iter()
                    .filter(|(_, ranges)| !ranges.is_empty())
                    .collect()
            })
        }
    }

    /// Writes a bloom filter index for two batches and checks the row ranges
    /// the applier returns for several predicates and row group selections.
    #[tokio::test]
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        let region_metadata = mock_region_metadata();
        let prefix = "test_bloom_filter_applier_";
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_usage_threshold = Some(1024);
        let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
        let file_id = RegionIndexId::new(file_id, 0);
        let table_dir = "table_dir".to_string();

        let mut indexer = BloomFilterIndexer::new(
            file_id.file_id(),
            &region_metadata,
            intm_mgr,
            memory_usage_threshold,
        )
        .unwrap()
        .unwrap();

        // Two batches are indexed: "tag1" with 0..10 and "tag2" with 10..20.
        // NOTE(review): the exact row layout comes from `new_batch`, which is
        // defined elsewhere; the expectations below assume 20 rows in total.
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();
        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let tester = tester(
            table_dir.clone(),
            object_store.clone(),
            &region_metadata,
            factory.clone(),
            file_id,
        );

        // Case 1: a tag predicate over all four row groups of 5 rows each.
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // Case 2: same predicate, but row group 1 is excluded from the search
        // and therefore must not show up in the result.
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5])]);

        // Case 3: predicates on two columns are combined, so only ranges that
        // survive both remain.
        let res = tester(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4])]);

        // Case 4: an in-list predicate with row group 2 excluded; the ranges
        // returned are relative to the start of each row group.
        let res = tester(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
}