1mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::Instant;
21
22use common_base::range_read::RangeReader;
23use common_telemetry::warn;
24use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
25use index::bloom_filter::reader::{
26 BloomFilterReadMetrics, BloomFilterReader, BloomFilterReaderImpl,
27};
28use index::target::IndexTarget;
29use object_store::ObjectStore;
30use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
31use puffin::puffin_manager::{PuffinManager, PuffinReader};
32use snafu::ResultExt;
33use store_api::region_request::PathType;
34use store_api::storage::ColumnId;
35
36use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
37use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
38use crate::cache::index::bloom_filter_index::{
39 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
40};
41use crate::cache::index::result_cache::PredicateKey;
42use crate::error::{
43 ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
44 Result,
45};
46use crate::metrics::INDEX_APPLY_ELAPSED;
47use crate::sst::file::RegionIndexId;
48use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
49use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
50pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
51use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
52
53#[derive(Default, Clone)]
55pub struct BloomFilterIndexApplyMetrics {
56 pub apply_elapsed: std::time::Duration,
58 pub blob_cache_miss: usize,
60 pub blob_read_bytes: u64,
62 pub read_metrics: BloomFilterReadMetrics,
64}
65
66impl std::fmt::Debug for BloomFilterIndexApplyMetrics {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 let Self {
69 apply_elapsed,
70 blob_cache_miss,
71 blob_read_bytes,
72 read_metrics,
73 } = self;
74
75 if self.is_empty() {
76 return write!(f, "{{}}");
77 }
78 write!(f, "{{")?;
79
80 write!(f, "\"apply_elapsed\":\"{:?}\"", apply_elapsed)?;
81
82 if *blob_cache_miss > 0 {
83 write!(f, ", \"blob_cache_miss\":{}", blob_cache_miss)?;
84 }
85 if *blob_read_bytes > 0 {
86 write!(f, ", \"blob_read_bytes\":{}", blob_read_bytes)?;
87 }
88 write!(f, ", \"read_metrics\":{:?}", read_metrics)?;
89
90 write!(f, "}}")
91 }
92}
93
94impl BloomFilterIndexApplyMetrics {
95 pub fn is_empty(&self) -> bool {
97 self.apply_elapsed.is_zero()
98 }
99
100 pub fn merge_from(&mut self, other: &Self) {
102 self.apply_elapsed += other.apply_elapsed;
103 self.blob_cache_miss += other.blob_cache_miss;
104 self.blob_read_bytes += other.blob_read_bytes;
105 self.read_metrics.merge_from(&other.read_metrics);
106 }
107}
108
109pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
110
111pub struct BloomFilterIndexApplier {
113 table_dir: String,
115
116 path_type: PathType,
118
119 object_store: ObjectStore,
121
122 file_cache: Option<FileCacheRef>,
124
125 puffin_manager_factory: PuffinManagerFactory,
127
128 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
130
131 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
133
134 predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
137
138 predicate_key: PredicateKey,
140}
141
142impl BloomFilterIndexApplier {
143 pub fn new(
147 table_dir: String,
148 path_type: PathType,
149 object_store: ObjectStore,
150 puffin_manager_factory: PuffinManagerFactory,
151 predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
152 ) -> Self {
153 let predicates = Arc::new(predicates);
154 Self {
155 table_dir,
156 path_type,
157 object_store,
158 file_cache: None,
159 puffin_manager_factory,
160 puffin_metadata_cache: None,
161 bloom_filter_index_cache: None,
162 predicate_key: PredicateKey::new_bloom(predicates.clone()),
163 predicates,
164 }
165 }
166
167 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
168 self.file_cache = file_cache;
169 self
170 }
171
172 pub fn with_puffin_metadata_cache(
173 mut self,
174 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
175 ) -> Self {
176 self.puffin_metadata_cache = puffin_metadata_cache;
177 self
178 }
179
180 pub fn with_bloom_filter_cache(
181 mut self,
182 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
183 ) -> Self {
184 self.bloom_filter_index_cache = bloom_filter_index_cache;
185 self
186 }
187
188 pub async fn apply(
202 &self,
203 file_id: RegionIndexId,
204 file_size_hint: Option<u64>,
205 row_groups: impl Iterator<Item = (usize, bool)>,
206 mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
207 ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
208 let apply_start = Instant::now();
209
210 let mut input = Vec::with_capacity(row_groups.size_hint().0);
212 let mut start = 0;
213 for (i, (len, to_search)) in row_groups.enumerate() {
214 let end = start + len;
215 if to_search {
216 input.push((i, start..end));
217 }
218 start = end;
219 }
220
221 let mut output = input
224 .iter()
225 .map(|(i, range)| (*i, vec![range.clone()]))
226 .collect::<Vec<_>>();
227
228 for (column_id, predicates) in self.predicates.iter() {
229 let blob = match self
230 .blob_reader(file_id, *column_id, file_size_hint, metrics.as_deref_mut())
231 .await?
232 {
233 Some(blob) => blob,
234 None => continue,
235 };
236
237 if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
239 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
240 if let Some(m) = &mut metrics {
241 m.blob_read_bytes += blob_size;
242 }
243 let reader = CachedBloomFilterIndexBlobReader::new(
244 file_id.file_id(),
245 file_id.version,
246 *column_id,
247 Tag::Skipping,
248 blob_size,
249 BloomFilterReaderImpl::new(blob),
250 bloom_filter_cache.clone(),
251 );
252 self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
253 .await
254 .context(ApplyBloomFilterIndexSnafu)?;
255 } else {
256 let reader = BloomFilterReaderImpl::new(blob);
257 self.apply_predicates(reader, predicates, &mut output, metrics.as_deref_mut())
258 .await
259 .context(ApplyBloomFilterIndexSnafu)?;
260 }
261 }
262
263 for ((_, output), (_, input)) in output.iter_mut().zip(input) {
265 let start = input.start;
266 for range in output.iter_mut() {
267 range.start -= start;
268 range.end -= start;
269 }
270 }
271
272 let elapsed = apply_start.elapsed();
274 INDEX_APPLY_ELAPSED
275 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
276 .observe(elapsed.as_secs_f64());
277
278 if let Some(m) = metrics {
279 m.apply_elapsed += elapsed;
280 }
281
282 Ok(output)
283 }
284
285 async fn blob_reader(
289 &self,
290 file_id: RegionIndexId,
291 column_id: ColumnId,
292 file_size_hint: Option<u64>,
293 metrics: Option<&mut BloomFilterIndexApplyMetrics>,
294 ) -> Result<Option<BlobReader>> {
295 let reader = match self
296 .cached_blob_reader(file_id, column_id, file_size_hint)
297 .await
298 {
299 Ok(Some(puffin_reader)) => puffin_reader,
300 other => {
301 if let Some(m) = metrics {
302 m.blob_cache_miss += 1;
303 }
304 if let Err(err) = other {
305 if is_blob_not_found(&err) {
307 return Ok(None);
308 }
309 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
310 }
311 let res = self
312 .remote_blob_reader(file_id, column_id, file_size_hint)
313 .await;
314 if let Err(err) = res {
315 if is_blob_not_found(&err) {
317 return Ok(None);
318 }
319 return Err(err);
320 }
321
322 res?
323 }
324 };
325
326 Ok(Some(reader))
327 }
328
329 async fn cached_blob_reader(
331 &self,
332 file_id: RegionIndexId,
333 column_id: ColumnId,
334 file_size_hint: Option<u64>,
335 ) -> Result<Option<BlobReader>> {
336 let Some(file_cache) = &self.file_cache else {
337 return Ok(None);
338 };
339
340 let index_key = IndexKey::new(
341 file_id.region_id(),
342 file_id.file_id(),
343 FileType::Puffin(file_id.version),
344 );
345 if file_cache.get(index_key).await.is_none() {
346 return Ok(None);
347 };
348
349 let puffin_manager = self.puffin_manager_factory.build(
350 file_cache.local_store(),
351 WriteCachePathProvider::new(file_cache.clone()),
352 );
353 let blob_name = Self::column_blob_name(column_id);
354
355 let reader = puffin_manager
356 .reader(&file_id)
357 .await
358 .context(PuffinBuildReaderSnafu)?
359 .with_file_size_hint(file_size_hint)
360 .blob(&blob_name)
361 .await
362 .context(PuffinReadBlobSnafu)?
363 .reader()
364 .await
365 .context(PuffinBuildReaderSnafu)?;
366 Ok(Some(reader))
367 }
368
369 fn column_blob_name(column_id: ColumnId) -> String {
371 format!("{INDEX_BLOB_TYPE}-{}", IndexTarget::ColumnId(column_id))
372 }
373
374 async fn remote_blob_reader(
376 &self,
377 file_id: RegionIndexId,
378 column_id: ColumnId,
379 file_size_hint: Option<u64>,
380 ) -> Result<BlobReader> {
381 let puffin_manager = self
382 .puffin_manager_factory
383 .build(
384 self.object_store.clone(),
385 RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
386 )
387 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
388
389 let blob_name = Self::column_blob_name(column_id);
390
391 puffin_manager
392 .reader(&file_id)
393 .await
394 .context(PuffinBuildReaderSnafu)?
395 .with_file_size_hint(file_size_hint)
396 .blob(&blob_name)
397 .await
398 .context(PuffinReadBlobSnafu)?
399 .reader()
400 .await
401 .context(PuffinBuildReaderSnafu)
402 }
403
404 async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
405 &self,
406 reader: R,
407 predicates: &[InListPredicate],
408 output: &mut [(usize, Vec<Range<usize>>)],
409 mut metrics: Option<&mut BloomFilterIndexApplyMetrics>,
410 ) -> std::result::Result<(), index::bloom_filter::error::Error> {
411 let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
412
413 for (_, row_group_output) in output.iter_mut() {
414 if row_group_output.is_empty() {
416 continue;
417 }
418
419 let read_metrics = metrics.as_deref_mut().map(|m| &mut m.read_metrics);
420 *row_group_output = applier
421 .search(predicates, row_group_output, read_metrics)
422 .await?;
423 }
424
425 Ok(())
426 }
427
428 pub fn predicate_key(&self) -> &PredicateKey {
430 &self.predicate_key
431 }
432}
433
434fn is_blob_not_found(err: &Error) -> bool {
435 matches!(
436 err,
437 Error::PuffinReadBlob {
438 source: puffin::error::Error::BlobNotFound { .. },
439 ..
440 }
441 )
442}
443
#[cfg(test)]
mod tests {

    use datafusion_expr::{Expr, col, lit};
    use futures::future::BoxFuture;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::metadata::RegionMetadata;
    use store_api::storage::FileId;

    use super::*;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
    use crate::sst::index::bloom_filter::creator::tests::{
        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
    };

    /// Builds a closure that constructs an applier from `exprs`, applies it to
    /// `row_groups` for `file_id`, and keeps only row groups with non-empty ranges.
    // The returned closure type is inherently long; an alias would not add clarity.
    #[allow(clippy::type_complexity)]
    fn tester(
        table_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: RegionIndexId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
    + use<'_> {
        move |exprs, row_groups| {
            let table_dir = table_dir.clone();
            let object_store: ObjectStore = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    table_dir,
                    PathType::Bare,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                applier
                    .apply(file_id, None, row_groups.into_iter(), None)
                    .await
                    .unwrap()
                    .into_iter()
                    .filter(|(_, ranges)| !ranges.is_empty())
                    .collect()
            })
        }
    }

    #[tokio::test]
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        let region_metadata = mock_region_metadata();
        let prefix = "test_bloom_filter_applier_";
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_usage_threshold = Some(1024);
        let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
        let file_id = RegionIndexId::new(file_id, 0);
        let table_dir = "table_dir".to_string();

        let mut indexer = BloomFilterIndexer::new(
            file_id.file_id(),
            &region_metadata,
            intm_mgr,
            memory_usage_threshold,
        )
        .unwrap()
        .unwrap();

        // Index two batches, then write the index into a puffin file.
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();
        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let tester = tester(
            table_dir.clone(),
            object_store.clone(),
            &region_metadata,
            factory.clone(),
            file_id,
        );

        // Single tag predicate, all row groups searched.
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // Same predicate, but row group 1 is excluded from the search.
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5])]);

        // Conjunction of a tag predicate and a field predicate.
        let res = tester(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4])]);

        // IN-list predicate on a field with one row group excluded.
        let res = tester(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
}