1mod builder;
16
17use std::collections::HashMap;
18use std::ops::Range;
19use std::sync::Arc;
20
21use common_base::range_read::RangeReader;
22use common_telemetry::warn;
23use index::bloom_filter::applier::{BloomFilterApplier, InListPredicate};
24use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
25use object_store::ObjectStore;
26use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
27use puffin::puffin_manager::{PuffinManager, PuffinReader};
28use snafu::ResultExt;
29use store_api::storage::{ColumnId, RegionId};
30
31use crate::access_layer::{RegionFilePathFactory, WriteCachePathProvider};
32use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
33use crate::cache::index::bloom_filter_index::{
34 BloomFilterIndexCacheRef, CachedBloomFilterIndexBlobReader, Tag,
35};
36use crate::error::{
37 ApplyBloomFilterIndexSnafu, Error, MetadataSnafu, PuffinBuildReaderSnafu, PuffinReadBlobSnafu,
38 Result,
39};
40use crate::metrics::INDEX_APPLY_ELAPSED;
41use crate::sst::file::FileId;
42pub use crate::sst::index::bloom_filter::applier::builder::BloomFilterIndexApplierBuilder;
43use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
44use crate::sst::index::puffin_manager::{BlobReader, PuffinManagerFactory};
45use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
46
/// Shared, reference-counted ([`Arc`]) handle to a [`BloomFilterIndexApplier`].
pub(crate) type BloomFilterIndexApplierRef = Arc<BloomFilterIndexApplier>;
48
/// Applies per-column bloom filter predicates to the index blobs of a file in
/// order to narrow down the row ranges of the searched row groups.
pub struct BloomFilterIndexApplier {
    /// Region directory; used to build the path factory when reading the
    /// index file from `object_store`.
    region_dir: String,

    /// Region id; used to key lookups in the file cache.
    region_id: RegionId,

    /// Object store the index file is read from when the file cache cannot
    /// serve it.
    object_store: ObjectStore,

    /// Optional file cache that is consulted before `object_store`.
    file_cache: Option<FileCacheRef>,

    /// Factory used to build puffin managers for reading index files.
    puffin_manager_factory: PuffinManagerFactory,

    /// Optional puffin metadata cache handed to the puffin manager that reads
    /// from `object_store`.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,

    /// Optional bloom filter index cache; when set, blobs are read through a
    /// `CachedBloomFilterIndexBlobReader`.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,

    /// Predicates to apply, grouped by the id of the column they refer to.
    predicates: HashMap<ColumnId, Vec<InListPredicate>>,
}
76
77impl BloomFilterIndexApplier {
78 pub fn new(
82 region_dir: String,
83 region_id: RegionId,
84 object_store: ObjectStore,
85 puffin_manager_factory: PuffinManagerFactory,
86 predicates: HashMap<ColumnId, Vec<InListPredicate>>,
87 ) -> Self {
88 Self {
89 region_dir,
90 region_id,
91 object_store,
92 file_cache: None,
93 puffin_manager_factory,
94 puffin_metadata_cache: None,
95 bloom_filter_index_cache: None,
96 predicates,
97 }
98 }
99
100 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
101 self.file_cache = file_cache;
102 self
103 }
104
105 pub fn with_puffin_metadata_cache(
106 mut self,
107 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
108 ) -> Self {
109 self.puffin_metadata_cache = puffin_metadata_cache;
110 self
111 }
112
113 pub fn with_bloom_filter_cache(
114 mut self,
115 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
116 ) -> Self {
117 self.bloom_filter_index_cache = bloom_filter_index_cache;
118 self
119 }
120
121 pub async fn apply(
126 &self,
127 file_id: FileId,
128 file_size_hint: Option<u64>,
129 row_groups: impl Iterator<Item = (usize, bool)>,
130 ) -> Result<Vec<(usize, Vec<Range<usize>>)>> {
131 let _timer = INDEX_APPLY_ELAPSED
132 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
133 .start_timer();
134
135 let mut input = Vec::with_capacity(row_groups.size_hint().0);
137 let mut start = 0;
138 for (i, (len, to_search)) in row_groups.enumerate() {
139 let end = start + len;
140 if to_search {
141 input.push((i, start..end));
142 }
143 start = end;
144 }
145
146 let mut output = input
149 .iter()
150 .map(|(i, range)| (*i, vec![range.clone()]))
151 .collect::<Vec<_>>();
152
153 for (column_id, predicates) in &self.predicates {
154 let blob = match self
155 .blob_reader(file_id, *column_id, file_size_hint)
156 .await?
157 {
158 Some(blob) => blob,
159 None => continue,
160 };
161
162 if let Some(bloom_filter_cache) = &self.bloom_filter_index_cache {
164 let blob_size = blob.metadata().await.context(MetadataSnafu)?.content_length;
165 let reader = CachedBloomFilterIndexBlobReader::new(
166 file_id,
167 *column_id,
168 Tag::Skipping,
169 blob_size,
170 BloomFilterReaderImpl::new(blob),
171 bloom_filter_cache.clone(),
172 );
173 self.apply_predicates(reader, predicates, &mut output)
174 .await
175 .context(ApplyBloomFilterIndexSnafu)?;
176 } else {
177 let reader = BloomFilterReaderImpl::new(blob);
178 self.apply_predicates(reader, predicates, &mut output)
179 .await
180 .context(ApplyBloomFilterIndexSnafu)?;
181 }
182 }
183
184 for ((_, output), (_, input)) in output.iter_mut().zip(input) {
186 let start = input.start;
187 for range in output.iter_mut() {
188 range.start -= start;
189 range.end -= start;
190 }
191 }
192 output.retain(|(_, ranges)| !ranges.is_empty());
193
194 Ok(output)
195 }
196
197 async fn blob_reader(
201 &self,
202 file_id: FileId,
203 column_id: ColumnId,
204 file_size_hint: Option<u64>,
205 ) -> Result<Option<BlobReader>> {
206 let reader = match self
207 .cached_blob_reader(file_id, column_id, file_size_hint)
208 .await
209 {
210 Ok(Some(puffin_reader)) => puffin_reader,
211 other => {
212 if let Err(err) = other {
213 if is_blob_not_found(&err) {
215 return Ok(None);
216 }
217 warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
218 }
219 let res = self
220 .remote_blob_reader(file_id, column_id, file_size_hint)
221 .await;
222 if let Err(err) = res {
223 if is_blob_not_found(&err) {
225 return Ok(None);
226 }
227 return Err(err);
228 }
229
230 res?
231 }
232 };
233
234 Ok(Some(reader))
235 }
236
237 async fn cached_blob_reader(
239 &self,
240 file_id: FileId,
241 column_id: ColumnId,
242 file_size_hint: Option<u64>,
243 ) -> Result<Option<BlobReader>> {
244 let Some(file_cache) = &self.file_cache else {
245 return Ok(None);
246 };
247
248 let index_key = IndexKey::new(self.region_id, file_id, FileType::Puffin);
249 if file_cache.get(index_key).await.is_none() {
250 return Ok(None);
251 };
252
253 let puffin_manager = self.puffin_manager_factory.build(
254 file_cache.local_store(),
255 WriteCachePathProvider::new(self.region_id, file_cache.clone()),
256 );
257 let reader = puffin_manager
258 .reader(&file_id)
259 .await
260 .context(PuffinBuildReaderSnafu)?
261 .with_file_size_hint(file_size_hint)
262 .blob(&Self::column_blob_name(column_id))
263 .await
264 .context(PuffinReadBlobSnafu)?
265 .reader()
266 .await
267 .context(PuffinBuildReaderSnafu)?;
268 Ok(Some(reader))
269 }
270
271 fn column_blob_name(column_id: ColumnId) -> String {
273 format!("{INDEX_BLOB_TYPE}-{column_id}")
274 }
275
276 async fn remote_blob_reader(
278 &self,
279 file_id: FileId,
280 column_id: ColumnId,
281 file_size_hint: Option<u64>,
282 ) -> Result<BlobReader> {
283 let puffin_manager = self
284 .puffin_manager_factory
285 .build(
286 self.object_store.clone(),
287 RegionFilePathFactory::new(self.region_dir.clone()),
288 )
289 .with_puffin_metadata_cache(self.puffin_metadata_cache.clone());
290
291 puffin_manager
292 .reader(&file_id)
293 .await
294 .context(PuffinBuildReaderSnafu)?
295 .with_file_size_hint(file_size_hint)
296 .blob(&Self::column_blob_name(column_id))
297 .await
298 .context(PuffinReadBlobSnafu)?
299 .reader()
300 .await
301 .context(PuffinBuildReaderSnafu)
302 }
303
304 async fn apply_predicates<R: BloomFilterReader + Send + 'static>(
305 &self,
306 reader: R,
307 predicates: &[InListPredicate],
308 output: &mut [(usize, Vec<Range<usize>>)],
309 ) -> std::result::Result<(), index::bloom_filter::error::Error> {
310 let mut applier = BloomFilterApplier::new(Box::new(reader)).await?;
311
312 for (_, row_group_output) in output.iter_mut() {
313 if row_group_output.is_empty() {
315 continue;
316 }
317
318 *row_group_output = applier.search(predicates, row_group_output).await?;
319 }
320
321 Ok(())
322 }
323}
324
325fn is_blob_not_found(err: &Error) -> bool {
326 matches!(
327 err,
328 Error::PuffinReadBlob {
329 source: puffin::error::Error::BlobNotFound { .. },
330 ..
331 }
332 )
333}
334
#[cfg(test)]
mod tests {

    use datafusion_expr::{col, lit, Expr};
    use futures::future::BoxFuture;
    use puffin::puffin_manager::PuffinWriter;
    use store_api::metadata::RegionMetadata;

    use super::*;
    use crate::sst::index::bloom_filter::creator::tests::{
        mock_object_store, mock_region_metadata, new_batch, new_intm_mgr,
    };
    use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;

    /// Returns a closure that builds an applier from the given expressions,
    /// applies it to `file_id` with the given row groups (no file size hint),
    /// and returns the resulting ranges. Panics if building or applying fails.
    #[allow(clippy::type_complexity)]
    fn tester(
        region_dir: String,
        object_store: ObjectStore,
        metadata: &RegionMetadata,
        puffin_manager_factory: PuffinManagerFactory,
        file_id: FileId,
    ) -> impl Fn(&[Expr], Vec<(usize, bool)>) -> BoxFuture<'static, Vec<(usize, Vec<Range<usize>>)>>
           + use<'_> {
        move |exprs, row_groups| {
            // Clone everything so the returned future owns its inputs.
            let region_dir = region_dir.clone();
            let object_store = object_store.clone();
            let metadata = metadata.clone();
            let puffin_manager_factory = puffin_manager_factory.clone();
            let exprs = exprs.to_vec();

            Box::pin(async move {
                let builder = BloomFilterIndexApplierBuilder::new(
                    region_dir,
                    object_store,
                    &metadata,
                    puffin_manager_factory,
                );

                let applier = builder.build(&exprs).unwrap().unwrap();
                applier
                    .apply(file_id, None, row_groups.into_iter())
                    .await
                    .unwrap()
            })
        }
    }

    /// Writes a bloom filter index for two batches and checks the ranges
    /// returned by the applier for several predicates and row group selections.
    #[tokio::test]
    #[allow(clippy::single_range_in_vec_init)]
    async fn test_bloom_filter_applier() {
        // NOTE(review): the expectations below depend on the columns and index
        // settings defined by `mock_region_metadata` — confirm against that helper.
        let region_metadata = mock_region_metadata();
        let prefix = "test_bloom_filter_applier_";
        let (d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(d.path().to_string_lossy()).await;
        let memory_usage_threshold = Some(1024);
        let file_id = FileId::random();
        let region_dir = "region_dir".to_string();

        let mut indexer =
            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
                .unwrap()
                .unwrap();

        // Index two batches, one per tag value.
        // NOTE(review): presumably `new_batch(tag, range)` yields one row per
        // value of `range` — confirm against the helper.
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();
        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let puffin_manager = factory.build(
            object_store.clone(),
            RegionFilePathFactory::new(region_dir.clone()),
        );

        // Persist the index into the puffin file the applier will read.
        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        indexer.finish(&mut puffin_writer).await.unwrap();
        puffin_writer.finish().await.unwrap();

        let tester = tester(
            region_dir.clone(),
            object_store.clone(),
            &region_metadata,
            factory.clone(),
            file_id,
        );

        // `tag_str = "tag1"`, all four 5-row row groups searched:
        // row groups 0 and 1 are kept in full.
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5]), (1, vec![0..5])]);

        // Same predicate, but row group 1 is not searched:
        // only row group 0 is returned.
        let res = tester(
            &[col("tag_str").eq(lit("tag1"))],
            vec![(5, true), (5, false), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..5])]);

        // `tag_str = "tag1" AND field_u64 = 1`:
        // only rows 0..4 of row group 0 remain.
        let res = tester(
            &[
                col("tag_str").eq(lit("tag1")),
                col("field_u64").eq(lit(1u64)),
            ],
            vec![(5, true), (5, true), (5, true), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4])]);

        // `field_u64 IN (1, 11)` with row group 2 not searched:
        // rows 0..4 of row group 0 and rows 3..5 of row group 1 remain.
        let res = tester(
            &[col("field_u64").in_list(vec![lit(1u64), lit(11u64)], false)],
            vec![(5, true), (5, true), (5, false), (5, true)],
        )
        .await;
        assert_eq!(res, vec![(0, vec![0..4]), (1, vec![3..5])]);
    }
}