// mito2/sst/index/bloom_filter/applier/builder.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, BTreeSet};
16
17use common_telemetry::warn;
18use datafusion_common::{Column, ScalarValue};
19use datafusion_expr::expr::InList;
20use datafusion_expr::{BinaryExpr, Expr, Operator};
21use datatypes::data_type::ConcreteDataType;
22use datatypes::value::Value;
23use index::bloom_filter::applier::InListPredicate;
24use index::Bytes;
25use mito_codec::index::IndexValueCodec;
26use mito_codec::row_converter::SortField;
27use object_store::ObjectStore;
28use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::RegionMetadata;
31use store_api::storage::ColumnId;
32
33use crate::cache::file_cache::FileCacheRef;
34use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
35use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
36use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
37use crate::sst::index::puffin_manager::PuffinManagerFactory;
38
39pub struct BloomFilterIndexApplierBuilder<'a> {
40    region_dir: String,
41    object_store: ObjectStore,
42    metadata: &'a RegionMetadata,
43    puffin_manager_factory: PuffinManagerFactory,
44    file_cache: Option<FileCacheRef>,
45    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
46    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
47    predicates: BTreeMap<ColumnId, Vec<InListPredicate>>,
48}
49
50impl<'a> BloomFilterIndexApplierBuilder<'a> {
51    pub fn new(
52        region_dir: String,
53        object_store: ObjectStore,
54        metadata: &'a RegionMetadata,
55        puffin_manager_factory: PuffinManagerFactory,
56    ) -> Self {
57        Self {
58            region_dir,
59            object_store,
60            metadata,
61            puffin_manager_factory,
62            file_cache: None,
63            puffin_metadata_cache: None,
64            bloom_filter_index_cache: None,
65            predicates: BTreeMap::default(),
66        }
67    }
68
69    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
70        self.file_cache = file_cache;
71        self
72    }
73
74    pub fn with_puffin_metadata_cache(
75        mut self,
76        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
77    ) -> Self {
78        self.puffin_metadata_cache = puffin_metadata_cache;
79        self
80    }
81
82    pub fn with_bloom_filter_index_cache(
83        mut self,
84        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
85    ) -> Self {
86        self.bloom_filter_index_cache = bloom_filter_index_cache;
87        self
88    }
89
90    /// Builds the applier with given filter expressions
91    pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
92        for expr in exprs {
93            self.traverse_and_collect(expr);
94        }
95
96        if self.predicates.is_empty() {
97            return Ok(None);
98        }
99
100        let applier = BloomFilterIndexApplier::new(
101            self.region_dir,
102            self.metadata.region_id,
103            self.object_store,
104            self.puffin_manager_factory,
105            self.predicates,
106        )
107        .with_file_cache(self.file_cache)
108        .with_puffin_metadata_cache(self.puffin_metadata_cache)
109        .with_bloom_filter_cache(self.bloom_filter_index_cache);
110
111        Ok(Some(applier))
112    }
113
114    /// Recursively traverses expressions to collect bloom filter predicates
115    fn traverse_and_collect(&mut self, expr: &Expr) {
116        let res = match expr {
117            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
118                Operator::And => {
119                    self.traverse_and_collect(left);
120                    self.traverse_and_collect(right);
121                    Ok(())
122                }
123                Operator::Eq => self.collect_eq(left, right),
124                Operator::Or => self.collect_or_eq_list(left, right),
125                _ => Ok(()),
126            },
127            Expr::InList(in_list) => self.collect_in_list(in_list),
128            _ => Ok(()),
129        };
130
131        if let Err(err) = res {
132            warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
133        }
134    }
135
136    /// Helper function to get the column id and type
137    fn column_id_and_type(
138        &self,
139        column_name: &str,
140    ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
141        let column = self
142            .metadata
143            .column_by_name(column_name)
144            .context(ColumnNotFoundSnafu {
145                column: column_name,
146            })?;
147
148        Ok(Some((
149            column.column_id,
150            column.column_schema.data_type.clone(),
151        )))
152    }
153
154    /// Collects an equality expression (column = value)
155    fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
156        let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
157            return Ok(());
158        };
159        if lit.is_null() {
160            return Ok(());
161        }
162        let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
163            return Ok(());
164        };
165        let value = encode_lit(lit, data_type)?;
166        self.predicates
167            .entry(column_id)
168            .or_default()
169            .push(InListPredicate {
170                list: BTreeSet::from([value]),
171            });
172
173        Ok(())
174    }
175
176    /// Collects an in list expression in the form of `column IN (lit, lit, ...)`.
177    fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
178        // Only collect InList predicates if they reference a column
179        let Expr::Column(column) = &in_list.expr.as_ref() else {
180            return Ok(());
181        };
182        if in_list.list.is_empty() || in_list.negated {
183            return Ok(());
184        }
185
186        let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
187            return Ok(());
188        };
189
190        // Convert all non-null literals to predicates
191        let predicates = in_list
192            .list
193            .iter()
194            .filter_map(Self::nonnull_lit)
195            .map(|lit| encode_lit(lit, data_type.clone()));
196
197        // Collect successful conversions
198        let mut valid_predicates = BTreeSet::new();
199        for predicate in predicates {
200            match predicate {
201                Ok(p) => {
202                    valid_predicates.insert(p);
203                }
204                Err(e) => warn!(e; "Failed to convert value in InList"),
205            }
206        }
207
208        if !valid_predicates.is_empty() {
209            self.predicates
210                .entry(column_id)
211                .or_default()
212                .push(InListPredicate {
213                    list: valid_predicates,
214                });
215        }
216
217        Ok(())
218    }
219
220    /// Collects an or expression in the form of `column = lit OR column = lit OR ...`.
221    fn collect_or_eq_list(&mut self, left: &Expr, right: &Expr) -> Result<()> {
222        let (eq_left, eq_right, or_list) = if let Expr::BinaryExpr(BinaryExpr {
223            left: l,
224            op: Operator::Eq,
225            right: r,
226        }) = left
227        {
228            (l, r, right)
229        } else if let Expr::BinaryExpr(BinaryExpr {
230            left: l,
231            op: Operator::Eq,
232            right: r,
233        }) = right
234        {
235            (l, r, left)
236        } else {
237            return Ok(());
238        };
239
240        let Some((col, lit)) = Self::eq_expr_col_lit(eq_left, eq_right)? else {
241            return Ok(());
242        };
243        if lit.is_null() {
244            return Ok(());
245        }
246        let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
247            return Ok(());
248        };
249
250        let mut inlist = BTreeSet::new();
251        inlist.insert(encode_lit(lit, data_type.clone())?);
252        if Self::collect_or_eq_list_rec(&col.name, &data_type, or_list, &mut inlist)? {
253            self.predicates
254                .entry(column_id)
255                .or_default()
256                .push(InListPredicate { list: inlist });
257        }
258
259        Ok(())
260    }
261
262    fn collect_or_eq_list_rec(
263        column_name: &str,
264        data_type: &ConcreteDataType,
265        expr: &Expr,
266        inlist: &mut BTreeSet<Bytes>,
267    ) -> Result<bool> {
268        if let Expr::BinaryExpr(BinaryExpr { left, op, right }) = expr {
269            match op {
270                Operator::Or => {
271                    let r = Self::collect_or_eq_list_rec(column_name, data_type, left, inlist)?
272                        .then(|| {
273                            Self::collect_or_eq_list_rec(column_name, data_type, right, inlist)
274                        })
275                        .transpose()?
276                        .unwrap_or(false);
277                    return Ok(r);
278                }
279                Operator::Eq => {
280                    let Some((col, lit)) = Self::eq_expr_col_lit(left, right)? else {
281                        return Ok(false);
282                    };
283                    if lit.is_null() || column_name != col.name {
284                        return Ok(false);
285                    }
286                    let bytes = encode_lit(lit, data_type.clone())?;
287                    inlist.insert(bytes);
288                    return Ok(true);
289                }
290                _ => {}
291            }
292        }
293
294        Ok(false)
295    }
296
297    /// Helper function to get non-null literal value
298    fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
299        match expr {
300            Expr::Literal(lit) if !lit.is_null() => Some(lit),
301            _ => None,
302        }
303    }
304
305    /// Helper function to get the column and literal value from an equality expr (column = lit)
306    fn eq_expr_col_lit<'b>(
307        left: &'b Expr,
308        right: &'b Expr,
309    ) -> Result<Option<(&'b Column, &'b ScalarValue)>> {
310        let (col, lit) = match (left, right) {
311            (Expr::Column(col), Expr::Literal(lit)) => (col, lit),
312            (Expr::Literal(lit), Expr::Column(col)) => (col, lit),
313            _ => return Ok(None),
314        };
315        Ok(Some((col, lit)))
316    }
317}
318
319// TODO(ruihang): extract this and the one under inverted_index into a common util mod.
320/// Helper function to encode a literal into bytes.
321fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
322    let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
323    let mut bytes = vec![];
324    let field = SortField::new(data_type);
325    IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
326        .context(EncodeSnafu)?;
327    Ok(bytes)
328}
329
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::{col, lit};
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region with three non-nullable columns:
    /// - `column1` (id 1): string tag, the only primary key column;
    /// - `column2` (id 2): int64 field;
    /// - `column3` (id 3): millisecond timestamp.
    fn test_region_metadata() -> RegionMetadata {
        let columns = [
            (1, "column1", ConcreteDataType::string_datatype(), SemanticType::Tag),
            (2, "column2", ConcreteDataType::int64_datatype(), SemanticType::Field),
            (
                3,
                "column3",
                ConcreteDataType::timestamp_millisecond_datatype(),
                SemanticType::Timestamp,
            ),
        ];

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        for (column_id, name, data_type, semantic_type) in columns {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(name, data_type, false),
                semantic_type,
                column_id,
            });
        }
        builder.primary_key(vec![1]);
        builder.build().unwrap()
    }

    fn test_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Creates a builder over `metadata` backed by an in-memory object store.
    fn new_builder(
        metadata: &RegionMetadata,
        factory: PuffinManagerFactory,
    ) -> BloomFilterIndexApplierBuilder<'_> {
        BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            metadata,
            factory,
        )
    }

    fn column(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    fn string_lit(s: impl Into<String>) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(s.into())))
    }

    fn int64_lit(i: i64) -> Expr {
        Expr::Literal(ScalarValue::Int64(Some(i)))
    }

    /// Shorthand for the binary expression `left <op> right`.
    fn binary(left: Expr, op: Operator, right: Expr) -> Expr {
        Expr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op,
            right: Box::new(right),
        })
    }

    /// Encodes `s` the way a string column value is encoded for the bloom filter.
    fn encode_str(s: &str) -> Bytes {
        encode_lit(
            &ScalarValue::Utf8(Some(s.to_string())),
            ConcreteDataType::string_datatype(),
        )
        .unwrap()
    }

    #[test]
    fn test_build_with_exprs() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
        let metadata = test_region_metadata();
        let exprs = vec![binary(
            column("column1"),
            Operator::Eq,
            string_lit("value1"),
        )];

        let result = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        assert_eq!(predicates.len(), 1);

        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(
            column_predicates[0].list,
            BTreeSet::from([encode_str("value1")])
        );
    }

    #[test]
    fn test_build_with_in_list() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
        let metadata = test_region_metadata();
        let exprs = vec![Expr::InList(InList {
            expr: Box::new(column("column2")),
            list: (1..=3).map(int64_lit).collect(),
            negated: false,
        })];

        let result = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&2).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 3);
    }

    #[test]
    fn test_build_with_or_chain() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_or_chain_");
        let metadata = test_region_metadata();
        let build = |expr: Expr| {
            new_builder(&metadata, factory.clone())
                .build(&[expr])
                .unwrap()
        };

        // (v1 OR (v2 OR v4)) OR v3 on the same column collapses into one IN list.
        let expr = col("column1")
            .eq(lit("value1"))
            .or(col("column1")
                .eq(lit("value2"))
                .or(col("column1").eq(lit("value4"))))
            .or(col("column1").eq(lit("value3")));
        let result = build(expr);
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 4);
        let or_chain_predicates = &column_predicates[0].list;
        for value in ["value1", "value2", "value3", "value4"] {
            assert!(or_chain_predicates.contains(&encode_str(value)));
        }

        // Test with null value
        let null_eq = col("column1").eq(Expr::Literal(ScalarValue::Utf8(None)));
        assert!(build(null_eq).is_none());

        // Test with different column
        let mixed_columns = col("column1")
            .eq(lit("value1"))
            .or(col("column2").eq(lit("value2")));
        assert!(build(mixed_columns).is_none());

        // Test with non or chain
        let non_eq_leaf = col("column1")
            .eq(lit("value1"))
            .or(col("column1").gt_eq(lit("value2")));
        assert!(build(non_eq_leaf).is_none());
    }

    #[test]
    fn test_build_with_and_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
        let metadata = test_region_metadata();
        let exprs = vec![binary(
            binary(column("column1"), Operator::Eq, string_lit("value1")),
            Operator::And,
            binary(column("column2"), Operator::Eq, int64_lit(42)),
        )];

        let result = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        assert_eq!(predicates.len(), 2);
        assert!(predicates.contains_key(&1));
        assert!(predicates.contains_key(&2));
    }

    #[test]
    fn test_build_with_null_values() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
        let metadata = test_region_metadata();
        let exprs = vec![
            binary(
                column("column1"),
                Operator::Eq,
                Expr::Literal(ScalarValue::Utf8(None)),
            ),
            Expr::InList(InList {
                expr: Box::new(column("column2")),
                list: vec![
                    int64_lit(1),
                    Expr::Literal(ScalarValue::Int64(None)),
                    int64_lit(3),
                ],
                negated: false,
            }),
        ];

        let result = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        assert!(!predicates.contains_key(&1)); // Null equality should be ignored
        let column2_predicates = predicates.get(&2).unwrap();
        assert_eq!(column2_predicates[0].list.len(), 2);
    }

    #[test]
    fn test_build_with_invalid_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
        let metadata = test_region_metadata();
        let exprs = vec![
            // Non-equality operator
            binary(column("column1"), Operator::Gt, string_lit("value1")),
            // Non-existent column
            binary(column("non_existent"), Operator::Eq, string_lit("value")),
            // Negated IN list
            Expr::InList(InList {
                expr: Box::new(column("column2")),
                list: vec![int64_lit(1), int64_lit(2)],
                negated: true,
            }),
        ];

        let result = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_build_with_multiple_predicates_same_column() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
        let metadata = test_region_metadata();
        let exprs = vec![
            binary(column("column1"), Operator::Eq, string_lit("value1")),
            Expr::InList(InList {
                expr: Box::new(column("column1")),
                list: vec![string_lit("value2"), string_lit("value3")],
                negated: false,
            }),
        ];

        let result = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(result.is_some());

        let predicates = result.unwrap().predicates;
        let column_predicates = predicates.get(&1).unwrap();
        assert_eq!(column_predicates.len(), 2);
    }
}