1use std::collections::{BTreeMap, BTreeSet};
16
17use common_telemetry::warn;
18use datafusion_common::{Column, ScalarValue};
19use datafusion_expr::expr::InList;
20use datafusion_expr::{BinaryExpr, Expr, Operator};
21use datatypes::data_type::ConcreteDataType;
22use datatypes::value::Value;
23use index::bloom_filter::applier::InListPredicate;
24use index::Bytes;
25use mito_codec::index::IndexValueCodec;
26use mito_codec::row_converter::SortField;
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::RegionMetadata;
31use store_api::region_request::PathType;
32use store_api::storage::ColumnId;
33
34use crate::cache::file_cache::FileCacheRef;
35use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
36use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
37use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
38use crate::sst::index::puffin_manager::PuffinManagerFactory;
39
/// Builder that collects bloom-filter predicates from filter expressions and, if any
/// were collected, produces a [`BloomFilterIndexApplier`].
pub struct BloomFilterIndexApplierBuilder<'a> {
    /// Table directory forwarded to [`BloomFilterIndexApplier::new`].
    table_dir: String,
    /// Path type forwarded to the applier.
    path_type: PathType,
    /// Object store forwarded to the applier.
    object_store: ObjectStore,
    /// Region metadata used to resolve column names into column ids and data types.
    metadata: &'a RegionMetadata,
    /// Puffin manager factory forwarded to the applier.
    puffin_manager_factory: PuffinManagerFactory,
    /// Optional file cache forwarded to the applier.
    file_cache: Option<FileCacheRef>,
    /// Optional puffin metadata cache forwarded to the applier.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Optional bloom filter index cache forwarded to the applier.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Collected predicates keyed by column id. Each element of the `Vec` is one
    /// predicate pushed for that column (e.g. from separate top-level expressions or
    /// separate `AND` operands).
    predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
}
51
52impl<'a> BloomFilterIndexApplierBuilder<'a> {
53 pub fn new(
54 table_dir: String,
55 path_type: PathType,
56 object_store: ObjectStore,
57 metadata: &'a RegionMetadata,
58 puffin_manager_factory: PuffinManagerFactory,
59 ) -> Self {
60 Self {
61 table_dir,
62 path_type,
63 object_store,
64 metadata,
65 puffin_manager_factory,
66 file_cache: None,
67 puffin_metadata_cache: None,
68 bloom_filter_index_cache: None,
69 predicates: BTreeMap::default(),
70 }
71 }
72
73 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
74 self.file_cache = file_cache;
75 self
76 }
77
78 pub fn with_puffin_metadata_cache(
79 mut self,
80 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
81 ) -> Self {
82 self.puffin_metadata_cache = puffin_metadata_cache;
83 self
84 }
85
86 pub fn with_bloom_filter_index_cache(
87 mut self,
88 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
89 ) -> Self {
90 self.bloom_filter_index_cache = bloom_filter_index_cache;
91 self
92 }
93
94 pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
96 for expr in exprs {
97 self.traverse_and_collect(expr);
98 }
99
100 if self.predicates.is_empty() {
101 return Ok(None);
102 }
103
104 let applier = BloomFilterIndexApplier::new(
105 self.table_dir,
106 self.path_type,
107 self.object_store,
108 self.puffin_manager_factory,
109 self.predicates,
110 )
111 .with_file_cache(self.file_cache)
112 .with_puffin_metadata_cache(self.puffin_metadata_cache)
113 .with_bloom_filter_cache(self.bloom_filter_index_cache);
114
115 Ok(Some(applier))
116 }
117
118 fn traverse_and_collect(&mut self, expr: &Expr) {
120 let res = match expr {
121 Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
122 Operator::And => {
123 self.traverse_and_collect(left);
124 self.traverse_and_collect(right);
125 Ok(())
126 }
127 Operator::Eq => self.collect_eq(left, right),
128 Operator::Or => self.collect_or_eq_list(left, right),
129 _ => Ok(()),
130 },
131 Expr::InList(in_list) => self.collect_in_list(in_list),
132 _ => Ok(()),
133 };
134
135 if let Err(err) = res {
136 warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
137 }
138 }
139
140 fn column_id_and_type(
142 &self,
143 column_name: &str,
144 ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
145 let column = self
146 .metadata
147 .column_by_name(column_name)
148 .context(ColumnNotFoundSnafu {
149 column: column_name,
150 })?;
151
152 Ok(Some((
153 column.column_id,
154 column.column_schema.data_type.clone(),
155 )))
156 }
157
158 fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
160 let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
161 return Ok(());
162 };
163 if lit.is_null() {
164 return Ok(());
165 }
166 let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
167 return Ok(());
168 };
169 let value = encode_lit(lit, data_type)?;
170 self.predicates
171 .entry(column_id)
172 .or_default()
173 .push(InListPredicate {
174 list: BTreeSet::from([value]),
175 });
176
177 Ok(())
178 }
179
180 fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
182 let Expr::Column(column) = &in_list.expr.as_ref() else {
184 return Ok(());
185 };
186 if in_list.list.is_empty() || in_list.negated {
187 return Ok(());
188 }
189
190 let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
191 return Ok(());
192 };
193
194 let predicates = in_list
196 .list
197 .iter()
198 .filter_map(Self::nonnull_lit)
199 .map(|lit| encode_lit(lit, data_type.clone()));
200
201 let mut valid_predicates = BTreeSet::new();
203 for predicate in predicates {
204 match predicate {
205 Ok(p) => {
206 valid_predicates.insert(p);
207 }
208 Err(e) => warn!(e; "Failed to convert value in InList"),
209 }
210 }
211
212 if !valid_predicates.is_empty() {
213 self.predicates
214 .entry(column_id)
215 .or_default()
216 .push(InListPredicate {
217 list: valid_predicates,
218 });
219 }
220
221 Ok(())
222 }
223
224 fn collect_or_eq_list(&mut self, left: &Expr, right: &Expr) -> Result<()> {
226 let (eq_left, eq_right, or_list) = if let Expr::BinaryExpr(BinaryExpr {
227 left: l,
228 op: Operator::Eq,
229 right: r,
230 }) = left
231 {
232 (l, r, right)
233 } else if let Expr::BinaryExpr(BinaryExpr {
234 left: l,
235 op: Operator::Eq,
236 right: r,
237 }) = right
238 {
239 (l, r, left)
240 } else {
241 return Ok(());
242 };
243
244 let Some((col, lit)) = Self::eq_expr_col_lit(eq_left, eq_right)? else {
245 return Ok(());
246 };
247 if lit.is_null() {
248 return Ok(());
249 }
250 let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
251 return Ok(());
252 };
253
254 let mut inlist = BTreeSet::new();
255 inlist.insert(encode_lit(lit, data_type.clone())?);
256 if Self::collect_or_eq_list_rec(&col.name, &data_type, or_list, &mut inlist)? {
257 self.predicates
258 .entry(column_id)
259 .or_default()
260 .push(InListPredicate { list: inlist });
261 }
262
263 Ok(())
264 }
265
266 fn collect_or_eq_list_rec(
267 column_name: &str,
268 data_type: &ConcreteDataType,
269 expr: &Expr,
270 inlist: &mut BTreeSet<Bytes>,
271 ) -> Result<bool> {
272 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
273 match op {
274 Operator::Or => {
275 let r = Self::collect_or_eq_list_rec(column_name, data_type, left, inlist)?
276 .then(|| {
277 Self::collect_or_eq_list_rec(column_name, data_type, right, inlist)
278 })
279 .transpose()?
280 .unwrap_or(false);
281 return Ok(r);
282 }
283 Operator::Eq => {
284 let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
285 return Ok(false);
286 };
287 if lit.is_null() || column_name != col.name {
288 return Ok(false);
289 }
290 let bytes = encode_lit(lit, data_type.clone())?;
291 inlist.insert(bytes);
292 return Ok(true);
293 }
294 _ => {}
295 }
296 }
297
298 Ok(false)
299 }
300
301 fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
303 match expr {
304 Expr::Literal(lit) if !lit.is_null() => Some(lit),
305 _ => None,
306 }
307 }
308
309 fn eq_expr_col_lit<'b>(
311 left: &'b Expr,
312 right: &'b Expr,
313 ) -> Result<Option<(&'b Column, &'b ScalarValue)>> {
314 let (col, lit) = match (left, right) {
315 (Expr::Column(col), Expr::Literal(lit)) => (col, lit),
316 (Expr::Literal(lit), Expr::Column(col)) => (col, lit),
317 _ => return Ok(None),
318 };
319 Ok(Some((col, lit)))
320 }
321}
322
323fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
326 let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
327 let mut bytes = vec![];
328 let field = SortField::new(data_type);
329 IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
330 .context(EncodeSnafu)?;
331 Ok(bytes)
332}
333
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::{col, lit};
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region with a string tag `column1` (id 1), an int64 field `column2` (id 2) and a
    /// millisecond timestamp `column3` (id 3); `column1` is the primary key.
    fn test_region_metadata() -> RegionMetadata {
        let columns = [
            ("column1", ConcreteDataType::string_datatype(), SemanticType::Tag, 1),
            ("column2", ConcreteDataType::int64_datatype(), SemanticType::Field, 2),
            (
                "column3",
                ConcreteDataType::timestamp_millisecond_datatype(),
                SemanticType::Timestamp,
                3,
            ),
        ];

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        for (name, data_type, semantic_type, column_id) in columns {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(name, data_type, false),
                semantic_type,
                column_id,
            });
        }
        builder.primary_key(vec![1]);
        builder.build().unwrap()
    }

    fn test_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Builder over `metadata` with the fixed table dir and path type all tests share.
    fn new_builder(
        metadata: &RegionMetadata,
        factory: PuffinManagerFactory,
    ) -> BloomFilterIndexApplierBuilder<'_> {
        BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            metadata,
            factory,
        )
    }

    fn column(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    fn string_lit(s: impl Into<String>) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(s.into())))
    }

    fn int64_lit(i: i64) -> Expr {
        Expr::Literal(ScalarValue::Int64(Some(i)))
    }

    /// Shorthand for the binary expression `left <op> right`.
    fn binary(left: Expr, op: Operator, right: Expr) -> Expr {
        Expr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op,
            right: Box::new(right),
        })
    }

    /// Encodes a string literal exactly as the builder does for the string column.
    fn encode_str(s: &str) -> Bytes {
        encode_lit(
            &ScalarValue::Utf8(Some(s.to_string())),
            ConcreteDataType::string_datatype(),
        )
        .unwrap()
    }

    #[test]
    fn test_build_with_exprs() {
        let (_dir, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
        let metadata = test_region_metadata();
        let exprs = vec![binary(column("column1"), Operator::Eq, string_lit("value1"))];

        let applier = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().predicates;
        assert_eq!(predicates.len(), 1);

        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(
            column_predicates[0].list,
            BTreeSet::from([encode_str("value1")])
        );
    }

    #[test]
    fn test_build_with_in_list() {
        let (_dir, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
        let metadata = test_region_metadata();
        let exprs = vec![Expr::InList(InList {
            expr: Box::new(column("column2")),
            list: (1..=3).map(int64_lit).collect(),
            negated: false,
        })];

        let applier = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().predicates;
        let column_predicates = predicates.get(&2).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 3);
    }

    #[test]
    fn test_build_with_or_chain() {
        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_block("test_build_with_or_chain_");
        let metadata = test_region_metadata();
        let builder = || new_builder(&metadata, factory.clone());

        // (v1 OR (v2 OR v4)) OR v3 on one column collapses into a single IN-list.
        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column1")
                .eq(lit("value2"))
                .or(col("column1").eq(lit("value4"))))
            .or(col("column1").eq(lit("value3")));
        let applier = builder().build(&[expr]).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);
        let or_chain_predicates = &column_predicates[0].list;
        assert_eq!(or_chain_predicates.len(), 4);
        for value in ["value1", "value2", "value3", "value4"] {
            assert!(or_chain_predicates.contains(&encode_str(value)));
        }

        // Equality with NULL gives nothing to look up.
        let expr = col("column1").eq(Expr::Literal(ScalarValue::Utf8(None)));
        assert!(builder().build(&[expr]).unwrap().is_none());

        // Chain mixing two columns cannot become one IN-list.
        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column2").eq(lit("value2")));
        assert!(builder().build(&[expr]).unwrap().is_none());

        // Chain containing a non-equality leaf.
        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column1").gt_eq(lit("value2")));
        assert!(builder().build(&[expr]).unwrap().is_none());
    }

    #[test]
    fn test_build_with_and_expressions() {
        let (_dir, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
        let metadata = test_region_metadata();
        let exprs = vec![binary(
            binary(column("column1"), Operator::Eq, string_lit("value1")),
            Operator::And,
            binary(column("column2"), Operator::Eq, int64_lit(42)),
        )];

        let applier = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().predicates;
        assert_eq!(predicates.len(), 2);
        assert!(predicates.contains_key(&1));
        assert!(predicates.contains_key(&2));
    }

    #[test]
    fn test_build_with_null_values() {
        let (_dir, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
        let metadata = test_region_metadata();
        let exprs = vec![
            binary(
                column("column1"),
                Operator::Eq,
                Expr::Literal(ScalarValue::Utf8(None)),
            ),
            Expr::InList(InList {
                expr: Box::new(column("column2")),
                list: vec![
                    int64_lit(1),
                    Expr::Literal(ScalarValue::Int64(None)),
                    int64_lit(3),
                ],
                negated: false,
            }),
        ];

        let applier = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().predicates;
        assert!(!predicates.contains_key(&1));
        let column2_predicates = predicates.get(&2).unwrap();
        assert_eq!(column2_predicates[0].list.len(), 2);
    }

    #[test]
    fn test_build_with_invalid_expressions() {
        let (_dir, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
        let metadata = test_region_metadata();
        let exprs = vec![
            // Range comparison is not served by a bloom filter.
            binary(column("column1"), Operator::Gt, string_lit("value1")),
            // Column that does not exist in the region.
            binary(column("non_existent"), Operator::Eq, string_lit("value")),
            // Negated IN-list.
            Expr::InList(InList {
                expr: Box::new(column("column2")),
                list: vec![int64_lit(1), int64_lit(2)],
                negated: true,
            }),
        ];

        let applier = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(applier.is_none());
    }

    #[test]
    fn test_build_with_multiple_predicates_same_column() {
        let (_dir, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
        let metadata = test_region_metadata();
        let exprs = vec![
            binary(column("column1"), Operator::Eq, string_lit("value1")),
            Expr::InList(InList {
                expr: Box::new(column("column1")),
                list: vec![string_lit("value2"), string_lit("value3")],
                negated: false,
            }),
        ];

        let applier = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 2);
    }
}