1use std::collections::{BTreeMap, BTreeSet};
16
17use common_telemetry::warn;
18use datafusion_common::{Column, ScalarValue};
19use datafusion_expr::expr::InList;
20use datafusion_expr::{BinaryExpr, Expr, Operator};
21use datatypes::data_type::ConcreteDataType;
22use datatypes::value::Value;
23use index::Bytes;
24use index::bloom_filter::applier::InListPredicate;
25use mito_codec::index::IndexValueCodec;
26use mito_codec::row_converter::SortField;
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::RegionMetadata;
31use store_api::region_request::PathType;
32use store_api::storage::ColumnId;
33
34use crate::cache::file_cache::FileCacheRef;
35use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
36use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
37use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
38use crate::sst::index::puffin_manager::PuffinManagerFactory;
39
40pub struct BloomFilterIndexApplierBuilder<'a> {
41 table_dir: String,
42 path_type: PathType,
43 object_store: ObjectStore,
44 metadata: &'a RegionMetadata,
45 puffin_manager_factory: PuffinManagerFactory,
46 file_cache: Option<FileCacheRef>,
47 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
48 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
49 predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
50}
51
52impl<'a> BloomFilterIndexApplierBuilder<'a> {
53 pub fn new(
54 table_dir: String,
55 path_type: PathType,
56 object_store: ObjectStore,
57 metadata: &'a RegionMetadata,
58 puffin_manager_factory: PuffinManagerFactory,
59 ) -> Self {
60 Self {
61 table_dir,
62 path_type,
63 object_store,
64 metadata,
65 puffin_manager_factory,
66 file_cache: None,
67 puffin_metadata_cache: None,
68 bloom_filter_index_cache: None,
69 predicates: BTreeMap::default(),
70 }
71 }
72
73 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
74 self.file_cache = file_cache;
75 self
76 }
77
78 pub fn with_puffin_metadata_cache(
79 mut self,
80 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
81 ) -> Self {
82 self.puffin_metadata_cache = puffin_metadata_cache;
83 self
84 }
85
86 pub fn with_bloom_filter_index_cache(
87 mut self,
88 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
89 ) -> Self {
90 self.bloom_filter_index_cache = bloom_filter_index_cache;
91 self
92 }
93
94 pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
96 for expr in exprs {
97 self.traverse_and_collect(expr);
98 }
99
100 if self.predicates.is_empty() {
101 return Ok(None);
102 }
103
104 let expected_predicate_column_types = self.expected_predicate_column_types();
105 let applier = BloomFilterIndexApplier::new(
106 self.table_dir,
107 self.path_type,
108 self.object_store,
109 self.puffin_manager_factory,
110 self.predicates,
111 expected_predicate_column_types,
112 )
113 .with_file_cache(self.file_cache)
114 .with_puffin_metadata_cache(self.puffin_metadata_cache)
115 .with_bloom_filter_cache(self.bloom_filter_index_cache);
116
117 Ok(Some(applier))
118 }
119
120 fn traverse_and_collect(&mut self, expr: &Expr) {
122 let res = match expr {
123 Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
124 Operator::And => {
125 self.traverse_and_collect(left);
126 self.traverse_and_collect(right);
127 Ok(())
128 }
129 Operator::Eq => self.collect_eq(left, right),
130 Operator::Or => self.collect_or_eq_list(left, right),
131 _ => Ok(()),
132 },
133 Expr::InList(in_list) => self.collect_in_list(in_list),
134 _ => Ok(()),
135 };
136
137 if let Err(err) = res {
138 warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
139 }
140 }
141
142 fn expected_predicate_column_types(&self) -> BTreeMap<ColumnId, ConcreteDataType> {
144 self.predicates
145 .keys()
146 .filter_map(|col_id| {
147 let col = self.metadata.column_by_id(*col_id)?;
148 Some((*col_id, col.column_schema.data_type.clone()))
149 })
150 .collect()
151 }
152
153 fn column_id_and_type(
155 &self,
156 column_name: &str,
157 ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
158 let column = self
159 .metadata
160 .column_by_name(column_name)
161 .context(ColumnNotFoundSnafu {
162 column: column_name,
163 })?;
164
165 Ok(Some((
166 column.column_id,
167 column.column_schema.data_type.clone(),
168 )))
169 }
170
171 fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
173 let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
174 return Ok(());
175 };
176 if lit.is_null() {
177 return Ok(());
178 }
179 let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
180 return Ok(());
181 };
182 let value = encode_lit(lit, data_type)?;
183 self.predicates
184 .entry(column_id)
185 .or_default()
186 .push(InListPredicate {
187 list: BTreeSet::from([value]),
188 });
189
190 Ok(())
191 }
192
193 fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
195 let Expr::Column(column) = &in_list.expr.as_ref() else {
197 return Ok(());
198 };
199 if in_list.list.is_empty() || in_list.negated {
200 return Ok(());
201 }
202
203 let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
204 return Ok(());
205 };
206
207 let predicates = in_list
209 .list
210 .iter()
211 .filter_map(Self::nonnull_lit)
212 .map(|lit| encode_lit(lit, data_type.clone()));
213
214 let mut valid_predicates = BTreeSet::new();
216 for predicate in predicates {
217 match predicate {
218 Ok(p) => {
219 valid_predicates.insert(p);
220 }
221 Err(e) => warn!(e; "Failed to convert value in InList"),
222 }
223 }
224
225 if !valid_predicates.is_empty() {
226 self.predicates
227 .entry(column_id)
228 .or_default()
229 .push(InListPredicate {
230 list: valid_predicates,
231 });
232 }
233
234 Ok(())
235 }
236
237 fn collect_or_eq_list(&mut self, left: &Expr, right: &Expr) -> Result<()> {
239 let (eq_left, eq_right, or_list) = if let Expr::BinaryExpr(BinaryExpr {
240 left: l,
241 op: Operator::Eq,
242 right: r,
243 }) = left
244 {
245 (l, r, right)
246 } else if let Expr::BinaryExpr(BinaryExpr {
247 left: l,
248 op: Operator::Eq,
249 right: r,
250 }) = right
251 {
252 (l, r, left)
253 } else {
254 return Ok(());
255 };
256
257 let Some((col, lit)) = Self::eq_expr_col_lit(eq_left, eq_right)? else {
258 return Ok(());
259 };
260 if lit.is_null() {
261 return Ok(());
262 }
263 let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
264 return Ok(());
265 };
266
267 let mut inlist = BTreeSet::new();
268 inlist.insert(encode_lit(lit, data_type.clone())?);
269 if Self::collect_or_eq_list_rec(&col.name, &data_type, or_list, &mut inlist)? {
270 self.predicates
271 .entry(column_id)
272 .or_default()
273 .push(InListPredicate { list: inlist });
274 }
275
276 Ok(())
277 }
278
279 fn collect_or_eq_list_rec(
280 column_name: &str,
281 data_type: &ConcreteDataType,
282 expr: &Expr,
283 inlist: &mut BTreeSet<Bytes>,
284 ) -> Result<bool> {
285 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
286 match op {
287 Operator::Or => {
288 let r = Self::collect_or_eq_list_rec(column_name, data_type, left, inlist)?
289 .then(|| {
290 Self::collect_or_eq_list_rec(column_name, data_type, right, inlist)
291 })
292 .transpose()?
293 .unwrap_or(false);
294 return Ok(r);
295 }
296 Operator::Eq => {
297 let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
298 return Ok(false);
299 };
300 if lit.is_null() || column_name != col.name {
301 return Ok(false);
302 }
303 let bytes = encode_lit(lit, data_type.clone())?;
304 inlist.insert(bytes);
305 return Ok(true);
306 }
307 _ => {}
308 }
309 }
310
311 Ok(false)
312 }
313
314 fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
316 match expr {
317 Expr::Literal(lit, _) if !lit.is_null() => Some(lit),
318 _ => None,
319 }
320 }
321
322 fn eq_expr_col_lit<'b>(
324 left: &'b Expr,
325 right: &'b Expr,
326 ) -> Result<Option<(&'b Column, &'b ScalarValue)>> {
327 let (col, lit) = match (left, right) {
328 (Expr::Column(col), Expr::Literal(lit, _)) => (col, lit),
329 (Expr::Literal(lit, _), Expr::Column(col)) => (col, lit),
330 _ => return Ok(None),
331 };
332 Ok(Some((col, lit)))
333 }
334}
335
336fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
339 let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
340 let mut bytes = vec![];
341 let field = SortField::new(data_type);
342 IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
343 .context(EncodeSnafu)?;
344 Ok(bytes)
345}
346
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::{Literal, col, lit};
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region with a string tag `column1` (id 1, primary key), an int64 field
    /// `column2` (id 2) and a millisecond timestamp `column3` (id 3).
    fn test_region_metadata() -> RegionMetadata {
        let columns = [
            (
                "column1",
                ConcreteDataType::string_datatype(),
                SemanticType::Tag,
                1,
            ),
            (
                "column2",
                ConcreteDataType::int64_datatype(),
                SemanticType::Field,
                2,
            ),
            (
                "column3",
                ConcreteDataType::timestamp_millisecond_datatype(),
                SemanticType::Timestamp,
                3,
            ),
        ];

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        for (name, data_type, semantic_type, column_id) in columns {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(name, data_type, false),
                semantic_type,
                column_id,
            });
        }
        builder.primary_key(vec![1]);
        builder.build().unwrap()
    }

    /// In-memory object store.
    fn test_object_store() -> ObjectStore {
        let backend = Memory::default();
        ObjectStore::new(backend).unwrap().finish()
    }

    /// Unqualified column reference.
    fn column(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    /// Int64 literal expression.
    fn int64_lit(i: i64) -> Expr {
        lit(i)
    }

    /// NULL literal of string type.
    fn null_utf8() -> Expr {
        Expr::Literal(ScalarValue::Utf8(None), None)
    }

    /// Encoded form of a string value for a string column.
    fn encode_str(s: &str) -> Bytes {
        encode_lit(
            &ScalarValue::Utf8(Some(s.to_string())),
            ConcreteDataType::string_datatype(),
        )
        .unwrap()
    }

    /// Builder over `metadata` with the fixed test table dir and store.
    fn new_builder<'a>(
        metadata: &'a RegionMetadata,
        factory: PuffinManagerFactory,
    ) -> BloomFilterIndexApplierBuilder<'a> {
        BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            metadata,
            factory,
        )
    }

    #[test]
    fn test_build_with_exprs() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
        let metadata = test_region_metadata();

        let filters = [column("column1").eq("value1".lit())];
        let applier = new_builder(&metadata, factory).build(&filters).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().default_predicates;
        assert_eq!(predicates.len(), 1);

        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(
            column_predicates[0].list,
            BTreeSet::from([encode_str("value1")])
        );
    }

    #[test]
    fn test_build_with_in_list() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
        let metadata = test_region_metadata();

        let items: Vec<Expr> = (1..=3).map(int64_lit).collect();
        let filters = [column("column2").in_list(items, false)];

        let applier = new_builder(&metadata, factory).build(&filters).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().default_predicates;
        let column_predicates = predicates.get(&2).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 3);
    }

    #[test]
    fn test_build_with_or_chain() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_or_chain_");
        let metadata = test_region_metadata();
        let fresh_builder = || new_builder(&metadata, factory.clone());
        let column1_eq = |value: &str| col("column1").eq(lit(value));

        // Nested chain on a single column: every literal must be collected.
        let chain = column1_eq("value1")
            .or(column1_eq("value2").or(column1_eq("value4")))
            .or(column1_eq("value3"));

        let applier = fresh_builder().build(&[chain]).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().default_predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);

        let collected = &column_predicates[0].list;
        assert_eq!(collected.len(), 4);
        for value in ["value1", "value2", "value3", "value4"] {
            assert!(collected.contains(&encode_str(value)));
        }

        // Shapes that must not produce an applier: equality with NULL, an OR
        // across two columns, and an OR mixing `=` with another operator.
        let rejected = [
            col("column1").eq(null_utf8()),
            column1_eq("value1").or(col("column2").eq(lit("value2"))),
            column1_eq("value1").or(col("column1").gt_eq(lit("value2"))),
        ];
        for expr in rejected {
            let applier = fresh_builder().build(&[expr]).unwrap();
            assert!(applier.is_none());
        }
    }

    #[test]
    fn test_build_with_and_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
        let metadata = test_region_metadata();

        let conjunction = column("column1")
            .eq("value1".lit())
            .and(column("column2").eq(int64_lit(42)));
        let filters = [conjunction];

        let applier = new_builder(&metadata, factory).build(&filters).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().default_predicates;
        assert_eq!(predicates.len(), 2);
        for column_id in [1, 2] {
            assert!(predicates.contains_key(&column_id));
        }
    }

    #[test]
    fn test_build_with_null_values() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
        let metadata = test_region_metadata();

        let null_int64 = Expr::Literal(ScalarValue::Int64(None), None);
        let filters = [
            column("column1").eq(null_utf8()),
            column("column2").in_list(vec![int64_lit(1), null_int64, int64_lit(3)], false),
        ];

        let applier = new_builder(&metadata, factory).build(&filters).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().default_predicates;
        // The NULL equality contributes nothing for column1.
        assert!(!predicates.contains_key(&1));
        // The NULL item is dropped from the list for column2.
        let column2_predicates = predicates.get(&2).unwrap();
        assert_eq!(column2_predicates[0].list.len(), 2);
    }

    #[test]
    fn test_build_with_invalid_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
        let metadata = test_region_metadata();

        let filters = [
            // Operator other than `=`.
            column("column1").gt("value1".lit()),
            // Column missing from the region metadata.
            column("non_existent").eq("value".lit()),
            // Negated list.
            column("column2").in_list(vec![int64_lit(1), int64_lit(2)], true),
        ];

        let applier = new_builder(&metadata, factory).build(&filters).unwrap();
        assert!(applier.is_none());
    }

    #[test]
    fn test_build_with_multiple_predicates_same_column() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
        let metadata = test_region_metadata();

        let filters = [
            column("column1").eq("value1".lit()),
            column("column1").in_list(vec!["value2".lit(), "value3".lit()], false),
        ];

        let applier = new_builder(&metadata, factory).build(&filters).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().default_predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 2);
    }
}