mito2/sst/index/bloom_filter/applier/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16
17use common_telemetry::warn;
18use datafusion_common::{Column, ScalarValue};
19use datafusion_expr::expr::InList;
20use datafusion_expr::{BinaryExpr, Expr, Operator};
21use datatypes::data_type::ConcreteDataType;
22use datatypes::value::Value;
23use index::bloom_filter::applier::InListPredicate;
24use index::Bytes;
25use mito_codec::index::IndexValueCodec;
26use mito_codec::row_converter::SortField;
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::RegionMetadata;
31use store_api::region_request::PathType;
32use store_api::storage::ColumnId;
33
34use crate::cache::file_cache::FileCacheRef;
35use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
36use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
37use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
38use crate::sst::index::puffin_manager::PuffinManagerFactory;
39
/// Builder that turns filter expressions into a [`BloomFilterIndexApplier`].
///
/// Supported filters (`column = lit`, `column IN (...)` and `OR` chains of
/// equalities on one column) are collected as per-column [`InListPredicate`]s;
/// any other expression is ignored.
pub struct BloomFilterIndexApplierBuilder<'a> {
    /// Passed through to [`BloomFilterIndexApplier::new`].
    table_dir: String,
    /// Passed through to [`BloomFilterIndexApplier::new`].
    path_type: PathType,
    /// Passed through to [`BloomFilterIndexApplier::new`].
    object_store: ObjectStore,
    /// Region metadata used to resolve a column name to its id and data type.
    metadata: &'a RegionMetadata,
    /// Passed through to [`BloomFilterIndexApplier::new`].
    puffin_manager_factory: PuffinManagerFactory,
    /// Optional file cache handed to the applier; `None` until set.
    file_cache: Option<FileCacheRef>,
    /// Optional puffin metadata cache handed to the applier; `None` until set.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Optional bloom filter index cache handed to the applier; `None` until set.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Predicates collected so far, keyed by column id.
    predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
}
51
52impl<'a> BloomFilterIndexApplierBuilder<'a> {
53    pub fn new(
54        table_dir: String,
55        path_type: PathType,
56        object_store: ObjectStore,
57        metadata: &'a RegionMetadata,
58        puffin_manager_factory: PuffinManagerFactory,
59    ) -> Self {
60        Self {
61            table_dir,
62            path_type,
63            object_store,
64            metadata,
65            puffin_manager_factory,
66            file_cache: None,
67            puffin_metadata_cache: None,
68            bloom_filter_index_cache: None,
69            predicates: BTreeMap::default(),
70        }
71    }
72
73    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
74        self.file_cache = file_cache;
75        self
76    }
77
78    pub fn with_puffin_metadata_cache(
79        mut self,
80        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
81    ) -> Self {
82        self.puffin_metadata_cache = puffin_metadata_cache;
83        self
84    }
85
86    pub fn with_bloom_filter_index_cache(
87        mut self,
88        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
89    ) -> Self {
90        self.bloom_filter_index_cache = bloom_filter_index_cache;
91        self
92    }
93
94    /// Builds the applier with given filter expressions
95    pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
96        for expr in exprs {
97            self.traverse_and_collect(expr);
98        }
99
100        if self.predicates.is_empty() {
101            return Ok(None);
102        }
103
104        let applier = BloomFilterIndexApplier::new(
105            self.table_dir,
106            self.path_type,
107            self.object_store,
108            self.puffin_manager_factory,
109            self.predicates,
110        )
111        .with_file_cache(self.file_cache)
112        .with_puffin_metadata_cache(self.puffin_metadata_cache)
113        .with_bloom_filter_cache(self.bloom_filter_index_cache);
114
115        Ok(Some(applier))
116    }
117
118    /// Recursively traverses expressions to collect bloom filter predicates
119    fn traverse_and_collect(&mut self, expr: &Expr) {
120        let res = match expr {
121            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
122                Operator::And => {
123                    self.traverse_and_collect(left);
124                    self.traverse_and_collect(right);
125                    Ok(())
126                }
127                Operator::Eq => self.collect_eq(left, right),
128                Operator::Or => self.collect_or_eq_list(left, right),
129                _ => Ok(()),
130            },
131            Expr::InList(in_list) => self.collect_in_list(in_list),
132            _ => Ok(()),
133        };
134
135        if let Err(err) = res {
136            warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
137        }
138    }
139
140    /// Helper function to get the column id and type
141    fn column_id_and_type(
142        &self,
143        column_name: &str,
144    ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
145        let column = self
146            .metadata
147            .column_by_name(column_name)
148            .context(ColumnNotFoundSnafu {
149                column: column_name,
150            })?;
151
152        Ok(Some((
153            column.column_id,
154            column.column_schema.data_type.clone(),
155        )))
156    }
157
158    /// Collects an equality expression (column = value)
159    fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
160        let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
161            return Ok(());
162        };
163        if lit.is_null() {
164            return Ok(());
165        }
166        let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
167            return Ok(());
168        };
169        let value = encode_lit(lit, data_type)?;
170        self.predicates
171            .entry(column_id)
172            .or_default()
173            .push(InListPredicate {
174                list: BTreeSet::from([value]),
175            });
176
177        Ok(())
178    }
179
180    /// Collects an in list expression in the form of `column IN (lit, lit, ...)`.
181    fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
182        // Only collect InList predicates if they reference a column
183        let Expr::Column(column) = &in_list.expr.as_ref() else {
184            return Ok(());
185        };
186        if in_list.list.is_empty() || in_list.negated {
187            return Ok(());
188        }
189
190        let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
191            return Ok(());
192        };
193
194        // Convert all non-null literals to predicates
195        let predicates = in_list
196            .list
197            .iter()
198            .filter_map(Self::nonnull_lit)
199            .map(|lit| encode_lit(lit, data_type.clone()));
200
201        // Collect successful conversions
202        let mut valid_predicates = BTreeSet::new();
203        for predicate in predicates {
204            match predicate {
205                Ok(p) => {
206                    valid_predicates.insert(p);
207                }
208                Err(e) => warn!(e; "Failed to convert value in InList"),
209            }
210        }
211
212        if !valid_predicates.is_empty() {
213            self.predicates
214                .entry(column_id)
215                .or_default()
216                .push(InListPredicate {
217                    list: valid_predicates,
218                });
219        }
220
221        Ok(())
222    }
223
224    /// Collects an or expression in the form of `column = lit OR column = lit OR ...`.
225    fn collect_or_eq_list(&mut self, left: &Expr, right: &Expr) -> Result<()> {
226        let (eq_left, eq_right, or_list) = if let Expr::BinaryExpr(BinaryExpr {
227            left: l,
228            op: Operator::Eq,
229            right: r,
230        }) = left
231        {
232            (l, r, right)
233        } else if let Expr::BinaryExpr(BinaryExpr {
234            left: l,
235            op: Operator::Eq,
236            right: r,
237        }) = right
238        {
239            (l, r, left)
240        } else {
241            return Ok(());
242        };
243
244        let Some((col, lit)) = Self::eq_expr_col_lit(eq_left, eq_right)? else {
245            return Ok(());
246        };
247        if lit.is_null() {
248            return Ok(());
249        }
250        let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
251            return Ok(());
252        };
253
254        let mut inlist = BTreeSet::new();
255        inlist.insert(encode_lit(lit, data_type.clone())?);
256        if Self::collect_or_eq_list_rec(&col.name, &data_type, or_list, &mut inlist)? {
257            self.predicates
258                .entry(column_id)
259                .or_default()
260                .push(InListPredicate { list: inlist });
261        }
262
263        Ok(())
264    }
265
266    fn collect_or_eq_list_rec(
267        column_name: &str,
268        data_type: &ConcreteDataType,
269        expr: &Expr,
270        inlist: &mut BTreeSet<Bytes>,
271    ) -> Result<bool> {
272        if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
273            match op {
274                Operator::Or => {
275                    let r = Self::collect_or_eq_list_rec(column_name, data_type, left, inlist)?
276                        .then(|| {
277                            Self::collect_or_eq_list_rec(column_name, data_type, right, inlist)
278                        })
279                        .transpose()?
280                        .unwrap_or(false);
281                    return Ok(r);
282                }
283                Operator::Eq => {
284                    let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
285                        return Ok(false);
286                    };
287                    if lit.is_null() || column_name != col.name {
288                        return Ok(false);
289                    }
290                    let bytes = encode_lit(lit, data_type.clone())?;
291                    inlist.insert(bytes);
292                    return Ok(true);
293                }
294                _ => {}
295            }
296        }
297
298        Ok(false)
299    }
300
301    /// Helper function to get non-null literal value
302    fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
303        match expr {
304            Expr::Literal(lit) if !lit.is_null() => Some(lit),
305            _ => None,
306        }
307    }
308
309    /// Helper function to get the column and literal value from an equality expr (column = lit)
310    fn eq_expr_col_lit<'b>(
311        left: &'b Expr,
312        right: &'b Expr,
313    ) -> Result<Option<(&'b Column, &'b ScalarValue)>> {
314        let (col, lit) = match (left, right) {
315            (Expr::Column(col), Expr::Literal(lit)) => (col, lit),
316            (Expr::Literal(lit), Expr::Column(col)) => (col, lit),
317            _ => return Ok(None),
318        };
319        Ok(Some((col, lit)))
320    }
321}
322
323// TODO(ruihang): extract this and the one under inverted_index into a common util mod.
324/// Helper function to encode a literal into bytes.
325fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
326    let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
327    let mut bytes = vec![];
328    let field = SortField::new(data_type);
329    IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
330        .context(EncodeSnafu)?;
331    Ok(bytes)
332}
333
334#[cfg(test)]
335mod tests {
336    use api::v1::SemanticType;
337    use datafusion_common::Column;
338    use datafusion_expr::{col, lit};
339    use datatypes::schema::ColumnSchema;
340    use object_store::services::Memory;
341    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
342    use store_api::storage::RegionId;
343
344    use super::*;
345
346    fn test_region_metadata() -> RegionMetadata {
347        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
348        builder
349            .push_column_metadata(ColumnMetadata {
350                column_schema: ColumnSchema::new(
351                    "column1",
352                    ConcreteDataType::string_datatype(),
353                    false,
354                ),
355                semantic_type: SemanticType::Tag,
356                column_id: 1,
357            })
358            .push_column_metadata(ColumnMetadata {
359                column_schema: ColumnSchema::new(
360                    "column2",
361                    ConcreteDataType::int64_datatype(),
362                    false,
363                ),
364                semantic_type: SemanticType::Field,
365                column_id: 2,
366            })
367            .push_column_metadata(ColumnMetadata {
368                column_schema: ColumnSchema::new(
369                    "column3",
370                    ConcreteDataType::timestamp_millisecond_datatype(),
371                    false,
372                ),
373                semantic_type: SemanticType::Timestamp,
374                column_id: 3,
375            })
376            .primary_key(vec![1]);
377        builder.build().unwrap()
378    }
379
380    fn test_object_store() -> ObjectStore {
381        ObjectStore::new(Memory::default()).unwrap().finish()
382    }
383
384    fn column(name: &str) -> Expr {
385        Expr::Column(Column::from_name(name))
386    }
387
388    fn string_lit(s: impl Into<String>) -> Expr {
389        Expr::Literal(ScalarValue::Utf8(Some(s.into())))
390    }
391
392    #[test]
393    fn test_build_with_exprs() {
394        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
395        let metadata = test_region_metadata();
396        let builder = BloomFilterIndexApplierBuilder::new(
397            "test".to_string(),
398            PathType::Bare,
399            test_object_store(),
400            &metadata,
401            factory,
402        );
403        let exprs = vec![Expr::BinaryExpr(BinaryExpr {
404            left: Box::new(column("column1")),
405            op: Operator::Eq,
406            right: Box::new(string_lit("value1")),
407        })];
408        let result = builder.build(&exprs).unwrap();
409        assert!(result.is_some());
410
411        let predicates = result.unwrap().predicates;
412        assert_eq!(predicates.len(), 1);
413
414        let column_predicates = predicates.get(&1).unwrap();
415        assert_eq!(column_predicates.len(), 1);
416
417        let expected = encode_lit(
418            &ScalarValue::Utf8(Some("value1".to_string())),
419            ConcreteDataType::string_datatype(),
420        )
421        .unwrap();
422        assert_eq!(column_predicates[0].list, BTreeSet::from([expected]));
423    }
424
425    fn int64_lit(i: i64) -> Expr {
426        Expr::Literal(ScalarValue::Int64(Some(i)))
427    }
428
429    #[test]
430    fn test_build_with_in_list() {
431        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
432        let metadata = test_region_metadata();
433        let builder = BloomFilterIndexApplierBuilder::new(
434            "test".to_string(),
435            PathType::Bare,
436            test_object_store(),
437            &metadata,
438            factory,
439        );
440
441        let exprs = vec![Expr::InList(InList {
442            expr: Box::new(column("column2")),
443            list: vec![int64_lit(1), int64_lit(2), int64_lit(3)],
444            negated: false,
445        })];
446
447        let result = builder.build(&exprs).unwrap();
448        assert!(result.is_some());
449
450        let predicates = result.unwrap().predicates;
451        let column_predicates = predicates.get(&2).unwrap();
452        assert_eq!(column_predicates.len(), 1);
453        assert_eq!(column_predicates[0].list.len(), 3);
454    }
455
456    #[test]
457    fn test_build_with_or_chain() {
458        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_or_chain_");
459        let metadata = test_region_metadata();
460        let builder = || {
461            BloomFilterIndexApplierBuilder::new(
462                "test".to_string(),
463                PathType::Bare,
464                test_object_store(),
465                &metadata,
466                factory.clone(),
467            )
468        };
469
470        let expr = col("column1")
471            .eq(lit("value1"))
472            .or(col("column1")
473                .eq(lit("value2"))
474                .or(col("column1").eq(lit("value4"))))
475            .or(col("column1").eq(lit("value3")));
476
477        let result = builder().build(&[expr]).unwrap();
478        assert!(result.is_some());
479
480        let predicates = result.unwrap().predicates;
481        let column_predicates = predicates.get(&1).unwrap();
482        assert_eq!(column_predicates.len(), 1);
483        assert_eq!(column_predicates[0].list.len(), 4);
484        let or_chain_predicates = &column_predicates[0].list;
485        let encode_str = |s: &str| {
486            encode_lit(
487                &ScalarValue::Utf8(Some(s.to_string())),
488                ConcreteDataType::string_datatype(),
489            )
490            .unwrap()
491        };
492        assert!(or_chain_predicates.contains(&encode_str("value1")));
493        assert!(or_chain_predicates.contains(&encode_str("value2")));
494        assert!(or_chain_predicates.contains(&encode_str("value3")));
495        assert!(or_chain_predicates.contains(&encode_str("value4")));
496
497        // Test with null value
498        let expr = col("column1").eq(Expr::Literal(ScalarValue::Utf8(None)));
499        let result = builder().build(&[expr]).unwrap();
500        assert!(result.is_none());
501
502        // Test with different column
503        let expr = col("column1")
504            .eq(lit("value1"))
505            .or(col("column2").eq(lit("value2")));
506        let result = builder().build(&[expr]).unwrap();
507        assert!(result.is_none());
508
509        // Test with non or chain
510        let expr = col("column1")
511            .eq(lit("value1"))
512            .or(col("column1").gt_eq(lit("value2")));
513        let result = builder().build(&[expr]).unwrap();
514        assert!(result.is_none());
515    }
516
517    #[test]
518    fn test_build_with_and_expressions() {
519        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
520        let metadata = test_region_metadata();
521        let builder = BloomFilterIndexApplierBuilder::new(
522            "test".to_string(),
523            PathType::Bare,
524            test_object_store(),
525            &metadata,
526            factory,
527        );
528        let exprs = vec![Expr::BinaryExpr(BinaryExpr {
529            left: Box::new(Expr::BinaryExpr(BinaryExpr {
530                left: Box::new(column("column1")),
531                op: Operator::Eq,
532                right: Box::new(string_lit("value1")),
533            })),
534            op: Operator::And,
535            right: Box::new(Expr::BinaryExpr(BinaryExpr {
536                left: Box::new(column("column2")),
537                op: Operator::Eq,
538                right: Box::new(int64_lit(42)),
539            })),
540        })];
541        let result = builder.build(&exprs).unwrap();
542        assert!(result.is_some());
543
544        let predicates = result.unwrap().predicates;
545        assert_eq!(predicates.len(), 2);
546        assert!(predicates.contains_key(&1));
547        assert!(predicates.contains_key(&2));
548    }
549
550    #[test]
551    fn test_build_with_null_values() {
552        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
553        let metadata = test_region_metadata();
554        let builder = BloomFilterIndexApplierBuilder::new(
555            "test".to_string(),
556            PathType::Bare,
557            test_object_store(),
558            &metadata,
559            factory,
560        );
561
562        let exprs = vec![
563            Expr::BinaryExpr(BinaryExpr {
564                left: Box::new(column("column1")),
565                op: Operator::Eq,
566                right: Box::new(Expr::Literal(ScalarValue::Utf8(None))),
567            }),
568            Expr::InList(InList {
569                expr: Box::new(column("column2")),
570                list: vec![
571                    int64_lit(1),
572                    Expr::Literal(ScalarValue::Int64(None)),
573                    int64_lit(3),
574                ],
575                negated: false,
576            }),
577        ];
578
579        let result = builder.build(&exprs).unwrap();
580        assert!(result.is_some());
581
582        let predicates = result.unwrap().predicates;
583        assert!(!predicates.contains_key(&1)); // Null equality should be ignored
584        let column2_predicates = predicates.get(&2).unwrap();
585        assert_eq!(column2_predicates[0].list.len(), 2);
586    }
587
588    #[test]
589    fn test_build_with_invalid_expressions() {
590        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
591        let metadata = test_region_metadata();
592        let builder = BloomFilterIndexApplierBuilder::new(
593            "test".to_string(),
594            PathType::Bare,
595            test_object_store(),
596            &metadata,
597            factory,
598        );
599        let exprs = vec![
600            // Non-equality operator
601            Expr::BinaryExpr(BinaryExpr {
602                left: Box::new(column("column1")),
603                op: Operator::Gt,
604                right: Box::new(string_lit("value1")),
605            }),
606            // Non-existent column
607            Expr::BinaryExpr(BinaryExpr {
608                left: Box::new(column("non_existent")),
609                op: Operator::Eq,
610                right: Box::new(string_lit("value")),
611            }),
612            // Negated IN list
613            Expr::InList(InList {
614                expr: Box::new(column("column2")),
615                list: vec![int64_lit(1), int64_lit(2)],
616                negated: true,
617            }),
618        ];
619
620        let result = builder.build(&exprs).unwrap();
621        assert!(result.is_none());
622    }
623
624    #[test]
625    fn test_build_with_multiple_predicates_same_column() {
626        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
627        let metadata = test_region_metadata();
628        let builder = BloomFilterIndexApplierBuilder::new(
629            "test".to_string(),
630            PathType::Bare,
631            test_object_store(),
632            &metadata,
633            factory,
634        );
635        let exprs = vec![
636            Expr::BinaryExpr(BinaryExpr {
637                left: Box::new(column("column1")),
638                op: Operator::Eq,
639                right: Box::new(string_lit("value1")),
640            }),
641            Expr::InList(InList {
642                expr: Box::new(column("column1")),
643                list: vec![string_lit("value2"), string_lit("value3")],
644                negated: false,
645            }),
646        ];
647
648        let result = builder.build(&exprs).unwrap();
649        assert!(result.is_some());
650
651        let predicates = result.unwrap().predicates;
652        let column_predicates = predicates.get(&1).unwrap();
653        assert_eq!(column_predicates.len(), 2);
654    }
655}