1mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
24use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
25use object_store::ObjectStore;
26use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
27use puffin::puffin_manager::{PuffinManager, PuffinReader};
28use snafu::ResultExt;
29use store_api::region_request::PathType;
30use store_api::storage::ColumnId;
31
32use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
33use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
34use crate::cache::index::bloom_filter_index::{
35 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
36};
37use crate::cache::index::result_cache::PredicateKey;
38use crate::error::{
39 ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
40 Result,
41};
42use crate::metrics::INDEX_APPLY_ELAPSED;
43use crate::sst::file::RegionFileId;
44pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
45use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
46use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
47use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
48
/// Shared, reference-counted handle to a [`BloomFilterIndexApplier`].
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
50
/// Applies per-column bloom filter predicates to the row groups of an SST file,
/// reading the bloom filter blobs from the file's puffin index.
pub struct BloomFilterIndexApplier {
    /// Table directory; passed to [`RegionFilePathFactory`] when reading the
    /// index file from the object store.
    table_dir: String,

    /// Path type; passed to [`RegionFilePathFactory`] together with `table_dir`.
    path_type: PathType,

    /// Object store the remote index file is read from.
    object_store: ObjectStore,

    /// Optional file cache; when set, a cached copy of the index file is tried
    /// before the remote one.
    file_cache: Option<FileCacheRef>,

    /// Factory used to build puffin managers for both the cached and the
    /// remote read path.
    puffin_manager_factory: PuffinManagerFactory,

    /// Optional puffin metadata cache attached to the puffin manager used for
    /// remote reads.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,

    /// Optional bloom filter index cache; when set, blobs are read through a
    /// [`CachedBloomFilterIndexBlobReader`].
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Predicates to apply, keyed by column id.
    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,

    /// Key built from `predicates` via [`PredicateKey::new_bloom`].
    predicate_key: PredicateKey,
}
81
82impl BloomFilterIndexApplier {
83 pub fn new(
87 table_dir: String,
88 path_type: PathType,
89 object_store: ObjectStore,
90 puffin_manager_factory: PuffinManagerFactory,
91 predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
92 ) -> Self {
93 let predicates = Arc::new(predicates);
94 Self {
95 table_dir,
96 path_type,
97 object_store,
98 file_cache: None,
99 puffin_manager_factory,
100 puffin_metadata_cache: None,
101 bloom_filter_index_cache: None,
102 predicate_key: PredicateKey::new_bloom(predicates.clone()),
103 predicates,
104 }
105 }
106
107 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
108 self.file_cache = file_cache;
109 self
110 }
111
112 pub fn with_puffin_metadata_cache(
113 mut self,
114 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
115 ) -> Self {
116 self.puffin_metadata_cache = puffin_metadata_cache;
117 self
118 }
119
120 pub fn with_bloom_filter_cache(
121 mut self,
122 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
123 ) -> Self {
124 self.bloom_filter_index_cache = bloom_filter_index_cache;
125 self
126 }
127
128 pub async fn apply(
136 &self,
137 file_id: RegionFileId,
138 file_size_hint: Option<u64>,
139 row_groups: impl Iterator<Item = (usize, bool)>,
140 ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
141 let _timer = INDEX_APPLY_ELAPSED
142 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
143 .start_timer();
144
145 let mut input = Vec::with_capacity(row_groups.size_hint().0);
147 let mut start = 0;
148 for (i, (len, to_search)) in row_groups.enumerate() {
149 let end = start + len;
150 if to_search {
151 input.push((i, start..end));
152 }
153 start = end;
154 }
155
156 let mut output = input
159 .iter()
160 .map(|(i, range)| (*i, vec![range.clone()]))
161 .collect::<Vec<_>>();
162
163 for (column_id, predicates) in self.predicates.iter() {
164 let blob = match self
165 .blob_reader(file_id, *column_id, file_size_hint)
166 .await?
167 {
168 Some(blob) => blob,
169 None => continue,
170 };
171
172 if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
174 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
175 let reader = CachedBloomFilterIndexBlobReader::new(
176 file_id.file_id(),
177 *column_id,
178 Tag::Skipping,
179 blob_size,
180 BloomFilterReaderImpl::new(blob),
181 bloom_filter_cache.clone(),
182 );
183 self.apply_predicates(reader, predicates, &mut output)
184 .await
185 .context(ApplyBloomFilterIndexSnafu)?;
186 } else {
187 let reader = BloomFilterReaderImpl::new(blob);
188 self.apply_predicates(reader, predicates, &mut output)
189 .await
190 .context(ApplyBloomFilterIndexSnafu)?;
191 }
192 }
193
194 for ((_, output), (_, input)) in output.iter_mut().zip(input) {
196 let start = input.start;
197 for range in output.iter_mut() {
198 range.start -= start;
199 range.end -= start;
200 }
201 }
202
203 Ok(output)
204 }
205
206 async fn blob_reader(
210 &self,
211 file_id: RegionFileId,
212 column_id: ColumnId,
213 file_size_hint: Option<u64>,
214 ) -> Result<Option<BlobReader>> {
215 let reader = match self
216 .cached_blob_reader(file_id, column_id, file_size_hint)
217 .await
218 {
219 Ok(Some(puffin_reader)) => puffin_reader,
220 other => {
221 if let Err(err) = other {
222 if is_blob_not_found(&err) {
224 return Ok(None);
225 }
226 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
227 }
228 let res = self
229 .remote_blob_reader(file_id, column_id, file_size_hint)
230 .await;
231 if let Err(err) = res {
232 if is_blob_not_found(&err) {
234 return Ok(None);
235 }
236 return Err(err);
237 }
238
239 res?
240 }
241 };
242
243 Ok(Some(reader))
244 }
245
246 async fn cached_blob_reader(
248 &self,
249 file_id: RegionFileId,
250 column_id: ColumnId,
251 file_size_hint: Option<u64>,
252 ) -> Result<Option<BlobReader>> {
253 let Some(file_cache) = &self.file_cache else {
254 return Ok(None);
255 };
256
257 let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
258 if file_cache.get(index_key).await.is_none() {
259 return Ok(None);
260 };
261
262 let puffin_manager = self.puffin_manager_factory.build(
263 file_cache.local_store(),
264 WriteCachePathProvider::new(file_cache.clone()),
265 );
266 let reader = puffin_manager
267 .reader(&file_id)
268 .await
269 .context(PuffinBuildReaderSnafu)?
270 .with_file_size_hint(file_size_hint)
271 .blob(&Self::column_blob_name(column_id))
272 .await
273 .context(PuffinReadBlobSnafu)?
274 .reader()
275 .await
276 .context(PuffinBuildReaderSnafu)?;
277 Ok(Some(reader))
278 }
279
    /// Returns the puffin blob name for the bloom filter of `column_id`:
    /// [`INDEX_BLOB_TYPE`] and the column id joined by `-`.
    fn column_blob_name(column_id: ColumnId) -> String {
        format!("{INDEX_BLOB_TYPE}-{column_id}")
    }
284
285 async fn remote_blob_reader(
287 &self,
288 file_id: RegionFileId,
289 column_id: ColumnId,
290 file_size_hint: Option<u64>,
291 ) -> Result<BlobReader> {
292 let puffin_manager = self
293 .puffin_manager_factory
294 .build(
295 self.object_store.clone(),
296 RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
297 )
298 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
299
300 puffin_manager
301 .reader(&file_id)
302 .await
303 .context(PuffinBuildReaderSnafu)?
304 .with_file_size_hint(file_size_hint)
305 .blob(&Self::column_blob_name(column_id))
306 .await
307 .context(PuffinReadBlobSnafu)?
308 .reader()
309 .await
310 .context(PuffinBuildReaderSnafu)
311 }
312
313 async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
314 &self,
315 reader: R,
316 predicates: &[InListPredicate],
317 output: &mut [(usize, Vec<Range<usize>>)],
318 ) -> std::result::Result<(), index::bloom_filter::error::Error> {
319 let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
320
321 for (_, row_group_output) in output.iter_mut() {
322 if row_group_output.is_empty() {
324 continue;
325 }
326
327 *row_group_output = applier.search(predicates, row_group_output).await?;
328 }
329
330 Ok(())
331 }
332
    /// Returns the [`PredicateKey`] built from this applier's predicates.
    pub fn predicate_key(&self) -> &PredicateKey {
        &self.predicate_key
    }
337}
338
339fn is_blob_not_found(err: &Error) -> bool {
340 matches!(
341 err,
342 Error::PuffinReadBlob {
343 source: puffin::error::Error::BlobNotFound { .. },
344 ..
345 }
346 )
347}
348
349#[cfg(test)]
350mod tests {
351
352 use datafusion_expr::{col, lit, Expr};
353 use futures::future::BoxFuture;
354 use puffin::puffin_manager::PuffinWriter;
355 use store_api::metadata::RegionMetadata;
356
357 use super::*;
358 use crate::sst::file::FileId;
359 use crate::sst::index::bloom_filter::creator::tests::{
360 mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
361 };
362 use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
363
    /// Builds a reusable test closure around a fixed table, store, region
    /// metadata, puffin factory and file.
    ///
    /// The closure takes filter expressions and `(row_count, to_search)` row
    /// groups, builds an applier from the expressions, applies it to `file_id`,
    /// and returns only the row groups that still have at least one range.
    /// It panics if the applier cannot be built or applying it fails.
    #[allow(clippy::type_complexity)]
    fn tester(
        table_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: RegionFileId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
           + use<'_> {
        move |exprs, row_groups| {
            // Clone everything the `'static` future needs to own.
            let table_dir = table_dir.clone();
            let object_store: ObjectStore = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    table_dir,
                    PathType::Bare,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                applier
                    .apply(file_id, None, row_groups.into_iter())
                    .await
                    .unwrap()
                    .into_iter()
                    // Drop row groups that were filtered out completely.
                    .filter(|(_, ranges)| !ranges.is_empty())
                    .collect()
            })
        }
    }
400
401 #[tokio::test]
402 #[allow(clippy::single_range_in_vec_init)]
403 async fn test_bloom_filter_applier() {
404 let region_metadata = mock_region_metadata();
421 let prefix = "test_bloom_filter_applier_";
422 let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
423 let object_store = mock_object_store();
424 let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
425 let memory_usage_threshold = Some(1024);
426 let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
427 let table_dir = "table_dir".to_string();
428
429 let mut indexer = BloomFilterIndexer::new(
430 file_id.file_id(),
431 ®ion_metadata,
432 intm_mgr,
433 memory_usage_threshold,
434 )
435 .unwrap()
436 .unwrap();
437
438 let mut batch = new_batch("tag1", 0..10);
440 indexer.update(&mut batch).await.unwrap();
441 let mut batch = new_batch("tag2", 10..20);
442 indexer.update(&mut batch).await.unwrap();
443
444 let puffin_manager = factory.build(
445 object_store.clone(),
446 RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
447 );
448
449 let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
450 indexer.finish(&mut puffin_writer).await.unwrap();
451 puffin_writer.finish().await.unwrap();
452
453 let tester = tester(
454 table_dir.clone(),
455 object_store.clone(),
456 ®ion_metadata,
457 factory.clone(),
458 file_id,
459 );
460
461 let res = tester(
465 &[col("tag_str").eq(lit("tag1"))],
466 vec![(5, true), (5, true), (5, true), (5, true)],
467 )
468 .await;
469 assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);
470
471 let res = tester(
475 &[col("tag_str").eq(lit("tag1"))],
476 vec![(5, true), (5, false), (5, true), (5, true)],
477 )
478 .await;
479 assert_eq!(res, vec![(0, vec![0..5])]);
480
481 let res = tester(
486 &[
487 col("tag_str").eq(lit("tag1")),
488 col("field_u64").eq(lit(1u64)),
489 ],
490 vec![(5, true), (5, true), (5, true), (5, true)],
491 )
492 .await;
493 assert_eq!(res, vec![(0, vec![0..4])]);
494
495 let res = tester(
499 &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
500 vec![(5, true), (5, true), (5, false), (5, true)],
501 )
502 .await;
503 assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
504 }
505}