mito2/sst/index/inverted_index/applier/builder.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod between;
16mod comparison;
17mod eq_list;
18mod in_list;
19mod regex_match;
20
21use std::collections::{BTreeMap, HashSet};
22
23use common_telemetry::warn;
24use datafusion_common::ScalarValue;
25use datafusion_expr::{BinaryExpr, Expr, Operator};
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use index::inverted_index::search::index_apply::PredicatesIndexApplier;
29use index::inverted_index::search::predicate::Predicate;
30use object_store::ObjectStore;
31use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
32use snafu::{OptionExt, ResultExt};
33use store_api::metadata::RegionMetadata;
34use store_api::storage::ColumnId;
35
36use crate::cache::file_cache::FileCacheRef;
37use crate::cache::index::inverted_index::InvertedIndexCacheRef;
38use crate::error::{BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, Result};
39use crate::row_converter::SortField;
40use crate::sst::index::codec::IndexValueCodec;
41use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
42use crate::sst::index::puffin_manager::PuffinManagerFactory;
43
44/// Constructs an [`InvertedIndexApplier`] which applies predicates to SST files during scan.
45pub(crate) struct InvertedIndexApplierBuilder<'a> {
46    /// Directory of the region, required argument for constructing [`InvertedIndexApplier`].
47    region_dir: String,
48
49    /// Object store, required argument for constructing [`InvertedIndexApplier`].
50    object_store: ObjectStore,
51
52    /// File cache, required argument for constructing [`InvertedIndexApplier`].
53    file_cache: Option<FileCacheRef>,
54
55    /// Metadata of the region, used to get metadata like column type.
56    metadata: &'a RegionMetadata,
57
58    /// Column ids of the columns that are indexed.
59    indexed_column_ids: HashSet<ColumnId>,
60
61    /// Stores predicates during traversal on the Expr tree.
62    output: BTreeMap<ColumnId, Vec<Predicate>>,
63
64    /// The puffin manager factory.
65    puffin_manager_factory: PuffinManagerFactory,
66
67    /// Cache for inverted index.
68    inverted_index_cache: Option<InvertedIndexCacheRef>,
69
70    /// Cache for puffin metadata.
71    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
72}
73
74impl<'a> InvertedIndexApplierBuilder<'a> {
75    /// Creates a new [`InvertedIndexApplierBuilder`].
76    pub fn new(
77        region_dir: String,
78        object_store: ObjectStore,
79        metadata: &'a RegionMetadata,
80        indexed_column_ids: HashSet<ColumnId>,
81        puffin_manager_factory: PuffinManagerFactory,
82    ) -> Self {
83        Self {
84            region_dir,
85            object_store,
86            metadata,
87            indexed_column_ids,
88            output: BTreeMap::default(),
89            puffin_manager_factory,
90            file_cache: None,
91            inverted_index_cache: None,
92            puffin_metadata_cache: None,
93        }
94    }
95
96    /// Sets the file cache.
97    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
98        self.file_cache = file_cache;
99        self
100    }
101
102    /// Sets the puffin metadata cache.
103    pub fn with_puffin_metadata_cache(
104        mut self,
105        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
106    ) -> Self {
107        self.puffin_metadata_cache = puffin_metadata_cache;
108        self
109    }
110
111    /// Sets the inverted index cache.
112    pub fn with_inverted_index_cache(
113        mut self,
114        inverted_index_cache: Option<InvertedIndexCacheRef>,
115    ) -> Self {
116        self.inverted_index_cache = inverted_index_cache;
117        self
118    }
119
120    /// Consumes the builder to construct an [`InvertedIndexApplier`], optionally returned based on
121    /// the expressions provided. If no predicates match, returns `None`.
122    pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
123        for expr in exprs {
124            self.traverse_and_collect(expr);
125        }
126
127        if self.output.is_empty() {
128            return Ok(None);
129        }
130
131        let predicates = self
132            .output
133            .iter()
134            .map(|(column_id, predicates)| (column_id.to_string(), predicates.clone()))
135            .collect();
136        let applier = PredicatesIndexApplier::try_from(predicates);
137
138        Ok(Some(
139            InvertedIndexApplier::new(
140                self.region_dir,
141                self.metadata.region_id,
142                self.object_store,
143                Box::new(applier.context(BuildIndexApplierSnafu)?),
144                self.puffin_manager_factory,
145                self.output,
146            )
147            .with_file_cache(self.file_cache)
148            .with_puffin_metadata_cache(self.puffin_metadata_cache)
149            .with_index_cache(self.inverted_index_cache),
150        ))
151    }
152
153    /// Recursively traverses expressions to collect predicates.
154    /// Results are stored in `self.output`.
155    fn traverse_and_collect(&mut self, expr: &Expr) {
156        let res = match expr {
157            Expr::Between(between) => self.collect_between(between),
158
159            Expr::InList(in_list) => self.collect_inlist(in_list),
160            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
161                Operator::And => {
162                    self.traverse_and_collect(left);
163                    self.traverse_and_collect(right);
164                    Ok(())
165                }
166                Operator::Or => self.collect_or_eq_list(left, right),
167                Operator::Eq => self.collect_eq(left, right),
168                Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
169                    self.collect_comparison_expr(left, op, right)
170                }
171                Operator::RegexMatch => self.collect_regex_match(left, right),
172                _ => Ok(()),
173            },
174
175            // TODO(zhongzc): support more expressions, e.g. IsNull, IsNotNull, ...
176            _ => Ok(()),
177        };
178
179        if let Err(err) = res {
180            warn!(err; "Failed to collect predicates, ignore it. expr: {expr}");
181        }
182    }
183
184    /// Helper function to add a predicate to the output.
185    fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) {
186        self.output.entry(column_id).or_default().push(predicate);
187    }
188
189    /// Helper function to get the column id and the column type of a column.
190    /// Returns `None` if the column is not a tag column or if the column is ignored.
191    fn column_id_and_type(
192        &self,
193        column_name: &str,
194    ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
195        let column = self
196            .metadata
197            .column_by_name(column_name)
198            .context(ColumnNotFoundSnafu {
199                column: column_name,
200            })?;
201
202        if !self.indexed_column_ids.contains(&column.column_id) {
203            return Ok(None);
204        }
205
206        Ok(Some((
207            column.column_id,
208            column.column_schema.data_type.clone(),
209        )))
210    }
211
212    /// Helper function to get a non-null literal.
213    fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
214        match expr {
215            Expr::Literal(lit) if !lit.is_null() => Some(lit),
216            _ => None,
217        }
218    }
219
220    /// Helper function to get the column name of a column expression.
221    fn column_name(expr: &Expr) -> Option<&str> {
222        match expr {
223            Expr::Column(column) => Some(&column.name),
224            _ => None,
225        }
226    }
227
228    /// Helper function to encode a literal into bytes.
229    fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
230        let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
231        let mut bytes = vec![];
232        let field = SortField::new(data_type);
233        IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)?;
234        Ok(bytes)
235    }
236}
237
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::Between;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use index::inverted_index::search::predicate::{
        Bound, Range, RangePredicate, RegexMatchPredicate,
    };
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Builds a non-nullable column definition for the test region.
    fn column_metadata(
        name: &str,
        data_type: ConcreteDataType,
        semantic_type: SemanticType,
        column_id: ColumnId,
    ) -> ColumnMetadata {
        ColumnMetadata {
            column_schema: ColumnSchema::new(name, data_type, false),
            semantic_type,
            column_id,
        }
    }

    /// Region metadata shared by the builder tests: tags `a` (string, id 1) and
    /// `b` (int64, id 2), field `c` (string, id 3), millisecond timestamp `d` (id 4),
    /// with primary key `[a, b]`.
    pub(crate) fn test_region_metadata() -> RegionMetadata {
        let columns = [
            column_metadata("a", ConcreteDataType::string_datatype(), SemanticType::Tag, 1),
            column_metadata("b", ConcreteDataType::int64_datatype(), SemanticType::Tag, 2),
            column_metadata("c", ConcreteDataType::string_datatype(), SemanticType::Field, 3),
            column_metadata(
                "d",
                ConcreteDataType::timestamp_millisecond_datatype(),
                SemanticType::Timestamp,
                4,
            ),
        ];

        let mut region_builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        for column in columns {
            region_builder.push_column_metadata(column);
        }
        region_builder.primary_key(vec![1, 2]);
        region_builder.build().unwrap()
    }

    /// An in-memory object store for tests.
    pub(crate) fn test_object_store() -> ObjectStore {
        let operator_builder = ObjectStore::new(Memory::default()).unwrap();
        operator_builder.finish()
    }

    /// A column-reference expression for `name`.
    fn column_expr(name: &str) -> Expr {
        Expr::Column(Column::from_name(name))
    }

    /// Reference to tag column `a`.
    pub(crate) fn tag_column() -> Expr {
        column_expr("a")
    }

    /// Reference to tag column `b`.
    pub(crate) fn tag_column2() -> Expr {
        column_expr("b")
    }

    /// Reference to field column `c`.
    pub(crate) fn field_column() -> Expr {
        column_expr("c")
    }

    /// Reference to a column absent from the test region.
    pub(crate) fn nonexistent_column() -> Expr {
        column_expr("nonexistence")
    }

    /// A non-null UTF-8 string literal.
    pub(crate) fn string_lit(s: impl Into<String>) -> Expr {
        let text: String = s.into();
        Expr::Literal(ScalarValue::Utf8(Some(text)))
    }

    /// A non-null 64-bit integer literal.
    pub(crate) fn int64_lit(i: impl Into<i64>) -> Expr {
        let number: i64 = i.into();
        Expr::Literal(ScalarValue::Int64(Some(number)))
    }

    /// Encodes `value` with the index codec for `data_type`, panicking on failure.
    fn encode_for_test(value: Value, data_type: ConcreteDataType) -> Vec<u8> {
        let field = SortField::new(data_type);
        let mut buf = Vec::new();
        IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut buf).unwrap();
        buf
    }

    /// Index-codec encoding of a string value.
    pub(crate) fn encoded_string(s: impl Into<String>) -> Vec<u8> {
        let text: String = s.into();
        encode_for_test(Value::from(text), ConcreteDataType::string_datatype())
    }

    /// Index-codec encoding of an int64 value.
    pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
        let number: i64 = s.into();
        encode_for_test(Value::from(number), ConcreteDataType::int64_datatype())
    }

    #[test]
    fn test_collect_and_basic() {
        let (_dir, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");

        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        // a ~ 'bar'
        let regex_match = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(tag_column()),
            op: Operator::RegexMatch,
            right: Box::new(string_lit("bar")),
        });
        // b BETWEEN 123 AND 456
        let between = Expr::Between(Between {
            expr: Box::new(tag_column2()),
            negated: false,
            low: Box::new(int64_lit(123)),
            high: Box::new(int64_lit(456)),
        });
        // (a ~ 'bar') AND (b BETWEEN 123 AND 456)
        let conjunction = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(regex_match),
            op: Operator::And,
            right: Box::new(between),
        });

        builder.traverse_and_collect(&conjunction);

        let expected_regex = vec![Predicate::RegexMatch(RegexMatchPredicate {
            pattern: "bar".to_string(),
        })];
        assert_eq!(builder.output.get(&1).unwrap(), &expected_regex);

        let expected_range = vec![Predicate::Range(RangePredicate {
            range: Range {
                lower: Some(Bound {
                    inclusive: true,
                    value: encoded_int64(123),
                }),
                upper: Some(Bound {
                    inclusive: true,
                    value: encoded_int64(456),
                }),
            },
        })];
        assert_eq!(builder.output.get(&2).unwrap(), &expected_range);
    }
}