1mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::Instant;
21
22use common_base::range_read::RangeReader;
23use common_telemetry::{tracing, warn};
24use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
25use index::bloom_filter::reader::{
26 BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
27};
28use index::target::IndexTarget;
29use object_store::ObjectStore;
30use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
31use puffin::puffin_manager::{PuffinManager, PuffinReader};
32use snafu::ResultExt;
33use store_api::region_request::PathType;
34use store_api::storage::ColumnId;
35
36use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
37use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
38use crate::cache::index::bloom_filter_index::{
39 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
40};
41use crate::cache::index::result_cache::PredicateKey;
42use crate::error::{
43 ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
44 Result,
45};
46use crate::metrics::INDEX_APPLY_ELAPSED;
47use crate::sst::file::RegionIndexId;
48use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
49pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
50use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
51use crate::sst::index::{TYPE_BLOOM_FILTER_INDEX, trigger_index_background_download};
52
53#[derive(Default, Clone)]
55pub struct BloomFilterIndexApplyMetrics {
56 pub apply_elapsed: std::time::Duration,
58 pub blob_cache_miss: usize,
60 pub blob_read_bytes: u64,
62 pub read_metrics: BloomFilterReadMetrics,
64}
65
66impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 let Self {
69 apply_elapsed,
70 blob_cache_miss,
71 blob_read_bytes,
72 read_metrics,
73 } = self;
74
75 if self.is_empty() {
76 return write!(f, "{{}}");
77 }
78 write!(f, "{{")?;
79
80 write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
81
82 if *blob_cache_miss > 0 {
83 write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
84 }
85 if *blob_read_bytes > 0 {
86 write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
87 }
88 write!(f, ", \"read_metrics\":{:?}", read_metrics)?;
89
90 write!(f, "}}")
91 }
92}
93
94impl BloomFilterIndexApplyMetrics {
95 pub fn is_empty(&self) -> bool {
97 self.apply_elapsed.is_zero()
98 }
99
100 pub fn merge_from(&mut self, other: &Self) {
102 self.apply_elapsed += other.apply_elapsed;
103 self.blob_cache_miss += other.blob_cache_miss;
104 self.blob_read_bytes += other.blob_read_bytes;
105 self.read_metrics.merge_from(&other.read_metrics);
106 }
107}
108
/// Shared, reference-counted handle to a [`BloomFilterIndexApplier`].
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
110
111pub struct BloomFilterIndexApplier {
113 table_dir: String,
115
116 path_type: PathType,
118
119 object_store: ObjectStore,
121
122 file_cache: Option<FileCacheRef>,
124
125 puffin_manager_factory: PuffinManagerFactory,
127
128 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
130
131 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
133
134 predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
137
138 predicate_key: PredicateKey,
140}
141
142impl BloomFilterIndexApplier {
143 pub fn new(
147 table_dir: String,
148 path_type: PathType,
149 object_store: ObjectStore,
150 puffin_manager_factory: PuffinManagerFactory,
151 predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
152 ) -> Self {
153 let predicates = Arc::new(predicates);
154 Self {
155 table_dir,
156 path_type,
157 object_store,
158 file_cache: None,
159 puffin_manager_factory,
160 puffin_metadata_cache: None,
161 bloom_filter_index_cache: None,
162 predicate_key: PredicateKey::new_bloom(predicates.clone()),
163 predicates,
164 }
165 }
166
167 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
168 self.file_cache = file_cache;
169 self
170 }
171
172 pub fn with_puffin_metadata_cache(
173 mut self,
174 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
175 ) -> Self {
176 self.puffin_metadata_cache = puffin_metadata_cache;
177 self
178 }
179
180 pub fn with_bloom_filter_cache(
181 mut self,
182 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
183 ) -> Self {
184 self.bloom_filter_index_cache = bloom_filter_index_cache;
185 self
186 }
187
188 #[tracing::instrument(
203 skip_all,
204 fields(file_id = %file_id)
205 )]
206 pub async fn apply(
207 &self,
208 file_id: RegionIndexId,
209 file_size_hint: Option<u64>,
210 row_groups: impl Iterator<Item = (usize, bool)>,
211 mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
212 ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
213 let apply_start = Instant::now();
214
215 let mut input = Vec::with_capacity(row_groups.size_hint().0);
217 let mut start = 0;
218 for (i, (len, to_search)) in row_groups.enumerate() {
219 let end = start + len;
220 if to_search {
221 input.push((i, start..end));
222 }
223 start = end;
224 }
225
226 let mut output = input
229 .iter()
230 .map(|(i, range)| (*i, vec![range.clone()]))
231 .collect::<Vec<_>>();
232
233 for (column_id, predicates) in self.predicates.iter() {
234 let blob = match self
235 .blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
236 .await?
237 {
238 Some(blob) => blob,
239 None => continue,
240 };
241
242 if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
244 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
245 if let Some(m) = &mut metrics {
246 m.blob_read_bytes += blob_size;
247 }
248 let reader = CachedBloomFilterIndexBlobReader::new(
249 file_id.file_id(),
250 file_id.version,
251 *column_id,
252 Tag::Skipping,
253 blob_size,
254 BloomFilterReaderImpl::new(blob),
255 bloom_filter_cache.clone(),
256 );
257 self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
258 .await
259 .context(ApplyBloomFilterIndexSnafu)?;
260 } else {
261 let reader = BloomFilterReaderImpl::new(blob);
262 self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
263 .await
264 .context(ApplyBloomFilterIndexSnafu)?;
265 }
266 }
267
268 for ((_, output), (_, input)) in output.iter_mut().zip(input) {
270 let start = input.start;
271 for range in output.iter_mut() {
272 range.start -= start;
273 range.end -= start;
274 }
275 }
276
277 let elapsed = apply_start.elapsed();
279 INDEX_APPLY_ELAPSED
280 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
281 .observe(elapsed.as_secs_f64());
282
283 if let Some(m) = metrics {
284 m.apply_elapsed += elapsed;
285 }
286
287 Ok(output)
288 }
289
290 async fn blob_reader(
294 &self,
295 file_id: RegionIndexId,
296 column_id: ColumnId,
297 file_size_hint: Option<u64>,
298 metrics: Option<&mut BloomFilterIndexApplyMetrics>,
299 ) -> Result<Option<BlobReader>> {
300 let reader = match self
301 .cached_blob_reader(file_id, column_id, file_size_hint)
302 .await
303 {
304 Ok(Some(puffin_reader)) => puffin_reader,
305 other => {
306 if let Some(m) = metrics {
307 m.blob_cache_miss += 1;
308 }
309 if let Err(err) = other {
310 if is_blob_not_found(&err) {
312 return Ok(None);
313 }
314 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
315 }
316 let res = self
317 .remote_blob_reader(file_id, column_id, file_size_hint)
318 .await;
319 if let Err(err) = res {
320 if is_blob_not_found(&err) {
322 return Ok(None);
323 }
324 return Err(err);
325 }
326
327 res?
328 }
329 };
330
331 Ok(Some(reader))
332 }
333
334 async fn cached_blob_reader(
336 &self,
337 file_id: RegionIndexId,
338 column_id: ColumnId,
339 file_size_hint: Option<u64>,
340 ) -> Result<Option<BlobReader>> {
341 let Some(file_cache) = &self.file_cache else {
342 return Ok(None);
343 };
344
345 let index_key = IndexKey::new(
346 file_id.region_id(),
347 file_id.file_id(),
348 FileType::Puffin(file_id.version),
349 );
350 if file_cache.get(index_key).await.is_none() {
351 return Ok(None);
352 };
353
354 let puffin_manager = self.puffin_manager_factory.build(
355 file_cache.local_store(),
356 WriteCachePathProvider::new(file_cache.clone()),
357 );
358 let blob_name = Self::column_blob_name(column_id);
359
360 let reader = puffin_manager
361 .reader(&file_id)
362 .await
363 .context(PuffinBuildReaderSnafu)?
364 .with_file_size_hint(file_size_hint)
365 .blob(&blob_name)
366 .await
367 .context(PuffinReadBlobSnafu)?
368 .reader()
369 .await
370 .context(PuffinBuildReaderSnafu)?;
371 Ok(Some(reader))
372 }
373
374 fn column_blob_name(column_id: ColumnId) -> String {
376 format!("{INDEX_BLOB_TYPE}-{}", IndexTarget::ColumnId(column_id))
377 }
378
379 async fn remote_blob_reader(
381 &self,
382 file_id: RegionIndexId,
383 column_id: ColumnId,
384 file_size_hint: Option<u64>,
385 ) -> Result<BlobReader> {
386 let path_factory = RegionFilePathFactory::new(self.table_dir.clone(), self.path_type);
387
388 trigger_index_background_download(
390 self.file_cache.as_ref(),
391 &file_id,
392 file_size_hint,
393 &path_factory,
394 &self.object_store,
395 );
396
397 let puffin_manager = self
398 .puffin_manager_factory
399 .build(self.object_store.clone(), path_factory)
400 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
401
402 let blob_name = Self::column_blob_name(column_id);
403
404 puffin_manager
405 .reader(&file_id)
406 .await
407 .context(PuffinBuildReaderSnafu)?
408 .with_file_size_hint(file_size_hint)
409 .blob(&blob_name)
410 .await
411 .context(PuffinReadBlobSnafu)?
412 .reader()
413 .await
414 .context(PuffinBuildReaderSnafu)
415 }
416
417 async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
418 &self,
419 reader: R,
420 predicates: &[InListPredicate],
421 output: &mut [(usize, Vec<Range<usize>>)],
422 mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
423 ) -> std::result::Result<(), index::bloom_filter::error::Error> {
424 let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
425
426 for (_, row_group_output) in output.iter_mut() {
427 if row_group_output.is_empty() {
429 continue;
430 }
431
432 let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics);
433 *row_group_output = applier
434 .search(predicates, row_group_output, read_metrics)
435 .await?;
436 }
437
438 Ok(())
439 }
440
    /// Returns the predicate key that was derived from this applier's
    /// predicates at construction time.
    pub fn predicate_key(&self) -> &PredicateKey {
        &self.predicate_key
    }
445}
446
447fn is_blob_not_found(err: &Error) -> bool {
448 matches!(
449 err,
450 Error::PuffinReadBlob {
451 source: puffin::error::Error::BlobNotFound { .. },
452 ..
453 }
454 )
455}
456
457#[cfg(test)]
458mod tests {
459
460 use datafusion_expr::{Expr, col, lit};
461 use futures::future::BoxFuture;
462 use puffin::puffin_manager::PuffinWriter;
463 use store_api::metadata::RegionMetadata;
464 use store_api::storage::FileId;
465
466 use super::*;
467 use crate::sst::file::RegionFileId;
468 use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
469 use crate::sst::index::bloom_filter::creator::tests::{
470 mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
471 };
472
    /// Returns a closure that builds an applier for the given expressions,
    /// applies it to `file_id` with the given row groups, and yields only the
    /// row groups that still have at least one range.
    ///
    /// The closure can be called repeatedly: every call clones the captured
    /// configuration into a fresh boxed future.
    // The return type spells out the full closure and future signature, which
    // trips `clippy::type_complexity`.
    #[allow(clippy::type_complexity)]
    fn tester(
        table_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: RegionIndexId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
    + use<'_> {
        move |exprs, row_groups| {
            // Clone everything the future needs so that it owns its inputs.
            let table_dir = table_dir.clone();
            let object_store: ObjectStore = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    table_dir,
                    PathType::Bare,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                applier
                    .apply(file_id, None, row_groups.into_iter(), None)
                    .await
                    .unwrap()
                    .into_iter()
                    // Drop searched row groups in which no rows survived.
                    .filter(|(_, ranges)| !ranges.is_empty())
                    .collect()
            })
        }
    }
509
510 #[tokio::test]
511 #[allow(clippy::single_range_in_vec_init)]
512 async fn test_bloom_filter_applier() {
513 let region_metadata = mock_region_metadata();
530 let prefix = "test_bloom_filter_applier_";
531 let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
532 let object_store = mock_object_store();
533 let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
534 let memory_usage_threshold = Some(1024);
535 let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
536 let file_id = RegionIndexId::new(file_id, 0);
537 let table_dir = "table_dir".to_string();
538
539 let mut indexer = BloomFilterIndexer::new(
540 file_id.file_id(),
541 ®ion_metadata,
542 intm_mgr,
543 memory_usage_threshold,
544 )
545 .unwrap()
546 .unwrap();
547
548 let mut batch = new_batch("tag1", 0..10);
550 indexer.update(&mut batch).await.unwrap();
551 let mut batch = new_batch("tag2", 10..20);
552 indexer.update(&mut batch).await.unwrap();
553
554 let puffin_manager = factory.build(
555 object_store.clone(),
556 RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
557 );
558
559 let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
560 indexer.finish(&mut puffin_writer).await.unwrap();
561 puffin_writer.finish().await.unwrap();
562
563 let tester = tester(
564 table_dir.clone(),
565 object_store.clone(),
566 ®ion_metadata,
567 factory.clone(),
568 file_id,
569 );
570
571 let res = tester(
575 &[col("tag_str").eq(lit("tag1"))],
576 vec![(5, true), (5, true), (5, true), (5, true)],
577 )
578 .await;
579 assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);
580
581 let res = tester(
585 &[col("tag_str").eq(lit("tag1"))],
586 vec![(5, true), (5, false), (5, true), (5, true)],
587 )
588 .await;
589 assert_eq!(res, vec![(0, vec![0..5])]);
590
591 let res = tester(
596 &[
597 col("tag_str").eq(lit("tag1")),
598 col("field_u64").eq(lit(1u64)),
599 ],
600 vec![(5, true), (5, true), (5, true), (5, true)],
601 )
602 .await;
603 assert_eq!(res, vec![(0, vec![0..4])]);
604
605 let res = tester(
609 &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
610 vec![(5, true), (5, true), (5, false), (5, true)],
611 )
612 .await;
613 assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
614 }
615}