1use std::collections::{HashMap, HashSet};
16
17use common_telemetry::warn;
18use datafusion_common::ScalarValue;
19use datafusion_expr::expr::InList;
20use datafusion_expr::{BinaryExpr, Expr, Operator};
21use datatypes::data_type::ConcreteDataType;
22use datatypes::value::Value;
23use index::bloom_filter::applier::InListPredicate;
24use index::Bytes;
25use object_store::ObjectStore;
26use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::RegionMetadata;
29use store_api::storage::ColumnId;
30
31use crate::cache::file_cache::FileCacheRef;
32use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
33use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, Result};
34use crate::row_converter::SortField;
35use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
36use crate::sst::index::codec::IndexValueCodec;
37use crate::sst::index::puffin_manager::PuffinManagerFactory;
38
/// Builder that turns query filter expressions into a [`BloomFilterIndexApplier`].
///
/// Predicates are gathered per column while traversing the expressions passed
/// to `build`; everything else is carried along and handed to the applier.
pub struct BloomFilterIndexApplierBuilder<'a> {
    /// Region directory, passed to the applier unchanged.
    region_dir: String,
    /// Object store, passed to the applier unchanged.
    object_store: ObjectStore,
    /// Region metadata used to resolve column names to column ids and data
    /// types, and to supply the region id for the applier.
    metadata: &'a RegionMetadata,
    /// Puffin manager factory, passed to the applier unchanged.
    puffin_manager_factory: PuffinManagerFactory,
    /// Optional file cache forwarded to the applier.
    file_cache: Option<FileCacheRef>,
    /// Optional puffin metadata cache forwarded to the applier.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Optional bloom filter index cache forwarded to the applier.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Predicates collected so far, keyed by column id. A column may
    /// accumulate several predicates, one per collected expression.
    predicates: HashMap<ColumnId, Vec<InListPredicate>>,
}
49
50impl<'a> BloomFilterIndexApplierBuilder<'a> {
51 pub fn new(
52 region_dir: String,
53 object_store: ObjectStore,
54 metadata: &'a RegionMetadata,
55 puffin_manager_factory: PuffinManagerFactory,
56 ) -> Self {
57 Self {
58 region_dir,
59 object_store,
60 metadata,
61 puffin_manager_factory,
62 file_cache: None,
63 puffin_metadata_cache: None,
64 bloom_filter_index_cache: None,
65 predicates: HashMap::default(),
66 }
67 }
68
69 pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
70 self.file_cache = file_cache;
71 self
72 }
73
74 pub fn with_puffin_metadata_cache(
75 mut self,
76 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
77 ) -> Self {
78 self.puffin_metadata_cache = puffin_metadata_cache;
79 self
80 }
81
82 pub fn with_bloom_filter_index_cache(
83 mut self,
84 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
85 ) -> Self {
86 self.bloom_filter_index_cache = bloom_filter_index_cache;
87 self
88 }
89
90 pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
92 for expr in exprs {
93 self.traverse_and_collect(expr);
94 }
95
96 if self.predicates.is_empty() {
97 return Ok(None);
98 }
99
100 let applier = BloomFilterIndexApplier::new(
101 self.region_dir,
102 self.metadata.region_id,
103 self.object_store,
104 self.puffin_manager_factory,
105 self.predicates,
106 )
107 .with_file_cache(self.file_cache)
108 .with_puffin_metadata_cache(self.puffin_metadata_cache)
109 .with_bloom_filter_cache(self.bloom_filter_index_cache);
110
111 Ok(Some(applier))
112 }
113
114 fn traverse_and_collect(&mut self, expr: &Expr) {
116 let res = match expr {
117 Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
118 Operator::And => {
119 self.traverse_and_collect(left);
120 self.traverse_and_collect(right);
121 Ok(())
122 }
123 Operator::Eq => self.collect_eq(left, right),
124 _ => Ok(()),
125 },
126 Expr::InList(in_list) => self.collect_in_list(in_list),
127 _ => Ok(()),
128 };
129
130 if let Err(err) = res {
131 warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
132 }
133 }
134
135 fn column_id_and_type(
137 &self,
138 column_name: &str,
139 ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
140 let column = self
141 .metadata
142 .column_by_name(column_name)
143 .context(ColumnNotFoundSnafu {
144 column: column_name,
145 })?;
146
147 Ok(Some((
148 column.column_id,
149 column.column_schema.data_type.clone(),
150 )))
151 }
152
153 fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
155 let (col, lit) = match (left, right) {
156 (Expr::Column(col), Expr::Literal(lit)) => (col, lit),
157 (Expr::Literal(lit), Expr::Column(col)) => (col, lit),
158 _ => return Ok(()),
159 };
160 if lit.is_null() {
161 return Ok(());
162 }
163 let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
164 return Ok(());
165 };
166 let value = encode_lit(lit, data_type)?;
167 self.predicates
168 .entry(column_id)
169 .or_default()
170 .push(InListPredicate {
171 list: HashSet::from([value]),
172 });
173
174 Ok(())
175 }
176
177 fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
179 let Expr::Column(column) = &in_list.expr.as_ref() else {
181 return Ok(());
182 };
183 if in_list.list.is_empty() || in_list.negated {
184 return Ok(());
185 }
186
187 let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
188 return Ok(());
189 };
190
191 let predicates = in_list
193 .list
194 .iter()
195 .filter_map(Self::nonnull_lit)
196 .map(|lit| encode_lit(lit, data_type.clone()));
197
198 let mut valid_predicates = HashSet::new();
200 for predicate in predicates {
201 match predicate {
202 Ok(p) => {
203 valid_predicates.insert(p);
204 }
205 Err(e) => warn!(e; "Failed to convert value in InList"),
206 }
207 }
208
209 if !valid_predicates.is_empty() {
210 self.predicates
211 .entry(column_id)
212 .or_default()
213 .push(InListPredicate {
214 list: valid_predicates,
215 });
216 }
217
218 Ok(())
219 }
220
221 fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
223 match expr {
224 Expr::Literal(lit) if !lit.is_null() => Some(lit),
225 _ => None,
226 }
227 }
228}
229
230fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
233 let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
234 let mut bytes = vec![];
235 let field = SortField::new(data_type);
236 IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
237 Ok(bytes)
238}
239
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Describes one non-nullable column of the test region.
    fn column_metadata(
        name: &str,
        data_type: ConcreteDataType,
        semantic_type: SemanticType,
        column_id: ColumnId,
    ) -> ColumnMetadata {
        ColumnMetadata {
            column_schema: ColumnSchema::new(name, data_type, false),
            semantic_type,
            column_id,
        }
    }

    /// Region with a string tag `column1` (id 1), an int64 field `column2`
    /// (id 2) and a millisecond timestamp `column3` (id 3); the primary key is
    /// column id 1.
    fn test_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        builder
            .push_column_metadata(column_metadata(
                "column1",
                ConcreteDataType::string_datatype(),
                SemanticType::Tag,
                1,
            ))
            .push_column_metadata(column_metadata(
                "column2",
                ConcreteDataType::int64_datatype(),
                SemanticType::Field,
                2,
            ))
            .push_column_metadata(column_metadata(
                "column3",
                ConcreteDataType::timestamp_millisecond_datatype(),
                SemanticType::Timestamp,
                3,
            ))
            .primary_key(vec![1]);
        builder.build().unwrap()
    }

    /// In-memory object store for tests.
    fn test_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Column reference expression.
    fn column(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    /// Non-null UTF-8 literal expression.
    fn string_lit(s: impl Into<String>) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(s.into())))
    }

    /// Non-null int64 literal expression.
    fn int64_lit(i: i64) -> Expr {
        Expr::Literal(ScalarValue::Int64(Some(i)))
    }

    /// Binary expression `left <op> right`.
    fn binary(left: Expr, op: Operator, right: Expr) -> Expr {
        Expr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op,
            right: Box::new(right),
        })
    }

    /// Equality expression `left = right`.
    fn eq(left: Expr, right: Expr) -> Expr {
        binary(left, Operator::Eq, right)
    }

    /// `expr [NOT] IN (list...)` expression.
    fn in_list(expr: Expr, list: Vec<Expr>, negated: bool) -> Expr {
        Expr::InList(InList {
            expr: Box::new(expr),
            list,
            negated,
        })
    }

    #[test]
    fn test_build_with_exprs() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
        let metadata = test_region_metadata();
        let builder = BloomFilterIndexApplierBuilder::new(
            String::from("test"),
            test_object_store(),
            &metadata,
            factory,
        );

        let exprs = vec![eq(column("column1"), string_lit("value1"))];
        let applier = builder.build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().predicates;
        assert_eq!(predicates.len(), 1);

        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);

        let expected = encode_lit(
            &ScalarValue::Utf8(Some("value1".to_string())),
            ConcreteDataType::string_datatype(),
        )
        .unwrap();
        assert_eq!(column_predicates[0].list, HashSet::from([expected]));
    }

    #[test]
    fn test_build_with_in_list() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
        let metadata = test_region_metadata();
        let builder = BloomFilterIndexApplierBuilder::new(
            String::from("test"),
            test_object_store(),
            &metadata,
            factory,
        );

        let exprs = vec![in_list(
            column("column2"),
            vec![int64_lit(1), int64_lit(2), int64_lit(3)],
            false,
        )];
        let applier = builder.build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().predicates;
        let column_predicates = predicates.get(&2).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 3);
    }

    #[test]
    fn test_build_with_and_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
        let metadata = test_region_metadata();
        let builder = BloomFilterIndexApplierBuilder::new(
            String::from("test"),
            test_object_store(),
            &metadata,
            factory,
        );

        let exprs = vec![binary(
            eq(column("column1"), string_lit("value1")),
            Operator::And,
            eq(column("column2"), int64_lit(42)),
        )];
        let applier = builder.build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().predicates;
        assert_eq!(predicates.len(), 2);
        assert!(predicates.contains_key(&1));
        assert!(predicates.contains_key(&2));
    }

    #[test]
    fn test_build_with_null_values() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
        let metadata = test_region_metadata();
        let builder = BloomFilterIndexApplierBuilder::new(
            String::from("test"),
            test_object_store(),
            &metadata,
            factory,
        );

        let exprs = vec![
            eq(column("column1"), Expr::Literal(ScalarValue::Utf8(None))),
            in_list(
                column("column2"),
                vec![
                    int64_lit(1),
                    Expr::Literal(ScalarValue::Int64(None)),
                    int64_lit(3),
                ],
                false,
            ),
        ];
        let applier = builder.build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().predicates;
        // The equality against NULL must not produce a predicate.
        assert!(!predicates.contains_key(&1));
        // The NULL element is dropped from the list, leaving two values.
        let column2_predicates = predicates.get(&2).unwrap();
        assert_eq!(column2_predicates[0].list.len(), 2);
    }

    #[test]
    fn test_build_with_invalid_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
        let metadata = test_region_metadata();
        let builder = BloomFilterIndexApplierBuilder::new(
            String::from("test"),
            test_object_store(),
            &metadata,
            factory,
        );

        let exprs = vec![
            // Operator other than `=` / `AND`.
            binary(column("column1"), Operator::Gt, string_lit("value1")),
            // Column that is absent from the region metadata.
            eq(column("non_existent"), string_lit("value")),
            // Negated list.
            in_list(column("column2"), vec![int64_lit(1), int64_lit(2)], true),
        ];
        let applier = builder.build(&exprs).unwrap();
        assert!(applier.is_none());
    }

    #[test]
    fn test_build_with_multiple_predicates_same_column() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
        let metadata = test_region_metadata();
        let builder = BloomFilterIndexApplierBuilder::new(
            String::from("test"),
            test_object_store(),
            &metadata,
            factory,
        );

        let exprs = vec![
            eq(column("column1"), string_lit("value1")),
            in_list(
                column("column1"),
                vec![string_lit("value2"), string_lit("value3")],
                false,
            ),
        ];
        let applier = builder.build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 2);
    }
}