mito2/sst/index/bloom_filter/applier/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16
17use common_telemetry::warn;
18use datafusion_common::{Column, ScalarValue};
19use datafusion_expr::expr::InList;
20use datafusion_expr::{BinaryExpr, Expr, Operator};
21use datatypes::data_type::ConcreteDataType;
22use datatypes::value::Value;
23use index::bloom_filter::applier::InListPredicate;
24use index::Bytes;
25use mito_codec::index::IndexValueCodec;
26use mito_codec::row_converter::SortField;
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::RegionMetadata;
31use store_api::region_request::PathType;
32use store_api::storage::ColumnId;
33
34use crate::cache::file_cache::FileCacheRef;
35use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
36use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
37use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
38use crate::sst::index::puffin_manager::PuffinManagerFactory;
39
40pub struct BloomFilterIndexApplierBuilder<'a> {
41    table_dir: String,
42    path_type: PathType,
43    object_store: ObjectStore,
44    metadata: &'a RegionMetadata,
45    puffin_manager_factory: PuffinManagerFactory,
46    file_cache: Option<FileCacheRef>,
47    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
48    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
49    predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
50}
51
52impl<'a> BloomFilterIndexApplierBuilder<'a> {
53    pub fn new(
54        table_dir: String,
55        path_type: PathType,
56        object_store: ObjectStore,
57        metadata: &'a RegionMetadata,
58        puffin_manager_factory: PuffinManagerFactory,
59    ) -> Self {
60        Self {
61            table_dir,
62            path_type,
63            object_store,
64            metadata,
65            puffin_manager_factory,
66            file_cache: None,
67            puffin_metadata_cache: None,
68            bloom_filter_index_cache: None,
69            predicates: BTreeMap::default(),
70        }
71    }
72
73    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
74        self.file_cache = file_cache;
75        self
76    }
77
78    pub fn with_puffin_metadata_cache(
79        mut self,
80        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
81    ) -> Self {
82        self.puffin_metadata_cache = puffin_metadata_cache;
83        self
84    }
85
86    pub fn with_bloom_filter_index_cache(
87        mut self,
88        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
89    ) -> Self {
90        self.bloom_filter_index_cache = bloom_filter_index_cache;
91        self
92    }
93
94    /// Builds the applier with given filter expressions
95    pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
96        for expr in exprs {
97            self.traverse_and_collect(expr);
98        }
99
100        if self.predicates.is_empty() {
101            return Ok(None);
102        }
103
104        let applier = BloomFilterIndexApplier::new(
105            self.table_dir,
106            self.path_type,
107            self.object_store,
108            self.puffin_manager_factory,
109            self.predicates,
110        )
111        .with_file_cache(self.file_cache)
112        .with_puffin_metadata_cache(self.puffin_metadata_cache)
113        .with_bloom_filter_cache(self.bloom_filter_index_cache);
114
115        Ok(Some(applier))
116    }
117
118    /// Recursively traverses expressions to collect bloom filter predicates
119    fn traverse_and_collect(&mut self, expr: &Expr) {
120        let res = match expr {
121            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
122                Operator::And => {
123                    self.traverse_and_collect(left);
124                    self.traverse_and_collect(right);
125                    Ok(())
126                }
127                Operator::Eq => self.collect_eq(left, right),
128                Operator::Or => self.collect_or_eq_list(left, right),
129                _ => Ok(()),
130            },
131            Expr::InList(in_list) => self.collect_in_list(in_list),
132            _ => Ok(()),
133        };
134
135        if let Err(err) = res {
136            warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
137        }
138    }
139
140    /// Helper function to get the column id and type
141    fn column_id_and_type(
142        &self,
143        column_name: &str,
144    ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
145        let column = self
146            .metadata
147            .column_by_name(column_name)
148            .context(ColumnNotFoundSnafu {
149                column: column_name,
150            })?;
151
152        Ok(Some((
153            column.column_id,
154            column.column_schema.data_type.clone(),
155        )))
156    }
157
158    /// Collects an equality expression (column = value)
159    fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
160        let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
161            return Ok(());
162        };
163        if lit.is_null() {
164            return Ok(());
165        }
166        let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
167            return Ok(());
168        };
169        let value = encode_lit(lit, data_type)?;
170        self.predicates
171            .entry(column_id)
172            .or_default()
173            .push(InListPredicate {
174                list: BTreeSet::from([value]),
175            });
176
177        Ok(())
178    }
179
180    /// Collects an in list expression in the form of `column IN (lit, lit, ...)`.
181    fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
182        // Only collect InList predicates if they reference a column
183        let Expr::Column(column) = &in_list.expr.as_ref() else {
184            return Ok(());
185        };
186        if in_list.list.is_empty() || in_list.negated {
187            return Ok(());
188        }
189
190        let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
191            return Ok(());
192        };
193
194        // Convert all non-null literals to predicates
195        let predicates = in_list
196            .list
197            .iter()
198            .filter_map(Self::nonnull_lit)
199            .map(|lit| encode_lit(lit, data_type.clone()));
200
201        // Collect successful conversions
202        let mut valid_predicates = BTreeSet::new();
203        for predicate in predicates {
204            match predicate {
205                Ok(p) => {
206                    valid_predicates.insert(p);
207                }
208                Err(e) => warn!(e; "Failed to convert value in InList"),
209            }
210        }
211
212        if !valid_predicates.is_empty() {
213            self.predicates
214                .entry(column_id)
215                .or_default()
216                .push(InListPredicate {
217                    list: valid_predicates,
218                });
219        }
220
221        Ok(())
222    }
223
224    /// Collects an or expression in the form of `column = lit OR column = lit OR ...`.
225    fn collect_or_eq_list(&mut self, left: &Expr, right: &Expr) -> Result<()> {
226        let (eq_left, eq_right, or_list) = if let Expr::BinaryExpr(BinaryExpr {
227            left: l,
228            op: Operator::Eq,
229            right: r,
230        }) = left
231        {
232            (l, r, right)
233        } else if let Expr::BinaryExpr(BinaryExpr {
234            left: l,
235            op: Operator::Eq,
236            right: r,
237        }) = right
238        {
239            (l, r, left)
240        } else {
241            return Ok(());
242        };
243
244        let Some((col, lit)) = Self::eq_expr_col_lit(eq_left, eq_right)? else {
245            return Ok(());
246        };
247        if lit.is_null() {
248            return Ok(());
249        }
250        let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
251            return Ok(());
252        };
253
254        let mut inlist = BTreeSet::new();
255        inlist.insert(encode_lit(lit, data_type.clone())?);
256        if Self::collect_or_eq_list_rec(&col.name, &data_type, or_list, &mut inlist)? {
257            self.predicates
258                .entry(column_id)
259                .or_default()
260                .push(InListPredicate { list: inlist });
261        }
262
263        Ok(())
264    }
265
266    fn collect_or_eq_list_rec(
267        column_name: &str,
268        data_type: &ConcreteDataType,
269        expr: &Expr,
270        inlist: &mut BTreeSet<Bytes>,
271    ) -> Result<bool> {
272        if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
273            match op {
274                Operator::Or => {
275                    let r = Self::collect_or_eq_list_rec(column_name, data_type, left, inlist)?
276                        .then(|| {
277                            Self::collect_or_eq_list_rec(column_name, data_type, right, inlist)
278                        })
279                        .transpose()?
280                        .unwrap_or(false);
281                    return Ok(r);
282                }
283                Operator::Eq => {
284                    let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
285                        return Ok(false);
286                    };
287                    if lit.is_null() || column_name != col.name {
288                        return Ok(false);
289                    }
290                    let bytes = encode_lit(lit, data_type.clone())?;
291                    inlist.insert(bytes);
292                    return Ok(true);
293                }
294                _ => {}
295            }
296        }
297
298        Ok(false)
299    }
300
301    /// Helper function to get non-null literal value
302    fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
303        match expr {
304            Expr::Literal(lit, _) if !lit.is_null() => Some(lit),
305            _ => None,
306        }
307    }
308
309    /// Helper function to get the column and literal value from an equality expr (column = lit)
310    fn eq_expr_col_lit<'b>(
311        left: &'b Expr,
312        right: &'b Expr,
313    ) -> Result<Option<(&'b Column, &'b ScalarValue)>> {
314        let (col, lit) = match (left, right) {
315            (Expr::Column(col), Expr::Literal(lit, _)) => (col, lit),
316            (Expr::Literal(lit, _), Expr::Column(col)) => (col, lit),
317            _ => return Ok(None),
318        };
319        Ok(Some((col, lit)))
320    }
321}
322
323// TODO(ruihang): extract this and the one under inverted_index into a common util mod.
324/// Helper function to encode a literal into bytes.
325fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
326    let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
327    let mut bytes = vec![];
328    let field = SortField::new(data_type);
329    IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
330        .context(EncodeSnafu)?;
331    Ok(bytes)
332}
333
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::{col, lit, Literal};
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region metadata with a string tag `column1` (id 1), an int64 field
    /// `column2` (id 2) and a millisecond timestamp `column3` (id 3).
    fn test_region_metadata() -> RegionMetadata {
        let tag_column = ColumnMetadata {
            column_schema: ColumnSchema::new("column1", ConcreteDataType::string_datatype(), false),
            semantic_type: SemanticType::Tag,
            column_id: 1,
        };
        let field_column = ColumnMetadata {
            column_schema: ColumnSchema::new("column2", ConcreteDataType::int64_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 2,
        };
        let time_column = ColumnMetadata {
            column_schema: ColumnSchema::new(
                "column3",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 3,
        };

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        builder
            .push_column_metadata(tag_column)
            .push_column_metadata(field_column)
            .push_column_metadata(time_column)
            .primary_key(vec![1]);
        builder.build().unwrap()
    }

    /// In-memory object store for the builder under test.
    fn test_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Builder over `metadata` with fixed test locations and no caches.
    fn new_builder(
        metadata: &RegionMetadata,
        factory: PuffinManagerFactory,
    ) -> BloomFilterIndexApplierBuilder<'_> {
        BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            metadata,
            factory,
        )
    }

    /// Unqualified column reference.
    fn column(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    /// Int64 literal expression.
    fn int64_lit(i: i64) -> Expr {
        i.lit()
    }

    /// `left <op> right` as a binary expression.
    fn binary(left: Expr, op: Operator, right: Expr) -> Expr {
        Expr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op,
            right: Box::new(right),
        })
    }

    /// `expr [NOT] IN (list)` expression.
    fn in_list_expr(expr: Expr, list: Vec<Expr>, negated: bool) -> Expr {
        Expr::InList(InList {
            expr: Box::new(expr),
            list,
            negated,
        })
    }

    /// Encoded form of a string literal for a string column.
    fn encoded_str(s: &str) -> Bytes {
        encode_lit(
            &ScalarValue::Utf8(Some(s.to_string())),
            ConcreteDataType::string_datatype(),
        )
        .unwrap()
    }

    #[test]
    fn test_build_with_exprs() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);

        let exprs = vec![binary(column("column1"), Operator::Eq, "value1".lit())];
        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        assert_eq!(predicates.len(), 1);

        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);

        let expected = encoded_str("value1");
        assert_eq!(column_predicates[0].list, BTreeSet::from([expected]));
    }

    #[test]
    fn test_build_with_in_list() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);

        let exprs = vec![in_list_expr(
            column("column2"),
            vec![int64_lit(1), int64_lit(2), int64_lit(3)],
            false,
        )];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&2).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 3);
    }

    #[test]
    fn test_build_with_or_chain() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_or_chain_");
        let metadata = test_region_metadata();
        let builder = || new_builder(&metadata, factory.clone());

        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column1")
                .eq(lit("value2"))
                .or(col("column1").eq(lit("value4"))))
            .or(col("column1").eq(lit("value3")));

        let result = builder().build(&[expr]).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 4);
        let or_chain_predicates = &column_predicates[0].list;
        assert!(or_chain_predicates.contains(&encoded_str("value1")));
        assert!(or_chain_predicates.contains(&encoded_str("value2")));
        assert!(or_chain_predicates.contains(&encoded_str("value3")));
        assert!(or_chain_predicates.contains(&encoded_str("value4")));

        // Test with null value
        let expr = col("column1").eq(Expr::Literal(ScalarValue::Utf8(None), None));
        let result = builder().build(&[expr]).unwrap();
        assert!(result.is_none());

        // Test with different column
        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column2").eq(lit("value2")));
        let result = builder().build(&[expr]).unwrap();
        assert!(result.is_none());

        // Test with non or chain
        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column1").gt_eq(lit("value2")));
        let result = builder().build(&[expr]).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_build_with_and_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);

        let column1_eq = binary(column("column1"), Operator::Eq, "value1".lit());
        let column2_eq = binary(column("column2"), Operator::Eq, int64_lit(42));
        let exprs = vec![binary(column1_eq, Operator::And, column2_eq)];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        assert_eq!(predicates.len(), 2);
        assert!(predicates.contains_key(&1));
        assert!(predicates.contains_key(&2));
    }

    #[test]
    fn test_build_with_null_values() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);

        let exprs = vec![
            binary(
                column("column1"),
                Operator::Eq,
                Expr::Literal(ScalarValue::Utf8(None), None),
            ),
            in_list_expr(
                column("column2"),
                vec![
                    int64_lit(1),
                    Expr::Literal(ScalarValue::Int64(None), None),
                    int64_lit(3),
                ],
                false,
            ),
        ];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        assert!(!predicates.contains_key(&1)); // Null equality should be ignored
        let column2_predicates = predicates.get(&2).unwrap();
        assert_eq!(column2_predicates[0].list.len(), 2);
    }

    #[test]
    fn test_build_with_invalid_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);

        let exprs = vec![
            // Non-equality operator
            binary(column("column1"), Operator::Gt, "value1".lit()),
            // Non-existent column
            binary(column("non_existent"), Operator::Eq, "value".lit()),
            // Negated IN list
            in_list_expr(column("column2"), vec![int64_lit(1), int64_lit(2)], true),
        ];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_build_with_multiple_predicates_same_column() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
        let metadata = test_region_metadata();
        let builder = new_builder(&metadata, factory);

        let exprs = vec![
            binary(column("column1"), Operator::Eq, "value1".lit()),
            in_list_expr(
                column("column1"),
                vec!["value2".lit(), "value3".lit()],
                false,
            ),
        ];

        let result = builder.build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 2);
    }
}