1mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
24use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
25use object_store::ObjectStore;
26use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
27use puffin::puffin_manager::{PuffinManager, PuffinReader};
28use snafu::ResultExt;
29use store_api::storage::{ColumnId, RegionId};
30
31use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
32use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
33use crate::cache::index::bloom_filter_index::{
34 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
35};
36use crate::cache::index::result_cache::PredicateKey;
37use crate::error::{
38 ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
39 Result,
40};
41use crate::metrics::INDEX_APPLY_ELAPSED;
42use crate::sst::file::FileId;
43pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
44use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
45use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
46use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
47
48pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
49
50pub struct BloomFilterIndexApplier {
52 region_dir: String,
54
55 region_id: RegionId,
57
58 object_store: ObjectStore,
60
61 file_cache: Option<FileCacheRef>,
63
64 puffin_manager_factory: PuffinManagerFactory,
66
67 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
69
70 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
72
73 predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,
76
77 predicate_key: PredicateKey,
79}
80
81impl BloomFilterIndexApplier {
82 pub fn new(
86 region_dir: String,
87 region_id: RegionId,
88 object_store: ObjectStore,
89 puffin_manager_factory: PuffinManagerFactory,
90 predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
91 ) -> Self {
92 let predicates = Arc::new(predicates);
93 Self {
94 region_dir,
95 region_id,
96 object_store,
97 file_cache: None,
98 puffin_manager_factory,
99 puffin_metadata_cache: None,
100 bloom_filter_index_cache: None,
101 predicate_key: PredicateKey::new_bloom(predicates.clone()),
102 predicates,
103 }
104 }
105
106 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
107 self.file_cache = file_cache;
108 self
109 }
110
111 pub fn with_puffin_metadata_cache(
112 mut self,
113 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
114 ) -> Self {
115 self.puffin_metadata_cache = puffin_metadata_cache;
116 self
117 }
118
119 pub fn with_bloom_filter_cache(
120 mut self,
121 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
122 ) -> Self {
123 self.bloom_filter_index_cache = bloom_filter_index_cache;
124 self
125 }
126
127 pub async fn apply(
132 &self,
133 file_id: FileId,
134 file_size_hint: Option<u64>,
135 row_groups: impl Iterator<Item = (usize, bool)>,
136 ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
137 let _timer = INDEX_APPLY_ELAPSED
138 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
139 .start_timer();
140
141 let mut input = Vec::with_capacity(row_groups.size_hint().0);
143 let mut start = 0;
144 for (i, (len, to_search)) in row_groups.enumerate() {
145 let end = start + len;
146 if to_search {
147 input.push((i, start..end));
148 }
149 start = end;
150 }
151
152 let mut output = input
155 .iter()
156 .map(|(i, range)| (*i, vec![range.clone()]))
157 .collect::<Vec<_>>();
158
159 for (column_id, predicates) in self.predicates.iter() {
160 let blob = match self
161 .blob_reader(file_id, *column_id, file_size_hint)
162 .await?
163 {
164 Some(blob) => blob,
165 None => continue,
166 };
167
168 if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
170 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
171 let reader = CachedBloomFilterIndexBlobReader::new(
172 file_id,
173 *column_id,
174 Tag::Skipping,
175 blob_size,
176 BloomFilterReaderImpl::new(blob),
177 bloom_filter_cache.clone(),
178 );
179 self.apply_predicates(reader, predicates, &mut output)
180 .await
181 .context(ApplyBloomFilterIndexSnafu)?;
182 } else {
183 let reader = BloomFilterReaderImpl::new(blob);
184 self.apply_predicates(reader, predicates, &mut output)
185 .await
186 .context(ApplyBloomFilterIndexSnafu)?;
187 }
188 }
189
190 for ((_, output), (_, input)) in output.iter_mut().zip(input) {
192 let start = input.start;
193 for range in output.iter_mut() {
194 range.start -= start;
195 range.end -= start;
196 }
197 }
198 output.retain(|(_, ranges)| !ranges.is_empty());
199
200 Ok(output)
201 }
202
203 async fn blob_reader(
207 &self,
208 file_id: FileId,
209 column_id: ColumnId,
210 file_size_hint: Option<u64>,
211 ) -> Result<Option<BlobReader>> {
212 let reader = match self
213 .cached_blob_reader(file_id, column_id, file_size_hint)
214 .await
215 {
216 Ok(Some(puffin_reader)) => puffin_reader,
217 other => {
218 if let Err(err) = other {
219 if is_blob_not_found(&err) {
221 return Ok(None);
222 }
223 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
224 }
225 let res = self
226 .remote_blob_reader(file_id, column_id, file_size_hint)
227 .await;
228 if let Err(err) = res {
229 if is_blob_not_found(&err) {
231 return Ok(None);
232 }
233 return Err(err);
234 }
235
236 res?
237 }
238 };
239
240 Ok(Some(reader))
241 }
242
243 async fn cached_blob_reader(
245 &self,
246 file_id: FileId,
247 column_id: ColumnId,
248 file_size_hint: Option<u64>,
249 ) -> Result<Option<BlobReader>> {
250 let Some(file_cache) = &self.file_cache else {
251 return Ok(None);
252 };
253
254 let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
255 if file_cache.get(index_key).await.is_none() {
256 return Ok(None);
257 };
258
259 let puffin_manager = self.puffin_manager_factory.build(
260 file_cache.local_store(),
261 WriteCachePathProvider::new(self.region_id, file_cache.clone()),
262 );
263 let reader = puffin_manager
264 .reader(&file_id)
265 .await
266 .context(PuffinBuildReaderSnafu)?
267 .with_file_size_hint(file_size_hint)
268 .blob(&Self::column_blob_name(column_id))
269 .await
270 .context(PuffinReadBlobSnafu)?
271 .reader()
272 .await
273 .context(PuffinBuildReaderSnafu)?;
274 Ok(Some(reader))
275 }
276
277 fn column_blob_name(column_id: ColumnId) -> String {
279 format!("{INDEX_BLOB_TYPE}-{column_id}")
280 }
281
282 async fn remote_blob_reader(
284 &self,
285 file_id: FileId,
286 column_id: ColumnId,
287 file_size_hint: Option<u64>,
288 ) -> Result<BlobReader> {
289 let puffin_manager = self
290 .puffin_manager_factory
291 .build(
292 self.object_store.clone(),
293 RegionFilePathFactory::new(self.region_dir.clone()),
294 )
295 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
296
297 puffin_manager
298 .reader(&file_id)
299 .await
300 .context(PuffinBuildReaderSnafu)?
301 .with_file_size_hint(file_size_hint)
302 .blob(&Self::column_blob_name(column_id))
303 .await
304 .context(PuffinReadBlobSnafu)?
305 .reader()
306 .await
307 .context(PuffinBuildReaderSnafu)
308 }
309
310 async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
311 &self,
312 reader: R,
313 predicates: &[InListPredicate],
314 output: &mut [(usize, Vec<Range<usize>>)],
315 ) -> std::result::Result<(), index::bloom_filter::error::Error> {
316 let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
317
318 for (_, row_group_output) in output.iter_mut() {
319 if row_group_output.is_empty() {
321 continue;
322 }
323
324 *row_group_output = applier.search(predicates, row_group_output).await?;
325 }
326
327 Ok(())
328 }
329
330 pub fn predicate_key(&self) -> &PredicateKey {
332 &self.predicate_key
333 }
334}
335
336fn is_blob_not_found(err: &Error) -> bool {
337 matches!(
338 err,
339 Error::PuffinReadBlob {
340 source: puffin::error::Error::BlobNotFound { .. },
341 ..
342 }
343 )
344}
345
#[cfg(test)]
mod tests {

    use datafusion_expr::{col, lit, Expr};
    use futures::future::BoxFuture;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::metadata::RegionMetadata;

    use super::*;
    use crate::sst::index::bloom_filter::creator::tests::{
        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
    };
    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;

    /// Returns a closure that builds an applier from filter expressions and applies it to
    /// `file_id` with the given `(row_count, to_search)` row group layout.
    // The returned closure type is inherently verbose; a type alias would not help much.
    #[allow(clippy::type_complexity)]
    fn tester(
        region_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: FileId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
           + use<'_> {
        move |exprs, row_groups| {
            let region_dir = region_dir.clone();
            let object_store = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    region_dir,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                applier
                    .apply(file_id, None, row_groups.into_iter())
                    .await
                    .unwrap()
            })
        }
    }

    #[tokio::test]
    // Expected values like `vec![0..5]` are intentional single-range vectors.
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        let region_metadata = mock_region_metadata();
        let prefix = "test_bloom_filter_applier_";
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_usage_threshold = Some(1024);
        let file_id = FileId::random();
        let region_dir = "region_dir".to_string();

        // Index two batches: one tagged `tag1`, then one tagged `tag2`.
        let mut indexer =
            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
                .unwrap()
                .unwrap();

        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();
        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        // Persist the index into a puffin file in the remote object store.
        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(region_dir.clone()),
        );

        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let tester = tester(
            region_dir.clone(),
            object_store.clone(),
            &region_metadata,
            factory.clone(),
            file_id,
        );

        // Only the row groups covering the `tag1` batch survive.
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // Row groups marked as not-to-search never appear in the output.
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5])]);

        // Predicates on different columns narrow the result together.
        let res = tester(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4])]);

        // In-list predicates; returned ranges are relative to their row group.
        let res = tester(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
}