1use std::collections::{BTreeMap, BTreeSet};
16
17use common_telemetry::warn;
18use datafusion_common::{Column, ScalarValue};
19use datafusion_expr::expr::InList;
20use datafusion_expr::{BinaryExpr, Expr, Operator};
21use datatypes::data_type::ConcreteDataType;
22use datatypes::value::Value;
23use index::bloom_filter::applier::InListPredicate;
24use index::Bytes;
25use mito_codec::index::IndexValueCodec;
26use mito_codec::row_converter::SortField;
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::RegionMetadata;
31use store_api::storage::ColumnId;
32
33use crate::cache::file_cache::FileCacheRef;
34use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
35use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
36use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
37use crate::sst::index::puffin_manager::PuffinManagerFactory;
38
39pub struct BloomFilterIndexApplierBuilder<'a> {
40 region_dir: String,
41 object_store: ObjectStore,
42 metadata: &'a RegionMetadata,
43 puffin_manager_factory: PuffinManagerFactory,
44 file_cache: Option<FileCacheRef>,
45 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
46 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
47 predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
48}
49
50impl<'a> BloomFilterIndexApplierBuilder<'a> {
51 pub fn new(
52 region_dir: String,
53 object_store: ObjectStore,
54 metadata: &'a RegionMetadata,
55 puffin_manager_factory: PuffinManagerFactory,
56 ) -> Self {
57 Self {
58 region_dir,
59 object_store,
60 metadata,
61 puffin_manager_factory,
62 file_cache: None,
63 puffin_metadata_cache: None,
64 bloom_filter_index_cache: None,
65 predicates: BTreeMap::default(),
66 }
67 }
68
69 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
70 self.file_cache = file_cache;
71 self
72 }
73
74 pub fn with_puffin_metadata_cache(
75 mut self,
76 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
77 ) -> Self {
78 self.puffin_metadata_cache = puffin_metadata_cache;
79 self
80 }
81
82 pub fn with_bloom_filter_index_cache(
83 mut self,
84 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
85 ) -> Self {
86 self.bloom_filter_index_cache = bloom_filter_index_cache;
87 self
88 }
89
90 pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
92 for expr in exprs {
93 self.traverse_and_collect(expr);
94 }
95
96 if self.predicates.is_empty() {
97 return Ok(None);
98 }
99
100 let applier = BloomFilterIndexApplier::new(
101 self.region_dir,
102 self.metadata.region_id,
103 self.object_store,
104 self.puffin_manager_factory,
105 self.predicates,
106 )
107 .with_file_cache(self.file_cache)
108 .with_puffin_metadata_cache(self.puffin_metadata_cache)
109 .with_bloom_filter_cache(self.bloom_filter_index_cache);
110
111 Ok(Some(applier))
112 }
113
114 fn traverse_and_collect(&mut self, expr: &Expr) {
116 let res = match expr {
117 Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
118 Operator::And => {
119 self.traverse_and_collect(left);
120 self.traverse_and_collect(right);
121 Ok(())
122 }
123 Operator::Eq => self.collect_eq(left, right),
124 Operator::Or => self.collect_or_eq_list(left, right),
125 _ => Ok(()),
126 },
127 Expr::InList(in_list) => self.collect_in_list(in_list),
128 _ => Ok(()),
129 };
130
131 if let Err(err) = res {
132 warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
133 }
134 }
135
136 fn column_id_and_type(
138 &self,
139 column_name: &str,
140 ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
141 let column = self
142 .metadata
143 .column_by_name(column_name)
144 .context(ColumnNotFoundSnafu {
145 column: column_name,
146 })?;
147
148 Ok(Some((
149 column.column_id,
150 column.column_schema.data_type.clone(),
151 )))
152 }
153
154 fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
156 let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
157 return Ok(());
158 };
159 if lit.is_null() {
160 return Ok(());
161 }
162 let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
163 return Ok(());
164 };
165 let value = encode_lit(lit, data_type)?;
166 self.predicates
167 .entry(column_id)
168 .or_default()
169 .push(InListPredicate {
170 list: BTreeSet::from([value]),
171 });
172
173 Ok(())
174 }
175
176 fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
178 let Expr::Column(column) = &in_list.expr.as_ref() else {
180 return Ok(());
181 };
182 if in_list.list.is_empty() || in_list.negated {
183 return Ok(());
184 }
185
186 let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
187 return Ok(());
188 };
189
190 let predicates = in_list
192 .list
193 .iter()
194 .filter_map(Self::nonnull_lit)
195 .map(|lit| encode_lit(lit, data_type.clone()));
196
197 let mut valid_predicates = BTreeSet::new();
199 for predicate in predicates {
200 match predicate {
201 Ok(p) => {
202 valid_predicates.insert(p);
203 }
204 Err(e) => warn!(e; "Failed to convert value in InList"),
205 }
206 }
207
208 if !valid_predicates.is_empty() {
209 self.predicates
210 .entry(column_id)
211 .or_default()
212 .push(InListPredicate {
213 list: valid_predicates,
214 });
215 }
216
217 Ok(())
218 }
219
220 fn collect_or_eq_list(&mut self, left: &Expr, right: &Expr) -> Result<()> {
222 let (eq_left, eq_right, or_list) = if let Expr::BinaryExpr(BinaryExpr {
223 left: l,
224 op: Operator::Eq,
225 right: r,
226 }) = left
227 {
228 (l, r, right)
229 } else if let Expr::BinaryExpr(BinaryExpr {
230 left: l,
231 op: Operator::Eq,
232 right: r,
233 }) = right
234 {
235 (l, r, left)
236 } else {
237 return Ok(());
238 };
239
240 let Some((col, lit)) = Self::eq_expr_col_lit(eq_left, eq_right)? else {
241 return Ok(());
242 };
243 if lit.is_null() {
244 return Ok(());
245 }
246 let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
247 return Ok(());
248 };
249
250 let mut inlist = BTreeSet::new();
251 inlist.insert(encode_lit(lit, data_type.clone())?);
252 if Self::collect_or_eq_list_rec(&col.name, &data_type, or_list, &mut inlist)? {
253 self.predicates
254 .entry(column_id)
255 .or_default()
256 .push(InListPredicate { list: inlist });
257 }
258
259 Ok(())
260 }
261
262 fn collect_or_eq_list_rec(
263 column_name: &str,
264 data_type: &ConcreteDataType,
265 expr: &Expr,
266 inlist: &mut BTreeSet<Bytes>,
267 ) -> Result<bool> {
268 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
269 match op {
270 Operator::Or => {
271 let r = Self::collect_or_eq_list_rec(column_name, data_type, left, inlist)?
272 .then(|| {
273 Self::collect_or_eq_list_rec(column_name, data_type, right, inlist)
274 })
275 .transpose()?
276 .unwrap_or(false);
277 return Ok(r);
278 }
279 Operator::Eq => {
280 let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
281 return Ok(false);
282 };
283 if lit.is_null() || column_name != col.name {
284 return Ok(false);
285 }
286 let bytes = encode_lit(lit, data_type.clone())?;
287 inlist.insert(bytes);
288 return Ok(true);
289 }
290 _ => {}
291 }
292 }
293
294 Ok(false)
295 }
296
297 fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
299 match expr {
300 Expr::Literal(lit) if !lit.is_null() => Some(lit),
301 _ => None,
302 }
303 }
304
305 fn eq_expr_col_lit<'b>(
307 left: &'b Expr,
308 right: &'b Expr,
309 ) -> Result<Option<(&'b Column, &'b ScalarValue)>> {
310 let (col, lit) = match (left, right) {
311 (Expr::Column(col), Expr::Literal(lit)) => (col, lit),
312 (Expr::Literal(lit), Expr::Column(col)) => (col, lit),
313 _ => return Ok(None),
314 };
315 Ok(Some((col, lit)))
316 }
317}
318
319fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
322 let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
323 let mut bytes = vec![];
324 let field = SortField::new(data_type);
325 IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
326 .context(EncodeSnafu)?;
327 Ok(bytes)
328}
329
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::{col, lit};
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region with a string tag `column1` (id 1), an int64 field `column2` (id 2) and a
    /// millisecond timestamp `column3` (id 3).
    fn test_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "column1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "column2",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "column3",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .primary_key(vec![1]);
        builder.build().unwrap()
    }

    /// Object store backed by the in-memory `Memory` service.
    fn test_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Creates a builder over `metadata` with the shared test region dir and object store.
    fn new_builder(
        metadata: &RegionMetadata,
        factory: PuffinManagerFactory,
    ) -> BloomFilterIndexApplierBuilder<'_> {
        BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            metadata,
            factory,
        )
    }

    fn column(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    fn string_lit(s: impl Into<String>) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(s.into())))
    }

    fn int64_lit(i: i64) -> Expr {
        Expr::Literal(ScalarValue::Int64(Some(i)))
    }

    /// Encodes a string the same way the builder encodes a `column1` literal.
    fn encode_str(s: &str) -> Bytes {
        encode_lit(
            &ScalarValue::Utf8(Some(s.to_string())),
            ConcreteDataType::string_datatype(),
        )
        .unwrap()
    }

    #[test]
    fn test_build_with_exprs() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);
        let exprs = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(column("column1")),
            op: Operator::Eq,
            right: Box::new(string_lit("value1")),
        })];
        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        assert_eq!(predicates.len(), 1);

        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(
            column_predicates[0].list,
            BTreeSet::from([encode_str("value1")])
        );
    }

    #[test]
    fn test_build_with_in_list() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);

        let exprs = vec![Expr::InList(InList {
            expr: Box::new(column("column2")),
            list: vec![int64_lit(1), int64_lit(2), int64_lit(3)],
            negated: false,
        })];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&2).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 3);
    }

    #[test]
    fn test_build_with_or_chain() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_or_chain_");
        let metadata = test_region_metadata();
        let builder = || new_builder(&metadata, factory.clone());

        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column1")
                .eq(lit("value2"))
                .or(col("column1").eq(lit("value4"))))
            .or(col("column1").eq(lit("value3")));

        let result = builder().build(&[expr]).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 4);
        let or_chain_predicates = &column_predicates[0].list;
        for value in ["value1", "value2", "value3", "value4"] {
            assert!(or_chain_predicates.contains(&encode_str(value)));
        }

        // A null literal never matches.
        let expr = col("column1").eq(Expr::Literal(ScalarValue::Utf8(None)));
        let result = builder().build(&[expr]).unwrap();
        assert!(result.is_none());

        // Leaves on different columns cannot form a single value set.
        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column2").eq(lit("value2")));
        let result = builder().build(&[expr]).unwrap();
        assert!(result.is_none());

        // A non-equality leaf disqualifies the whole chain.
        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column1").gt_eq(lit("value2")));
        let result = builder().build(&[expr]).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_build_with_and_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);
        let exprs = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
                op: Operator::Eq,
                right: Box::new(string_lit("value1")),
            })),
            op: Operator::And,
            right: Box::new(Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column2")),
                op: Operator::Eq,
                right: Box::new(int64_lit(42)),
            })),
        })];
        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        assert_eq!(predicates.len(), 2);
        assert!(predicates.contains_key(&1));
        assert!(predicates.contains_key(&2));
    }

    #[test]
    fn test_build_with_null_values() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);

        let exprs = vec![
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
                op: Operator::Eq,
                right: Box::new(Expr::Literal(ScalarValue::Utf8(None))),
            }),
            Expr::InList(InList {
                expr: Box::new(column("column2")),
                list: vec![
                    int64_lit(1),
                    Expr::Literal(ScalarValue::Int64(None)),
                    int64_lit(3),
                ],
                negated: false,
            }),
        ];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        assert!(!predicates.contains_key(&1));
        let column2_predicates = predicates.get(&2).unwrap();
        assert_eq!(column2_predicates[0].list.len(), 2);
    }

    #[test]
    fn test_build_with_invalid_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);
        let exprs = vec![
            // Unsupported operator.
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
                op: Operator::Gt,
                right: Box::new(string_lit("value1")),
            }),
            // Unknown column.
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("non_existent")),
                op: Operator::Eq,
                right: Box::new(string_lit("value")),
            }),
            // Negated IN list.
            Expr::InList(InList {
                expr: Box::new(column("column2")),
                list: vec![int64_lit(1), int64_lit(2)],
                negated: true,
            }),
        ];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_build_with_multiple_predicates_same_column() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);
        let exprs = vec![
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
                op: Operator::Eq,
                right: Box::new(string_lit("value1")),
            }),
            Expr::InList(InList {
                expr: Box::new(column("column1")),
                list: vec![string_lit("value2"), string_lit("value3")],
                negated: false,
            }),
        ];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 2);
    }
}