1use std::collections::{BTreeMap, BTreeSet};
16
17use common_telemetry::warn;
18use datafusion_common::{Column, ScalarValue};
19use datafusion_expr::expr::InList;
20use datafusion_expr::{BinaryExpr, Expr, Operator};
21use datatypes::data_type::ConcreteDataType;
22use datatypes::value::Value;
23use index::bloom_filter::applier::InListPredicate;
24use index::Bytes;
25use mito_codec::index::IndexValueCodec;
26use mito_codec::row_converter::SortField;
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::RegionMetadata;
31use store_api::region_request::PathType;
32use store_api::storage::ColumnId;
33
34use crate::cache::file_cache::FileCacheRef;
35use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
36use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
37use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
38use crate::sst::index::puffin_manager::PuffinManagerFactory;
39
40pub struct BloomFilterIndexApplierBuilder<'a> {
41 table_dir: String,
42 path_type: PathType,
43 object_store: ObjectStore,
44 metadata: &'a RegionMetadata,
45 puffin_manager_factory: PuffinManagerFactory,
46 file_cache: Option<FileCacheRef>,
47 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
48 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
49 predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
50}
51
52impl<'a> BloomFilterIndexApplierBuilder<'a> {
53 pub fn new(
54 table_dir: String,
55 path_type: PathType,
56 object_store: ObjectStore,
57 metadata: &'a RegionMetadata,
58 puffin_manager_factory: PuffinManagerFactory,
59 ) -> Self {
60 Self {
61 table_dir,
62 path_type,
63 object_store,
64 metadata,
65 puffin_manager_factory,
66 file_cache: None,
67 puffin_metadata_cache: None,
68 bloom_filter_index_cache: None,
69 predicates: BTreeMap::default(),
70 }
71 }
72
73 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
74 self.file_cache = file_cache;
75 self
76 }
77
78 pub fn with_puffin_metadata_cache(
79 mut self,
80 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
81 ) -> Self {
82 self.puffin_metadata_cache = puffin_metadata_cache;
83 self
84 }
85
86 pub fn with_bloom_filter_index_cache(
87 mut self,
88 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
89 ) -> Self {
90 self.bloom_filter_index_cache = bloom_filter_index_cache;
91 self
92 }
93
94 pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
96 for expr in exprs {
97 self.traverse_and_collect(expr);
98 }
99
100 if self.predicates.is_empty() {
101 return Ok(None);
102 }
103
104 let applier = BloomFilterIndexApplier::new(
105 self.table_dir,
106 self.path_type,
107 self.object_store,
108 self.puffin_manager_factory,
109 self.predicates,
110 )
111 .with_file_cache(self.file_cache)
112 .with_puffin_metadata_cache(self.puffin_metadata_cache)
113 .with_bloom_filter_cache(self.bloom_filter_index_cache);
114
115 Ok(Some(applier))
116 }
117
118 fn traverse_and_collect(&mut self, expr: &Expr) {
120 let res = match expr {
121 Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
122 Operator::And => {
123 self.traverse_and_collect(left);
124 self.traverse_and_collect(right);
125 Ok(())
126 }
127 Operator::Eq => self.collect_eq(left, right),
128 Operator::Or => self.collect_or_eq_list(left, right),
129 _ => Ok(()),
130 },
131 Expr::InList(in_list) => self.collect_in_list(in_list),
132 _ => Ok(()),
133 };
134
135 if let Err(err) = res {
136 warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
137 }
138 }
139
140 fn column_id_and_type(
142 &self,
143 column_name: &str,
144 ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
145 let column = self
146 .metadata
147 .column_by_name(column_name)
148 .context(ColumnNotFoundSnafu {
149 column: column_name,
150 })?;
151
152 Ok(Some((
153 column.column_id,
154 column.column_schema.data_type.clone(),
155 )))
156 }
157
158 fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
160 let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
161 return Ok(());
162 };
163 if lit.is_null() {
164 return Ok(());
165 }
166 let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
167 return Ok(());
168 };
169 let value = encode_lit(lit, data_type)?;
170 self.predicates
171 .entry(column_id)
172 .or_default()
173 .push(InListPredicate {
174 list: BTreeSet::from([value]),
175 });
176
177 Ok(())
178 }
179
180 fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
182 let Expr::Column(column) = &in_list.expr.as_ref() else {
184 return Ok(());
185 };
186 if in_list.list.is_empty() || in_list.negated {
187 return Ok(());
188 }
189
190 let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
191 return Ok(());
192 };
193
194 let predicates = in_list
196 .list
197 .iter()
198 .filter_map(Self::nonnull_lit)
199 .map(|lit| encode_lit(lit, data_type.clone()));
200
201 let mut valid_predicates = BTreeSet::new();
203 for predicate in predicates {
204 match predicate {
205 Ok(p) => {
206 valid_predicates.insert(p);
207 }
208 Err(e) => warn!(e; "Failed to convert value in InList"),
209 }
210 }
211
212 if !valid_predicates.is_empty() {
213 self.predicates
214 .entry(column_id)
215 .or_default()
216 .push(InListPredicate {
217 list: valid_predicates,
218 });
219 }
220
221 Ok(())
222 }
223
224 fn collect_or_eq_list(&mut self, left: &Expr, right: &Expr) -> Result<()> {
226 let (eq_left, eq_right, or_list) = if let Expr::BinaryExpr(BinaryExpr {
227 left: l,
228 op: Operator::Eq,
229 right: r,
230 }) = left
231 {
232 (l, r, right)
233 } else if let Expr::BinaryExpr(BinaryExpr {
234 left: l,
235 op: Operator::Eq,
236 right: r,
237 }) = right
238 {
239 (l, r, left)
240 } else {
241 return Ok(());
242 };
243
244 let Some((col, lit)) = Self::eq_expr_col_lit(eq_left, eq_right)? else {
245 return Ok(());
246 };
247 if lit.is_null() {
248 return Ok(());
249 }
250 let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
251 return Ok(());
252 };
253
254 let mut inlist = BTreeSet::new();
255 inlist.insert(encode_lit(lit, data_type.clone())?);
256 if Self::collect_or_eq_list_rec(&col.name, &data_type, or_list, &mut inlist)? {
257 self.predicates
258 .entry(column_id)
259 .or_default()
260 .push(InListPredicate { list: inlist });
261 }
262
263 Ok(())
264 }
265
266 fn collect_or_eq_list_rec(
267 column_name: &str,
268 data_type: &ConcreteDataType,
269 expr: &Expr,
270 inlist: &mut BTreeSet<Bytes>,
271 ) -> Result<bool> {
272 if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
273 match op {
274 Operator::Or => {
275 let r = Self::collect_or_eq_list_rec(column_name, data_type, left, inlist)?
276 .then(|| {
277 Self::collect_or_eq_list_rec(column_name, data_type, right, inlist)
278 })
279 .transpose()?
280 .unwrap_or(false);
281 return Ok(r);
282 }
283 Operator::Eq => {
284 let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
285 return Ok(false);
286 };
287 if lit.is_null() || column_name != col.name {
288 return Ok(false);
289 }
290 let bytes = encode_lit(lit, data_type.clone())?;
291 inlist.insert(bytes);
292 return Ok(true);
293 }
294 _ => {}
295 }
296 }
297
298 Ok(false)
299 }
300
301 fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
303 match expr {
304 Expr::Literal(lit, _) if !lit.is_null() => Some(lit),
305 _ => None,
306 }
307 }
308
309 fn eq_expr_col_lit<'b>(
311 left: &'b Expr,
312 right: &'b Expr,
313 ) -> Result<Option<(&'b Column, &'b ScalarValue)>> {
314 let (col, lit) = match (left, right) {
315 (Expr::Column(col), Expr::Literal(lit, _)) => (col, lit),
316 (Expr::Literal(lit, _), Expr::Column(col)) => (col, lit),
317 _ => return Ok(None),
318 };
319 Ok(Some((col, lit)))
320 }
321}
322
323fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
326 let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
327 let mut bytes = vec![];
328 let field = SortField::new(data_type);
329 IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
330 .context(EncodeSnafu)?;
331 Ok(bytes)
332}
333
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::{col, lit, Literal};
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region with a string tag `column1` (id 1), an int64 field `column2` (id 2) and a
    /// millisecond timestamp `column3` (id 3).
    fn test_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "column1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "column2",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "column3",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            })
            .primary_key(vec![1]);
        builder.build().unwrap()
    }

    fn test_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Creates a builder for `metadata` backed by an in-memory object store.
    fn new_builder(
        metadata: &RegionMetadata,
        factory: PuffinManagerFactory,
    ) -> BloomFilterIndexApplierBuilder<'_> {
        BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            metadata,
            factory,
        )
    }

    fn column(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    fn int64_lit(i: i64) -> Expr {
        i.lit()
    }

    /// Encodes a string literal the same way the builder does for the string column.
    fn encode_str(s: &str) -> Bytes {
        encode_lit(
            &ScalarValue::Utf8(Some(s.to_string())),
            ConcreteDataType::string_datatype(),
        )
        .unwrap()
    }

    #[test]
    fn test_build_with_exprs() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);
        let exprs = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(column("column1")),
            op: Operator::Eq,
            right: Box::new("value1".lit()),
        })];
        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        assert_eq!(predicates.len(), 1);

        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(
            column_predicates[0].list,
            BTreeSet::from([encode_str("value1")])
        );
    }

    #[test]
    fn test_build_with_in_list() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);

        let exprs = vec![Expr::InList(InList {
            expr: Box::new(column("column2")),
            list: vec![int64_lit(1), int64_lit(2), int64_lit(3)],
            negated: false,
        })];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&2).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 3);
    }

    #[test]
    fn test_build_with_or_chain() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_or_chain_");
        let metadata = test_region_metadata();

        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column1")
                .eq(lit("value2"))
                .or(col("column1").eq(lit("value4"))))
            .or(col("column1").eq(lit("value3")));

        let result = new_builder(&metadata, factory.clone())
            .build(&[expr])
            .unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 4);
        let or_chain_predicates = &column_predicates[0].list;
        assert!(or_chain_predicates.contains(&encode_str("value1")));
        assert!(or_chain_predicates.contains(&encode_str("value2")));
        assert!(or_chain_predicates.contains(&encode_str("value3")));
        assert!(or_chain_predicates.contains(&encode_str("value4")));

        // `col = NULL` never matches and yields no predicate.
        let expr = col("column1").eq(Expr::Literal(ScalarValue::Utf8(None), None));
        let result = new_builder(&metadata, factory.clone())
            .build(&[expr])
            .unwrap();
        assert!(result.is_none());

        // An OR across different columns cannot become a single in-list predicate.
        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column2").eq(lit("value2")));
        let result = new_builder(&metadata, factory.clone())
            .build(&[expr])
            .unwrap();
        assert!(result.is_none());

        // An OR mixing equality with a range comparison is not collected.
        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column1").gt_eq(lit("value2")));
        let result = new_builder(&metadata, factory.clone())
            .build(&[expr])
            .unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_build_with_and_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);
        let exprs = vec![Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
                op: Operator::Eq,
                right: Box::new("value1".lit()),
            })),
            op: Operator::And,
            right: Box::new(Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column2")),
                op: Operator::Eq,
                right: Box::new(int64_lit(42)),
            })),
        })];
        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        assert_eq!(predicates.len(), 2);
        assert!(predicates.contains_key(&1));
        assert!(predicates.contains_key(&2));
    }

    #[test]
    fn test_build_with_null_values() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);

        let exprs = vec![
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
                op: Operator::Eq,
                right: Box::new(Expr::Literal(ScalarValue::Utf8(None), None)),
            }),
            Expr::InList(InList {
                expr: Box::new(column("column2")),
                list: vec![
                    int64_lit(1),
                    Expr::Literal(ScalarValue::Int64(None), None),
                    int64_lit(3),
                ],
                negated: false,
            }),
        ];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        assert!(!predicates.contains_key(&1));
        let column2_predicates = predicates.get(&2).unwrap();
        assert_eq!(column2_predicates[0].list.len(), 2);
    }

    #[test]
    fn test_build_with_invalid_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);
        let exprs = vec![
            // Range comparisons are not indexable by a bloom filter.
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
                op: Operator::Gt,
                right: Box::new("value1".lit()),
            }),
            // Unknown columns are reported and skipped.
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("non_existent")),
                op: Operator::Eq,
                right: Box::new("value".lit()),
            }),
            // `NOT IN` is not collected.
            Expr::InList(InList {
                expr: Box::new(column("column2")),
                list: vec![int64_lit(1), int64_lit(2)],
                negated: true,
            }),
        ];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_build_with_multiple_predicates_same_column() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);
        let exprs = vec![
            Expr::BinaryExpr(BinaryExpr {
                left: Box::new(column("column1")),
                op: Operator::Eq,
                right: Box::new("value1".lit()),
            }),
            Expr::InList(InList {
                expr: Box::new(column("column1")),
                list: vec!["value2".lit(), "value3".lit()],
                negated: false,
            }),
        ];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 2);
    }
}