1mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
24use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
25use object_store::ObjectStore;
26use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
27use puffin::puffin_manager::{PuffinManager, PuffinReader};
28use snafu::ResultExt;
29use store_api::storage::{ColumnId, RegionId};
30
31use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
32use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
33use crate::cache::index::bloom_filter_index::{
34 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
35};
36use crate::cache::index::result_cache::PredicateKey;
37use crate::error::{
38 ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
39 Result,
40};
41use crate::metrics::INDEX_APPLY_ELAPSED;
42use crate::sst::file::FileId;
43pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
44use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
45use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
46use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
47
48pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
49
50pub struct BloomFilterIndexApplier {
52 region_dir: String,
54
55 region_id: RegionId,
57
58 object_store: ObjectStore,
60
61 file_cache: Option<FileCacheRef>,
63
64 puffin_manager_factory: PuffinManagerFactory,
66
67 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
69
70 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
72
73 predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
76
77 predicate_key: PredicateKey,
79}
80
81impl BloomFilterIndexApplier {
82 pub fn new(
86 region_dir: String,
87 region_id: RegionId,
88 object_store: ObjectStore,
89 puffin_manager_factory: PuffinManagerFactory,
90 predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
91 ) -> Self {
92 let predicates = Arc::new(predicates);
93 Self {
94 region_dir,
95 region_id,
96 object_store,
97 file_cache: None,
98 puffin_manager_factory,
99 puffin_metadata_cache: None,
100 bloom_filter_index_cache: None,
101 predicate_key: PredicateKey::new_bloom(predicates.clone()),
102 predicates,
103 }
104 }
105
106 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
107 self.file_cache = file_cache;
108 self
109 }
110
111 pub fn with_puffin_metadata_cache(
112 mut self,
113 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
114 ) -> Self {
115 self.puffin_metadata_cache = puffin_metadata_cache;
116 self
117 }
118
119 pub fn with_bloom_filter_cache(
120 mut self,
121 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
122 ) -> Self {
123 self.bloom_filter_index_cache = bloom_filter_index_cache;
124 self
125 }
126
127 pub async fn apply(
135 &self,
136 file_id: FileId,
137 file_size_hint: Option<u64>,
138 row_groups: impl Iterator<Item = (usize, bool)>,
139 ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
140 let _timer = INDEX_APPLY_ELAPSED
141 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
142 .start_timer();
143
144 let mut input = Vec::with_capacity(row_groups.size_hint().0);
146 let mut start = 0;
147 for (i, (len, to_search)) in row_groups.enumerate() {
148 let end = start + len;
149 if to_search {
150 input.push((i, start..end));
151 }
152 start = end;
153 }
154
155 let mut output = input
158 .iter()
159 .map(|(i, range)| (*i, vec![range.clone()]))
160 .collect::<Vec<_>>();
161
162 for (column_id, predicates) in self.predicates.iter() {
163 let blob = match self
164 .blob_reader(file_id, *column_id, file_size_hint)
165 .await?
166 {
167 Some(blob) => blob,
168 None => continue,
169 };
170
171 if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
173 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
174 let reader = CachedBloomFilterIndexBlobReader::new(
175 file_id,
176 *column_id,
177 Tag::Skipping,
178 blob_size,
179 BloomFilterReaderImpl::new(blob),
180 bloom_filter_cache.clone(),
181 );
182 self.apply_predicates(reader, predicates, &mut output)
183 .await
184 .context(ApplyBloomFilterIndexSnafu)?;
185 } else {
186 let reader = BloomFilterReaderImpl::new(blob);
187 self.apply_predicates(reader, predicates, &mut output)
188 .await
189 .context(ApplyBloomFilterIndexSnafu)?;
190 }
191 }
192
193 for ((_, output), (_, input)) in output.iter_mut().zip(input) {
195 let start = input.start;
196 for range in output.iter_mut() {
197 range.start -= start;
198 range.end -= start;
199 }
200 }
201
202 Ok(output)
203 }
204
205 async fn blob_reader(
209 &self,
210 file_id: FileId,
211 column_id: ColumnId,
212 file_size_hint: Option<u64>,
213 ) -> Result<Option<BlobReader>> {
214 let reader = match self
215 .cached_blob_reader(file_id, column_id, file_size_hint)
216 .await
217 {
218 Ok(Some(puffin_reader)) => puffin_reader,
219 other => {
220 if let Err(err) = other {
221 if is_blob_not_found(&err) {
223 return Ok(None);
224 }
225 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
226 }
227 let res = self
228 .remote_blob_reader(file_id, column_id, file_size_hint)
229 .await;
230 if let Err(err) = res {
231 if is_blob_not_found(&err) {
233 return Ok(None);
234 }
235 return Err(err);
236 }
237
238 res?
239 }
240 };
241
242 Ok(Some(reader))
243 }
244
245 async fn cached_blob_reader(
247 &self,
248 file_id: FileId,
249 column_id: ColumnId,
250 file_size_hint: Option<u64>,
251 ) -> Result<Option<BlobReader>> {
252 let Some(file_cache) = &self.file_cache else {
253 return Ok(None);
254 };
255
256 let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
257 if file_cache.get(index_key).await.is_none() {
258 return Ok(None);
259 };
260
261 let puffin_manager = self.puffin_manager_factory.build(
262 file_cache.local_store(),
263 WriteCachePathProvider::new(self.region_id, file_cache.clone()),
264 );
265 let reader = puffin_manager
266 .reader(&file_id)
267 .await
268 .context(PuffinBuildReaderSnafu)?
269 .with_file_size_hint(file_size_hint)
270 .blob(&Self::column_blob_name(column_id))
271 .await
272 .context(PuffinReadBlobSnafu)?
273 .reader()
274 .await
275 .context(PuffinBuildReaderSnafu)?;
276 Ok(Some(reader))
277 }
278
279 fn column_blob_name(column_id: ColumnId) -> String {
281 format!("{INDEX_BLOB_TYPE}-{column_id}")
282 }
283
284 async fn remote_blob_reader(
286 &self,
287 file_id: FileId,
288 column_id: ColumnId,
289 file_size_hint: Option<u64>,
290 ) -> Result<BlobReader> {
291 let puffin_manager = self
292 .puffin_manager_factory
293 .build(
294 self.object_store.clone(),
295 RegionFilePathFactory::new(self.region_dir.clone()),
296 )
297 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
298
299 puffin_manager
300 .reader(&file_id)
301 .await
302 .context(PuffinBuildReaderSnafu)?
303 .with_file_size_hint(file_size_hint)
304 .blob(&Self::column_blob_name(column_id))
305 .await
306 .context(PuffinReadBlobSnafu)?
307 .reader()
308 .await
309 .context(PuffinBuildReaderSnafu)
310 }
311
312 async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
313 &self,
314 reader: R,
315 predicates: &[InListPredicate],
316 output: &mut [(usize, Vec<Range<usize>>)],
317 ) -> std::result::Result<(), index::bloom_filter::error::Error> {
318 let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
319
320 for (_, row_group_output) in output.iter_mut() {
321 if row_group_output.is_empty() {
323 continue;
324 }
325
326 *row_group_output = applier.search(predicates, row_group_output).await?;
327 }
328
329 Ok(())
330 }
331
332 pub fn predicate_key(&self) -> &PredicateKey {
334 &self.predicate_key
335 }
336}
337
338fn is_blob_not_found(err: &Error) -> bool {
339 matches!(
340 err,
341 Error::PuffinReadBlob {
342 source: puffin::error::Error::BlobNotFound { .. },
343 ..
344 }
345 )
346}
347
#[cfg(test)]
mod tests {

    use datafusion_expr::{col, lit, Expr};
    use futures::future::BoxFuture;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::metadata::RegionMetadata;

    use super::*;
    use crate::sst::index::bloom_filter::creator::tests::{
        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
    };
    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;

    /// Returns a closure that builds an applier from `exprs`, applies it to `file_id` with the
    /// given `(row_count, to_search)` row groups, and keeps only row groups with at least one
    /// surviving range.
    // The boxed-future closure type is inherently verbose; a type alias would not add clarity.
    #[allow(clippy::type_complexity)]
    fn tester(
        region_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: FileId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
           + use<'_> {
        move |exprs, row_groups| {
            let region_dir = region_dir.clone();
            let object_store = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    region_dir,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                applier
                    .apply(file_id, None, row_groups.into_iter())
                    .await
                    .unwrap()
                    .into_iter()
                    .filter(|(_, ranges)| !ranges.is_empty())
                    .collect()
            })
        }
    }

    #[tokio::test]
    // Expected outputs are `Vec<Range<usize>>`; single-range `vec![a..b]` is intentional.
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        let region_metadata = mock_region_metadata();
        let prefix = "test_bloom_filter_applier_";
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_usage_threshold = Some(1024);
        let file_id = FileId::random();
        let region_dir = "region_dir".to_string();

        let mut indexer =
            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
                .unwrap()
                .unwrap();

        // Index 20 rows: presumably rows 0..10 carry tag "tag1" and rows 10..20 carry
        // "tag2". NOTE(review): the expectations below suggest `new_batch` also fills
        // `field_u64` with the row index and that `field_u64` is indexed at a 4-row
        // granularity — confirm against `creator::tests`.
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();
        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(region_dir.clone()),
        );

        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let tester = tester(
            region_dir.clone(),
            object_store.clone(),
            &region_metadata,
            factory.clone(),
            file_id,
        );

        // All four row groups searched: only the first two (rows 0..10) can hold "tag1".
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // Row group 1 is excluded from the search, so it is not reported.
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5])]);

        // Predicates on different columns intersect: only rows 0..4 of row group 0 survive.
        let res = tester(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4])]);

        // In-list predicate with row group 2 excluded; ranges are row-group-relative, so the
        // hit in rows 8..10 of the file is reported as 3..5 of row group 1.
        let res = tester(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
}