1mod builder;
16
17use std::collections::BTreeMap;
18use std::ops::Range;
19use std::sync::Arc;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
24use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
25use index::target::IndexTarget;
26use object_store::ObjectStore;
27use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
28use puffin::puffin_manager::{PuffinManager, PuffinReader};
29use snafu::ResultExt;
30use store_api::region_request::PathType;
31use store_api::storage::ColumnId;
32
33use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
34use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
35use crate::cache::index::bloom_filter_index::{
36 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
37};
38use crate::cache::index::result_cache::PredicateKey;
39use crate::error::{
40 ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
41 Result,
42};
43use crate::metrics::INDEX_APPLY_ELAPSED;
44use crate::sst::file::RegionFileId;
45use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
46use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
47pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
48use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
49
/// Shared, reference-counted handle to a [`BloomFilterIndexApplier`].
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;

/// Applies bloom filter index predicates to an SST file and reports, per row
/// group, the row ranges that may still match.
pub struct BloomFilterIndexApplier {
    /// Table directory; combined with `path_type` to build remote index file paths.
    table_dir: String,

    /// Path type passed alongside `table_dir` when resolving remote index files.
    path_type: PathType,

    /// Object store the remote index files are read from.
    object_store: ObjectStore,

    /// Optional local file cache that is consulted before the remote index file.
    file_cache: Option<FileCacheRef>,

    /// Factory used to build the puffin managers that open index files.
    puffin_manager_factory: PuffinManagerFactory,

    /// Optional puffin metadata cache, attached when reading remote index files.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,

    /// Optional cache handed to `CachedBloomFilterIndexBlobReader` when reading
    /// bloom filter blobs.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Predicates to apply, keyed by the column whose bloom filter they are
    /// checked against.
    predicates: Arc<BTreeMap<ColumnId, Vec<InListPredicate>>>,

    /// Key built from `predicates` (a `result_cache::PredicateKey`).
    predicate_key: PredicateKey,
}
82
83impl BloomFilterIndexApplier {
84 pub fn new(
88 table_dir: String,
89 path_type: PathType,
90 object_store: ObjectStore,
91 puffin_manager_factory: PuffinManagerFactory,
92 predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
93 ) -> Self {
94 let predicates = Arc::new(predicates);
95 Self {
96 table_dir,
97 path_type,
98 object_store,
99 file_cache: None,
100 puffin_manager_factory,
101 puffin_metadata_cache: None,
102 bloom_filter_index_cache: None,
103 predicate_key: PredicateKey::new_bloom(predicates.clone()),
104 predicates,
105 }
106 }
107
108 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
109 self.file_cache = file_cache;
110 self
111 }
112
113 pub fn with_puffin_metadata_cache(
114 mut self,
115 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
116 ) -> Self {
117 self.puffin_metadata_cache = puffin_metadata_cache;
118 self
119 }
120
121 pub fn with_bloom_filter_cache(
122 mut self,
123 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
124 ) -> Self {
125 self.bloom_filter_index_cache = bloom_filter_index_cache;
126 self
127 }
128
129 pub async fn apply(
137 &self,
138 file_id: RegionFileId,
139 file_size_hint: Option<u64>,
140 row_groups: impl Iterator<Item = (usize, bool)>,
141 ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
142 let _timer = INDEX_APPLY_ELAPSED
143 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
144 .start_timer();
145
146 let mut input = Vec::with_capacity(row_groups.size_hint().0);
148 let mut start = 0;
149 for (i, (len, to_search)) in row_groups.enumerate() {
150 let end = start + len;
151 if to_search {
152 input.push((i, start..end));
153 }
154 start = end;
155 }
156
157 let mut output = input
160 .iter()
161 .map(|(i, range)| (*i, vec![range.clone()]))
162 .collect::<Vec<_>>();
163
164 for (column_id, predicates) in self.predicates.iter() {
165 let blob = match self
166 .blob_reader(file_id, *column_id, file_size_hint)
167 .await?
168 {
169 Some(blob) => blob,
170 None => continue,
171 };
172
173 if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
175 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
176 let reader = CachedBloomFilterIndexBlobReader::new(
177 file_id.file_id(),
178 *column_id,
179 Tag::Skipping,
180 blob_size,
181 BloomFilterReaderImpl::new(blob),
182 bloom_filter_cache.clone(),
183 );
184 self.apply_predicates(reader, predicates, &mut output)
185 .await
186 .context(ApplyBloomFilterIndexSnafu)?;
187 } else {
188 let reader = BloomFilterReaderImpl::new(blob);
189 self.apply_predicates(reader, predicates, &mut output)
190 .await
191 .context(ApplyBloomFilterIndexSnafu)?;
192 }
193 }
194
195 for ((_, output), (_, input)) in output.iter_mut().zip(input) {
197 let start = input.start;
198 for range in output.iter_mut() {
199 range.start -= start;
200 range.end -= start;
201 }
202 }
203
204 Ok(output)
205 }
206
207 async fn blob_reader(
211 &self,
212 file_id: RegionFileId,
213 column_id: ColumnId,
214 file_size_hint: Option<u64>,
215 ) -> Result<Option<BlobReader>> {
216 let reader = match self
217 .cached_blob_reader(file_id, column_id, file_size_hint)
218 .await
219 {
220 Ok(Some(puffin_reader)) => puffin_reader,
221 other => {
222 if let Err(err) = other {
223 if is_blob_not_found(&err) {
225 return Ok(None);
226 }
227 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
228 }
229 let res = self
230 .remote_blob_reader(file_id, column_id, file_size_hint)
231 .await;
232 if let Err(err) = res {
233 if is_blob_not_found(&err) {
235 return Ok(None);
236 }
237 return Err(err);
238 }
239
240 res?
241 }
242 };
243
244 Ok(Some(reader))
245 }
246
247 async fn cached_blob_reader(
249 &self,
250 file_id: RegionFileId,
251 column_id: ColumnId,
252 file_size_hint: Option<u64>,
253 ) -> Result<Option<BlobReader>> {
254 let Some(file_cache) = &self.file_cache else {
255 return Ok(None);
256 };
257
258 let index_key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Puffin);
259 if file_cache.get(index_key).await.is_none() {
260 return Ok(None);
261 };
262
263 let puffin_manager = self.puffin_manager_factory.build(
264 file_cache.local_store(),
265 WriteCachePathProvider::new(file_cache.clone()),
266 );
267 let blob_name = Self::column_blob_name(column_id);
268
269 let reader = puffin_manager
270 .reader(&file_id)
271 .await
272 .context(PuffinBuildReaderSnafu)?
273 .with_file_size_hint(file_size_hint)
274 .blob(&blob_name)
275 .await
276 .context(PuffinReadBlobSnafu)?
277 .reader()
278 .await
279 .context(PuffinBuildReaderSnafu)?;
280 Ok(Some(reader))
281 }
282
283 fn column_blob_name(column_id: ColumnId) -> String {
285 format!("{INDEX_BLOB_TYPE}-{}", IndexTarget::ColumnId(column_id))
286 }
287
288 async fn remote_blob_reader(
290 &self,
291 file_id: RegionFileId,
292 column_id: ColumnId,
293 file_size_hint: Option<u64>,
294 ) -> Result<BlobReader> {
295 let puffin_manager = self
296 .puffin_manager_factory
297 .build(
298 self.object_store.clone(),
299 RegionFilePathFactory::new(self.table_dir.clone(), self.path_type),
300 )
301 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
302
303 let blob_name = Self::column_blob_name(column_id);
304
305 puffin_manager
306 .reader(&file_id)
307 .await
308 .context(PuffinBuildReaderSnafu)?
309 .with_file_size_hint(file_size_hint)
310 .blob(&blob_name)
311 .await
312 .context(PuffinReadBlobSnafu)?
313 .reader()
314 .await
315 .context(PuffinBuildReaderSnafu)
316 }
317
318 async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
319 &self,
320 reader: R,
321 predicates: &[InListPredicate],
322 output: &mut [(usize, Vec<Range<usize>>)],
323 ) -> std::result::Result<(), index::bloom_filter::error::Error> {
324 let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
325
326 for (_, row_group_output) in output.iter_mut() {
327 if row_group_output.is_empty() {
329 continue;
330 }
331
332 *row_group_output = applier.search(predicates, row_group_output).await?;
333 }
334
335 Ok(())
336 }
337
338 pub fn predicate_key(&self) -> &PredicateKey {
340 &self.predicate_key
341 }
342}
343
344fn is_blob_not_found(err: &Error) -> bool {
345 matches!(
346 err,
347 Error::PuffinReadBlob {
348 source: puffin::error::Error::BlobNotFound { .. },
349 ..
350 }
351 )
352}
353
#[cfg(test)]
mod tests {

    use datafusion_expr::{Expr, col, lit};
    use futures::future::BoxFuture;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::metadata::RegionMetadata;
    use store_api::storage::FileId;

    use super::*;
    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
    use crate::sst::index::bloom_filter::creator::tests::{
        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
    };

    /// Returns a closure that builds an applier from `exprs`, applies it to
    /// `file_id` over the given row groups, and keeps only the row groups that
    /// still have at least one candidate range.
    // The boxed-future closure type must be spelled out in the return position.
    #[allow(clippy::type_complexity)]
    fn tester(
        table_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: RegionFileId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
    + use<'_> {
        move |exprs, row_groups| {
            let table_dir = table_dir.clone();
            let object_store: ObjectStore = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    table_dir,
                    PathType::Bare,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                applier
                    .apply(file_id, None, row_groups.into_iter())
                    .await
                    .unwrap()
                    .into_iter()
                    .filter(|(_, ranges)| !ranges.is_empty())
                    .collect()
            })
        }
    }

    #[tokio::test]
    // Expected values are `Vec<Range<usize>>` holding one range on purpose,
    // which is exactly what `single_range_in_vec_init` flags.
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        let region_metadata = mock_region_metadata();
        let prefix = "test_bloom_filter_applier_";
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_usage_threshold = Some(1024);
        let file_id = RegionFileId::new(region_metadata.region_id, FileId::random());
        let table_dir = "table_dir".to_string();

        let mut indexer = BloomFilterIndexer::new(
            file_id.file_id(),
            &region_metadata,
            intm_mgr,
            memory_usage_threshold,
        )
        .unwrap()
        .unwrap();

        // NOTE(review): assumes `new_batch(tag, range)` writes one row per value
        // in `range` with `tag_str = tag` — confirm against the creator tests.
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();
        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(table_dir.clone(), PathType::Bare),
        );

        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let tester = tester(
            table_dir.clone(),
            object_store.clone(),
            &region_metadata,
            factory.clone(),
            file_id,
        );

        // Four row groups of five rows; only the first two should hold `tag1`.
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // Row group 1 is not searched, so it never appears in the output.
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5])]);

        // Both predicates must hold; the field predicate narrows row group 0.
        let res = tester(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4])]);

        // IN-list predicate with row group 2 skipped; ranges are relative to
        // the start of each row group.
        let res = tester(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
}