mito2/sst/index/bloom_filter/applier/builder.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use common_telemetry::warn;
18use datafusion_common::ScalarValue;
19use datafusion_expr::expr::InList;
20use datafusion_expr::{BinaryExpr, Expr, Operator};
21use datatypes::data_type::ConcreteDataType;
22use datatypes::value::Value;
23use index::bloom_filter::applier::InListPredicate;
24use index::Bytes;
25use object_store::ObjectStore;
26use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::RegionMetadata;
29use store_api::storage::ColumnId;
30
31use crate::cache::file_cache::FileCacheRef;
32use crate::cache::index::bloom_filter_index::BloomFilterIndexCacheRef;
33use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, Result};
34use crate::row_converter::SortField;
35use crate::sst::index::bloom_filter::applier::BloomFilterIndexApplier;
36use crate::sst::index::codec::IndexValueCodec;
37use crate::sst::index::puffin_manager::PuffinManagerFactory;
38
39pub struct BloomFilterIndexApplierBuilder<'a> {
40    region_dir: String,
41    object_store: ObjectStore,
42    metadata: &'a RegionMetadata,
43    puffin_manager_factory: PuffinManagerFactory,
44    file_cache: Option<FileCacheRef>,
45    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
46    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
47    predicates: HashMap<ColumnId, Vec<InListPredicate>>,
48}
49
50impl<'a> BloomFilterIndexApplierBuilder<'a> {
51    pub fn new(
52        region_dir: String,
53        object_store: ObjectStore,
54        metadata: &'a RegionMetadata,
55        puffin_manager_factory: PuffinManagerFactory,
56    ) -> Self {
57        Self {
58            region_dir,
59            object_store,
60            metadata,
61            puffin_manager_factory,
62            file_cache: None,
63            puffin_metadata_cache: None,
64            bloom_filter_index_cache: None,
65            predicates: HashMap::default(),
66        }
67    }
68
69    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
70        self.file_cache = file_cache;
71        self
72    }
73
74    pub fn with_puffin_metadata_cache(
75        mut self,
76        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
77    ) -> Self {
78        self.puffin_metadata_cache = puffin_metadata_cache;
79        self
80    }
81
82    pub fn with_bloom_filter_index_cache(
83        mut self,
84        bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
85    ) -> Self {
86        self.bloom_filter_index_cache = bloom_filter_index_cache;
87        self
88    }
89
90    /// Builds the applier with given filter expressions
91    pub fn build(mut self, exprs: &[Expr]) -> Result<Option<BloomFilterIndexApplier>> {
92        for expr in exprs {
93            self.traverse_and_collect(expr);
94        }
95
96        if self.predicates.is_empty() {
97            return Ok(None);
98        }
99
100        let applier = BloomFilterIndexApplier::new(
101            self.region_dir,
102            self.metadata.region_id,
103            self.object_store,
104            self.puffin_manager_factory,
105            self.predicates,
106        )
107        .with_file_cache(self.file_cache)
108        .with_puffin_metadata_cache(self.puffin_metadata_cache)
109        .with_bloom_filter_cache(self.bloom_filter_index_cache);
110
111        Ok(Some(applier))
112    }
113
114    /// Recursively traverses expressions to collect bloom filter predicates
115    fn traverse_and_collect(&mut self, expr: &Expr) {
116        let res = match expr {
117            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
118                Operator::And => {
119                    self.traverse_and_collect(left);
120                    self.traverse_and_collect(right);
121                    Ok(())
122                }
123                Operator::Eq => self.collect_eq(left, right),
124                _ => Ok(()),
125            },
126            Expr::InList(in_list) => self.collect_in_list(in_list),
127            _ => Ok(()),
128        };
129
130        if let Err(err) = res {
131            warn!(err; "Failed to collect bloom filter predicates, ignore it. expr: {expr}");
132        }
133    }
134
135    /// Helper function to get the column id and type
136    fn column_id_and_type(
137        &self,
138        column_name: &str,
139    ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
140        let column = self
141            .metadata
142            .column_by_name(column_name)
143            .context(ColumnNotFoundSnafu {
144                column: column_name,
145            })?;
146
147        Ok(Some((
148            column.column_id,
149            column.column_schema.data_type.clone(),
150        )))
151    }
152
153    /// Collects an equality expression (column = value)
154    fn collect_eq(&mut self, left: &Expr, right: &Expr) -> Result<()> {
155        let (col, lit) = match (left, right) {
156            (Expr::Column(col), Expr::Literal(lit)) => (col, lit),
157            (Expr::Literal(lit), Expr::Column(col)) => (col, lit),
158            _ => return Ok(()),
159        };
160        if lit.is_null() {
161            return Ok(());
162        }
163        let Some((column_id, data_type)) = self.column_id_and_type(&col.name)? else {
164            return Ok(());
165        };
166        let value = encode_lit(lit, data_type)?;
167        self.predicates
168            .entry(column_id)
169            .or_default()
170            .push(InListPredicate {
171                list: HashSet::from([value]),
172            });
173
174        Ok(())
175    }
176
177    /// Collects an in list expression in the form of `column IN (lit, lit, ...)`.
178    fn collect_in_list(&mut self, in_list: &InList) -> Result<()> {
179        // Only collect InList predicates if they reference a column
180        let Expr::Column(column) = &in_list.expr.as_ref() else {
181            return Ok(());
182        };
183        if in_list.list.is_empty() || in_list.negated {
184            return Ok(());
185        }
186
187        let Some((column_id, data_type)) = self.column_id_and_type(&column.name)? else {
188            return Ok(());
189        };
190
191        // Convert all non-null literals to predicates
192        let predicates = in_list
193            .list
194            .iter()
195            .filter_map(Self::nonnull_lit)
196            .map(|lit| encode_lit(lit, data_type.clone()));
197
198        // Collect successful conversions
199        let mut valid_predicates = HashSet::new();
200        for predicate in predicates {
201            match predicate {
202                Ok(p) => {
203                    valid_predicates.insert(p);
204                }
205                Err(e) => warn!(e; "Failed to convert value in InList"),
206            }
207        }
208
209        if !valid_predicates.is_empty() {
210            self.predicates
211                .entry(column_id)
212                .or_default()
213                .push(InListPredicate {
214                    list: valid_predicates,
215                });
216        }
217
218        Ok(())
219    }
220
221    /// Helper function to get non-null literal value
222    fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
223        match expr {
224            Expr::Literal(lit) if !lit.is_null() => Some(lit),
225            _ => None,
226        }
227    }
228}
229
230// TODO(ruihang): extract this and the one under inverted_index into a common util mod.
231/// Helper function to encode a literal into bytes.
232fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Bytes> {
233    let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
234    let mut bytes = vec![];
235    let field = SortField::new(data_type);
236    IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
237    Ok(bytes)
238}
239
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datatypes::schema::ColumnSchema;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region with a string tag `column1` (id 1, primary key), an int64 field
    /// `column2` (id 2) and a millisecond timestamp `column3` (id 3).
    fn test_region_metadata() -> RegionMetadata {
        let columns = [
            (
                "column1",
                ConcreteDataType::string_datatype(),
                SemanticType::Tag,
                1,
            ),
            (
                "column2",
                ConcreteDataType::int64_datatype(),
                SemanticType::Field,
                2,
            ),
            (
                "column3",
                ConcreteDataType::timestamp_millisecond_datatype(),
                SemanticType::Timestamp,
                3,
            ),
        ];

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        for (name, data_type, semantic_type, column_id) in columns {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(name, data_type, false),
                semantic_type,
                column_id,
            });
        }
        builder.primary_key(vec![1]);
        builder.build().unwrap()
    }

    fn test_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Creates a builder over `metadata` backed by an in-memory object store.
    fn new_builder(
        metadata: &RegionMetadata,
        factory: PuffinManagerFactory,
    ) -> BloomFilterIndexApplierBuilder<'_> {
        BloomFilterIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            metadata,
            factory,
        )
    }

    fn column(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    fn string_lit(s: impl Into<String>) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(s.into())))
    }

    fn int64_lit(i: i64) -> Expr {
        Expr::Literal(ScalarValue::Int64(Some(i)))
    }

    /// Shorthand for `left <op> right`.
    fn binary(left: Expr, op: Operator, right: Expr) -> Expr {
        Expr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op,
            right: Box::new(right),
        })
    }

    /// Shorthand for `expr [NOT] IN (list...)`.
    fn in_list(expr: Expr, list: Vec<Expr>, negated: bool) -> Expr {
        Expr::InList(InList {
            expr: Box::new(expr),
            list,
            negated,
        })
    }

    #[test]
    fn test_build_with_exprs() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_exprs_");
        let metadata = test_region_metadata();
        let exprs = [binary(column("column1"), Operator::Eq, string_lit("value1"))];

        let applier = new_builder(&metadata, factory)
            .build(&exprs)
            .unwrap()
            .unwrap();
        assert_eq!(applier.predicates.len(), 1);

        let column_predicates = &applier.predicates[&1];
        assert_eq!(column_predicates.len(), 1);

        let expected = encode_lit(
            &ScalarValue::Utf8(Some("value1".to_string())),
            ConcreteDataType::string_datatype(),
        )
        .unwrap();
        assert_eq!(column_predicates[0].list, HashSet::from([expected]));
    }

    #[test]
    fn test_build_with_in_list() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_in_list_");
        let metadata = test_region_metadata();
        let exprs = [in_list(
            column("column2"),
            (1..=3).map(int64_lit).collect(),
            false,
        )];

        let predicates = new_builder(&metadata, factory)
            .build(&exprs)
            .unwrap()
            .unwrap()
            .predicates;
        let column_predicates = &predicates[&2];
        assert_eq!(column_predicates.len(), 1);
        assert_eq!(column_predicates[0].list.len(), 3);
    }

    #[test]
    fn test_build_with_and_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_and_");
        let metadata = test_region_metadata();
        let exprs = [binary(
            binary(column("column1"), Operator::Eq, string_lit("value1")),
            Operator::And,
            binary(column("column2"), Operator::Eq, int64_lit(42)),
        )];

        let predicates = new_builder(&metadata, factory)
            .build(&exprs)
            .unwrap()
            .unwrap()
            .predicates;
        assert_eq!(predicates.len(), 2);
        assert!(predicates.contains_key(&1));
        assert!(predicates.contains_key(&2));
    }

    #[test]
    fn test_build_with_null_values() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_null_");
        let metadata = test_region_metadata();
        let exprs = [
            binary(
                column("column1"),
                Operator::Eq,
                Expr::Literal(ScalarValue::Utf8(None)),
            ),
            in_list(
                column("column2"),
                vec![
                    int64_lit(1),
                    Expr::Literal(ScalarValue::Int64(None)),
                    int64_lit(3),
                ],
                false,
            ),
        ];

        let predicates = new_builder(&metadata, factory)
            .build(&exprs)
            .unwrap()
            .unwrap()
            .predicates;
        // Null equality should be ignored
        assert!(!predicates.contains_key(&1));
        assert_eq!(predicates[&2][0].list.len(), 2);
    }

    #[test]
    fn test_build_with_invalid_expressions() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_invalid_");
        let metadata = test_region_metadata();
        let exprs = [
            // Non-equality operator
            binary(column("column1"), Operator::Gt, string_lit("value1")),
            // Non-existent column
            binary(column("non_existent"), Operator::Eq, string_lit("value")),
            // Negated IN list
            in_list(column("column2"), vec![int64_lit(1), int64_lit(2)], true),
        ];

        let result = new_builder(&metadata, factory).build(&exprs).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_build_with_multiple_predicates_same_column() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_build_with_multiple_");
        let metadata = test_region_metadata();
        let exprs = [
            binary(column("column1"), Operator::Eq, string_lit("value1")),
            in_list(
                column("column1"),
                vec![string_lit("value2"), string_lit("value3")],
                false,
            ),
        ];

        let predicates = new_builder(&metadata, factory)
            .build(&exprs)
            .unwrap()
            .unwrap()
            .predicates;
        assert_eq!(predicates[&1].len(), 2);
    }
}