Skip to main content

mito2/sst/index/bloom_filter/applier/
builder.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16
17use common_telemetry::warn;
18use datafusion_common::{Column, ScalarValue};
19use datafusion_expr::expr::InList;
20use datafusion_expr::{BinaryExpr, Expr, Operator};
21use datatypes::data_type::ConcreteDataType;
22use datatypes::value::Value;
23use index::Bytes;
24use index::bloom_filter::applier::InListPredicate;
25use mito_codec::index::IndexValueCodec;
26use mito_codec::row_converter::SortField;
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::RegionMetadata;
31use store_api::region_request::PathType;
32use store_api::storage::ColumnId;
33
34use crate::cache::file_cache::FileCacheRef;
35use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
36use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
37use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
38use crate::sst::index::puffin_manager::PuffinManagerFactory;
39
40pub struct BloomFilterIndexApplierBuilder<'a> {
41    table_dir: String,
42    path_type: PathType,
43    object_store: ObjectStore,
44    metadata: &'a RegionMetadata,
45    puffin_manager_factory: PuffinManagerFactory,
46    file_cache: Option<FileCacheRef>,
47    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
48    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
49    predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
50}
51
52impl<'a> BloomFilterIndexApplierBuilder<'a> {
53    pub fn new(
54        table_dir: String,
55        path_type: PathType,
56        object_store: ObjectStore,
57        metadata: &'a RegionMetadata,
58        puffin_manager_factory: PuffinManagerFactory,
59    ) -> Self {
60        Self {
61            table_dir,
62            path_type,
63            object_store,
64            metadata,
65            puffin_manager_factory,
66            file_cache: None,
67            puffin_metadata_cache: None,
68            bloom_filter_index_cache: None,
69            predicates: BTreeMap::default(),
70        }
71    }
72
73    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
74        self.file_cache = file_cache;
75        self
76    }
77
78    pub fn with_puffin_metadata_cache(
79        mut self,
80        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
81    ) -> Self {
82        self.puffin_metadata_cache = puffin_metadata_cache;
83        self
84    }
85
86    pub fn with_bloom_filter_index_cache(
87        mut self,
88        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
89    ) -> Self {
90        self.bloom_filter_index_cache = bloom_filter_index_cache;
91        self
92    }
93
94    /// Builds the applier with given filter expressions
95    pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
96        for expr in exprs {
97            self.traverse_and_collect(expr);
98        }
99
100        if self.predicates.is_empty() {
101            return Ok(None);
102        }
103
104        let expected_predicate_column_types = self.expected_predicate_column_types();
105        let applier = BloomFilterIndexApplier::new(
106            self.table_dir,
107            self.path_type,
108            self.object_store,
109            self.puffin_manager_factory,
110            self.predicates,
111            expected_predicate_column_types,
112        )
113        .with_file_cache(self.file_cache)
114        .with_puffin_metadata_cache(self.puffin_metadata_cache)
115        .with_bloom_filter_cache(self.bloom_filter_index_cache);
116
117        Ok(Some(applier))
118    }
119
120    /// Recursively traverses expressions to collect bloom filter predicates
121    fn traverse_and_collect(&mut self, expr: &Expr) {
122        let res = match expr {
123            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
124                Operator::And => {
125                    self.traverse_and_collect(left);
126                    self.traverse_and_collect(right);
127                    Ok(())
128                }
129                Operator::Eq => self.collect_eq(left, right),
130                Operator::Or => self.collect_or_eq_list(left, right),
131                _ => Ok(()),
132            },
133            Expr::InList(in_list) => self.collect_in_list(in_list),
134            _ => Ok(()),
135        };
136
137        if let Err(err) = res {
138            warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
139        }
140    }
141
142    /// Returns `(column_id, data_type)` pairs for predicate columns.
143    fn expected_predicate_column_types(&self) -> BTreeMap<ColumnId, ConcreteDataType> {
144        self.predicates
145            .keys()
146            .filter_map(|col_id| {
147                let col = self.metadata.column_by_id(*col_id)?;
148                Some((*col_id, col.column_schema.data_type.clone()))
149            })
150            .collect()
151    }
152
153    /// Helper function to get the column id and type
154    fn column_id_and_type(
155        &self,
156        column_name: &str,
157    ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
158        let column = self
159            .metadata
160            .column_by_name(column_name)
161            .context(ColumnNotFoundSnafu {
162                column: column_name,
163            })?;
164
165        Ok(Some((
166            column.column_id,
167            column.column_schema.data_type.clone(),
168        )))
169    }
170
171    /// Collects an equality expression (column = value)
172    fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
173        let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
174            return Ok(());
175        };
176        if lit.is_null() {
177            return Ok(());
178        }
179        let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
180            return Ok(());
181        };
182        let value = encode_lit(lit, data_type)?;
183        self.predicates
184            .entry(column_id)
185            .or_default()
186            .push(InListPredicate {
187                list: BTreeSet::from([value]),
188            });
189
190        Ok(())
191    }
192
193    /// Collects an in list expression in the form of `column IN (lit, lit, ...)`.
194    fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
195        // Only collect InList predicates if they reference a column
196        let Expr::Column(column) = &in_list.expr.as_ref() else {
197            return Ok(());
198        };
199        if in_list.list.is_empty() || in_list.negated {
200            return Ok(());
201        }
202
203        let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
204            return Ok(());
205        };
206
207        // Convert all non-null literals to predicates
208        let predicates = in_list
209            .list
210            .iter()
211            .filter_map(Self::nonnull_lit)
212            .map(|lit| encode_lit(lit, data_type.clone()));
213
214        // Collect successful conversions
215        let mut valid_predicates = BTreeSet::new();
216        for predicate in predicates {
217            match predicate {
218                Ok(p) => {
219                    valid_predicates.insert(p);
220                }
221                Err(e) => warn!(e; "Failed to convert value in InList"),
222            }
223        }
224
225        if !valid_predicates.is_empty() {
226            self.predicates
227                .entry(column_id)
228                .or_default()
229                .push(InListPredicate {
230                    list: valid_predicates,
231                });
232        }
233
234        Ok(())
235    }
236
237    /// Collects an or expression in the form of `column = lit OR column = lit OR ...`.
238    fn collect_or_eq_list(&mut self, left: &Expr, right: &Expr) -> Result<()> {
239        let (eq_left, eq_right, or_list) = if let Expr::BinaryExpr(BinaryExpr {
240            left: l,
241            op: Operator::Eq,
242            right: r,
243        }) = left
244        {
245            (l, r, right)
246        } else if let Expr::BinaryExpr(BinaryExpr {
247            left: l,
248            op: Operator::Eq,
249            right: r,
250        }) = right
251        {
252            (l, r, left)
253        } else {
254            return Ok(());
255        };
256
257        let Some((col, lit)) = Self::eq_expr_col_lit(eq_left, eq_right)? else {
258            return Ok(());
259        };
260        if lit.is_null() {
261            return Ok(());
262        }
263        let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
264            return Ok(());
265        };
266
267        let mut inlist = BTreeSet::new();
268        inlist.insert(encode_lit(lit, data_type.clone())?);
269        if Self::collect_or_eq_list_rec(&col.name, &data_type, or_list, &mut inlist)? {
270            self.predicates
271                .entry(column_id)
272                .or_default()
273                .push(InListPredicate { list: inlist });
274        }
275
276        Ok(())
277    }
278
279    fn collect_or_eq_list_rec(
280        column_name: &str,
281        data_type: &ConcreteDataType,
282        expr: &Expr,
283        inlist: &mut BTreeSet<Bytes>,
284    ) -> Result<bool> {
285        if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
286            match op {
287                Operator::Or => {
288                    let r = Self::collect_or_eq_list_rec(column_name, data_type, left, inlist)?
289                        .then(|| {
290                            Self::collect_or_eq_list_rec(column_name, data_type, right, inlist)
291                        })
292                        .transpose()?
293                        .unwrap_or(false);
294                    return Ok(r);
295                }
296                Operator::Eq => {
297                    let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
298                        return Ok(false);
299                    };
300                    if lit.is_null() || column_name != col.name {
301                        return Ok(false);
302                    }
303                    let bytes = encode_lit(lit, data_type.clone())?;
304                    inlist.insert(bytes);
305                    return Ok(true);
306                }
307                _ => {}
308            }
309        }
310
311        Ok(false)
312    }
313
314    /// Helper function to get non-null literal value
315    fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
316        match expr {
317            Expr::Literal(lit, _) if !lit.is_null() => Some(lit),
318            _ => None,
319        }
320    }
321
322    /// Helper function to get the column and literal value from an equality expr (column = lit)
323    fn eq_expr_col_lit<'b>(
324        left: &'b Expr,
325        right: &'b Expr,
326    ) -> Result<Option<(&'b Column, &'b ScalarValue)>> {
327        let (col, lit) = match (left, right) {
328            (Expr::Column(col), Expr::Literal(lit, _)) => (col, lit),
329            (Expr::Literal(lit, _), Expr::Column(col)) => (col, lit),
330            _ => return Ok(None),
331        };
332        Ok(Some((col, lit)))
333    }
334}
335
336// TODO(ruihang): extract this and the one under inverted_index into a common util mod.
337/// Helper function to encode a literal into bytes.
338fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
339    let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
340    let mut bytes = vec![];
341    let field = SortField::new(data_type);
342    IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
343        .context(EncodeSnafu)?;
344    Ok(bytes)
345}
346
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::{Literal, col, lit};
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region with a string tag `column1` (id 1), an int64 field `column2` (id 2)
    /// and a millisecond timestamp `column3` (id 3); `column1` is the primary key.
    fn test_region_metadata() -> RegionMetadata {
        let columns = [
            (
                "column1",
                ConcreteDataType::string_datatype(),
                SemanticType::Tag,
                1,
            ),
            (
                "column2",
                ConcreteDataType::int64_datatype(),
                SemanticType::Field,
                2,
            ),
            (
                "column3",
                ConcreteDataType::timestamp_millisecond_datatype(),
                SemanticType::Timestamp,
                3,
            ),
        ];

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        for (name, data_type, semantic_type, column_id) in columns {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(name, data_type, false),
                semantic_type,
                column_id,
            });
        }
        builder.primary_key(vec![1]);
        builder.build().unwrap()
    }

    fn test_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Creates the builder under test on top of an in-memory object store.
    fn test_builder<'a>(
        metadata: &'a RegionMetadata,
        factory: PuffinManagerFactory,
    ) -> BloomFilterIndexApplierBuilder<'a> {
        BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            metadata,
            factory,
        )
    }

    fn column(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    fn int64_lit(i: i64) -> Expr {
        i.lit()
    }

    /// Shorthand for `left <op> right`.
    fn binary(left: Expr, op: Operator, right: Expr) -> Expr {
        Expr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op,
            right: Box::new(right),
        })
    }

    #[test]
    fn test_build_with_exprs() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
        let metadata = test_region_metadata();
        let builder = test_builder(&metadata, factory);

        let exprs = vec![binary(column("column1"), Operator::Eq, "value1".lit())];
        let applier = builder.build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().default_predicates;
        assert_eq!(predicates.len(), 1);

        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);

        let expected = encode_lit(
            &ScalarValue::Utf8(Some("value1".to_string())),
            ConcreteDataType::string_datatype(),
        )
        .unwrap();
        assert_eq!(column_predicates[0].list, BTreeSet::from([expected]));
    }

    #[test]
    fn test_build_with_in_list() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
        let metadata = test_region_metadata();
        let builder = test_builder(&metadata, factory);

        let exprs = vec![Expr::InList(InList {
            expr: Box::new(column("column2")),
            list: vec![int64_lit(1), int64_lit(2), int64_lit(3)],
            negated: false,
        })];

        let applier = builder.build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().default_predicates;
        let column_predicates = predicates.get(&2).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 3);
    }

    #[test]
    fn test_build_with_or_chain() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_or_chain_");
        let metadata = test_region_metadata();
        let builder = || test_builder(&metadata, factory.clone());

        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column1")
                .eq(lit("value2"))
                .or(col("column1").eq(lit("value4"))))
            .or(col("column1").eq(lit("value3")));

        let applier = builder().build(&[expr]).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().default_predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 4);

        let or_chain_predicates = &column_predicates[0].list;
        let encode_str = |s: &str| {
            encode_lit(
                &ScalarValue::Utf8(Some(s.to_string())),
                ConcreteDataType::string_datatype(),
            )
            .unwrap()
        };
        for value in ["value1", "value2", "value3", "value4"] {
            assert!(or_chain_predicates.contains(&encode_str(value)));
        }

        // Test with null value
        let expr = col("column1").eq(Expr::Literal(ScalarValue::Utf8(None), None));
        assert!(builder().build(&[expr]).unwrap().is_none());

        // Test with different column
        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column2").eq(lit("value2")));
        assert!(builder().build(&[expr]).unwrap().is_none());

        // Test with non or chain
        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column1").gt_eq(lit("value2")));
        assert!(builder().build(&[expr]).unwrap().is_none());
    }

    #[test]
    fn test_build_with_and_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
        let metadata = test_region_metadata();
        let builder = test_builder(&metadata, factory);

        let exprs = vec![binary(
            binary(column("column1"), Operator::Eq, "value1".lit()),
            Operator::And,
            binary(column("column2"), Operator::Eq, int64_lit(42)),
        )];
        let applier = builder.build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().default_predicates;
        assert_eq!(predicates.len(), 2);
        assert!(predicates.contains_key(&1));
        assert!(predicates.contains_key(&2));
    }

    #[test]
    fn test_build_with_null_values() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
        let metadata = test_region_metadata();
        let builder = test_builder(&metadata, factory);

        let exprs = vec![
            binary(
                column("column1"),
                Operator::Eq,
                Expr::Literal(ScalarValue::Utf8(None), None),
            ),
            Expr::InList(InList {
                expr: Box::new(column("column2")),
                list: vec![
                    int64_lit(1),
                    Expr::Literal(ScalarValue::Int64(None), None),
                    int64_lit(3),
                ],
                negated: false,
            }),
        ];

        let applier = builder.build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().default_predicates;
        assert!(!predicates.contains_key(&1)); // Null equality should be ignored
        let column2_predicates = predicates.get(&2).unwrap();
        assert_eq!(column2_predicates[0].list.len(), 2);
    }

    #[test]
    fn test_build_with_invalid_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
        let metadata = test_region_metadata();
        let builder = test_builder(&metadata, factory);

        let exprs = vec![
            // Non-equality operator
            binary(column("column1"), Operator::Gt, "value1".lit()),
            // Non-existent column
            binary(column("non_existent"), Operator::Eq, "value".lit()),
            // Negated IN list
            Expr::InList(InList {
                expr: Box::new(column("column2")),
                list: vec![int64_lit(1), int64_lit(2)],
                negated: true,
            }),
        ];

        let applier = builder.build(&exprs).unwrap();
        assert!(applier.is_none());
    }

    #[test]
    fn test_build_with_multiple_predicates_same_column() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
        let metadata = test_region_metadata();
        let builder = test_builder(&metadata, factory);

        let exprs = vec![
            binary(column("column1"), Operator::Eq, "value1".lit()),
            Expr::InList(InList {
                expr: Box::new(column("column1")),
                list: vec!["value2".lit(), "value3".lit()],
                negated: false,
            }),
        ];

        let applier = builder.build(&exprs).unwrap();
        assert!(applier.is_some());

        let predicates = applier.unwrap().default_predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 2);
    }
}