mito2/sst/index/inverted_index/applier/builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod between;
16mod comparison;
17mod eq_list;
18mod in_list;
19mod regex_match;
20
21use std::collections::{BTreeMap, HashSet};
22
23use common_telemetry::warn;
24use datafusion_common::ScalarValue;
25use datafusion_expr::{BinaryExpr, Expr, Operator};
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use index::inverted_index::search::index_apply::PredicatesIndexApplier;
29use index::inverted_index::search::predicate::Predicate;
30use mito_codec::index::IndexValueCodec;
31use mito_codec::row_converter::SortField;
32use object_store::ObjectStore;
33use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
34use snafu::{OptionExt, ResultExt};
35use store_api::metadata::RegionMetadata;
36use store_api::storage::ColumnId;
37
38use crate::cache::file_cache::FileCacheRef;
39use crate::cache::index::inverted_index::InvertedIndexCacheRef;
40use crate::error::{
41    BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result,
42};
43use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
44use crate::sst::index::puffin_manager::PuffinManagerFactory;
45
46/// Constructs an [`InvertedIndexApplier`] which applies predicates to SST files during scan.
47pub(crate) struct InvertedIndexApplierBuilder<'a> {
48    /// Directory of the region, required argument for constructing [`InvertedIndexApplier`].
49    region_dir: String,
50
51    /// Object store, required argument for constructing [`InvertedIndexApplier`].
52    object_store: ObjectStore,
53
54    /// File cache, required argument for constructing [`InvertedIndexApplier`].
55    file_cache: Option<FileCacheRef>,
56
57    /// Metadata of the region, used to get metadata like column type.
58    metadata: &'a RegionMetadata,
59
60    /// Column ids of the columns that are indexed.
61    indexed_column_ids: HashSet<ColumnId>,
62
63    /// Stores predicates during traversal on the Expr tree.
64    output: BTreeMap<ColumnId, Vec<Predicate>>,
65
66    /// The puffin manager factory.
67    puffin_manager_factory: PuffinManagerFactory,
68
69    /// Cache for inverted index.
70    inverted_index_cache: Option<InvertedIndexCacheRef>,
71
72    /// Cache for puffin metadata.
73    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
74}
75
76impl<'a> InvertedIndexApplierBuilder<'a> {
77    /// Creates a new [`InvertedIndexApplierBuilder`].
78    pub fn new(
79        region_dir: String,
80        object_store: ObjectStore,
81        metadata: &'a RegionMetadata,
82        indexed_column_ids: HashSet<ColumnId>,
83        puffin_manager_factory: PuffinManagerFactory,
84    ) -> Self {
85        Self {
86            region_dir,
87            object_store,
88            metadata,
89            indexed_column_ids,
90            output: BTreeMap::default(),
91            puffin_manager_factory,
92            file_cache: None,
93            inverted_index_cache: None,
94            puffin_metadata_cache: None,
95        }
96    }
97
98    /// Sets the file cache.
99    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
100        self.file_cache = file_cache;
101        self
102    }
103
104    /// Sets the puffin metadata cache.
105    pub fn with_puffin_metadata_cache(
106        mut self,
107        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
108    ) -> Self {
109        self.puffin_metadata_cache = puffin_metadata_cache;
110        self
111    }
112
113    /// Sets the inverted index cache.
114    pub fn with_inverted_index_cache(
115        mut self,
116        inverted_index_cache: Option<InvertedIndexCacheRef>,
117    ) -> Self {
118        self.inverted_index_cache = inverted_index_cache;
119        self
120    }
121
122    /// Consumes the builder to construct an [`InvertedIndexApplier`], optionally returned based on
123    /// the expressions provided. If no predicates match, returns `None`.
124    pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
125        for expr in exprs {
126            self.traverse_and_collect(expr);
127        }
128
129        if self.output.is_empty() {
130            return Ok(None);
131        }
132
133        let predicates = self
134            .output
135            .iter()
136            .map(|(column_id, predicates)| (column_id.to_string(), predicates.clone()))
137            .collect();
138        let applier = PredicatesIndexApplier::try_from(predicates);
139
140        Ok(Some(
141            InvertedIndexApplier::new(
142                self.region_dir,
143                self.metadata.region_id,
144                self.object_store,
145                Box::new(applier.context(BuildIndexApplierSnafu)?),
146                self.puffin_manager_factory,
147                self.output,
148            )
149            .with_file_cache(self.file_cache)
150            .with_puffin_metadata_cache(self.puffin_metadata_cache)
151            .with_index_cache(self.inverted_index_cache),
152        ))
153    }
154
155    /// Recursively traverses expressions to collect predicates.
156    /// Results are stored in `self.output`.
157    fn traverse_and_collect(&mut self, expr: &Expr) {
158        let res = match expr {
159            Expr::Between(between) => self.collect_between(between),
160
161            Expr::InList(in_list) => self.collect_inlist(in_list),
162            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
163                Operator::And => {
164                    self.traverse_and_collect(left);
165                    self.traverse_and_collect(right);
166                    Ok(())
167                }
168                Operator::Or => self.collect_or_eq_list(left, right),
169                Operator::Eq => self.collect_eq(left, right),
170                Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
171                    self.collect_comparison_expr(left, op, right)
172                }
173                Operator::RegexMatch => self.collect_regex_match(left, right),
174                _ => Ok(()),
175            },
176
177            // TODO(zhongzc): support more expressions, e.g. IsNull, IsNotNull, ...
178            _ => Ok(()),
179        };
180
181        if let Err(err) = res {
182            warn!(err; "Failed to collect predicates, ignore it. expr: {expr}");
183        }
184    }
185
186    /// Helper function to add a predicate to the output.
187    fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) {
188        self.output.entry(column_id).or_default().push(predicate);
189    }
190
191    /// Helper function to get the column id and the column type of a column.
192    /// Returns `None` if the column is not a tag column or if the column is ignored.
193    fn column_id_and_type(
194        &self,
195        column_name: &str,
196    ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
197        let column = self
198            .metadata
199            .column_by_name(column_name)
200            .context(ColumnNotFoundSnafu {
201                column: column_name,
202            })?;
203
204        if !self.indexed_column_ids.contains(&column.column_id) {
205            return Ok(None);
206        }
207
208        Ok(Some((
209            column.column_id,
210            column.column_schema.data_type.clone(),
211        )))
212    }
213
214    /// Helper function to get a non-null literal.
215    fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
216        match expr {
217            Expr::Literal(lit) if !lit.is_null() => Some(lit),
218            _ => None,
219        }
220    }
221
222    /// Helper function to get the column name of a column expression.
223    fn column_name(expr: &Expr) -> Option<&str> {
224        match expr {
225            Expr::Column(column) => Some(&column.name),
226            _ => None,
227        }
228    }
229
230    /// Helper function to encode a literal into bytes.
231    fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
232        let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
233        let mut bytes = vec![];
234        let field = SortField::new(data_type);
235        IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
236            .context(EncodeSnafu)?;
237        Ok(bytes)
238    }
239}
240
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::Between;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use index::inverted_index::search::predicate::{
        Bound, Range, RangePredicate, RegexMatchPredicate,
    };
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region metadata shared by the builder tests:
    /// `a` (string tag, id 1), `b` (int64 tag, id 2), `c` (string field, id 3) and
    /// `d` (millisecond timestamp, id 4); the primary key is `[a, b]`.
    pub(crate) fn test_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("b", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("c", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "d",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);
        builder.build().unwrap()
    }

    /// An in-memory object store for tests.
    pub(crate) fn test_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Column expression for the string tag `a`.
    pub(crate) fn tag_column() -> Expr {
        Expr::Column(Column::from_name("a"))
    }

    /// Column expression for the int64 tag `b`.
    pub(crate) fn tag_column2() -> Expr {
        Expr::Column(Column::from_name("b"))
    }

    /// Column expression for the string field `c`.
    pub(crate) fn field_column() -> Expr {
        Expr::Column(Column::from_name("c"))
    }

    /// Column expression for a column absent from [`test_region_metadata`].
    pub(crate) fn nonexistent_column() -> Expr {
        Expr::Column(Column::from_name("nonexistence"))
    }

    /// A non-null `Utf8` literal expression.
    pub(crate) fn string_lit(s: impl Into<String>) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(s.into())))
    }

    /// A non-null `Int64` literal expression.
    pub(crate) fn int64_lit(i: impl Into<i64>) -> Expr {
        Expr::Literal(ScalarValue::Int64(Some(i.into())))
    }

    /// Expected index encoding of a string value.
    pub(crate) fn encoded_string(s: impl Into<String>) -> Vec<u8> {
        let mut bytes = vec![];
        IndexValueCodec::encode_nonnull_value(
            Value::from(s.into()).as_value_ref(),
            &SortField::new(ConcreteDataType::string_datatype()),
            &mut bytes,
        )
        .unwrap();
        bytes
    }

    /// Expected index encoding of an int64 value.
    pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
        let mut bytes = vec![];
        IndexValueCodec::encode_nonnull_value(
            Value::from(s.into()).as_value_ref(),
            &SortField::new(ConcreteDataType::int64_datatype()),
            &mut bytes,
        )
        .unwrap();
        bytes
    }

    #[test]
    fn test_collect_and_basic() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");

        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::BinaryExpr(BinaryExpr {
                left: Box::new(tag_column()),
                op: Operator::RegexMatch,
                right: Box::new(string_lit("bar")),
            })),
            op: Operator::And,
            right: Box::new(Expr::Between(Between {
                expr: Box::new(tag_column2()),
                negated: false,
                low: Box::new(int64_lit(123)),
                high: Box::new(int64_lit(456)),
            })),
        });

        builder.traverse_and_collect(&expr);
        let predicates = builder.output.get(&1).unwrap();
        assert_eq!(predicates.len(), 1);
        assert_eq!(
            predicates[0],
            Predicate::RegexMatch(RegexMatchPredicate {
                pattern: "bar".to_string()
            })
        );
        let predicates = builder.output.get(&2).unwrap();
        assert_eq!(predicates.len(), 1);
        assert_eq!(
            predicates[0],
            Predicate::Range(RangePredicate {
                range: Range {
                    lower: Some(Bound {
                        inclusive: true,
                        value: encoded_int64(123),
                    }),
                    upper: Some(Bound {
                        inclusive: true,
                        value: encoded_int64(456),
                    }),
                }
            })
        );
    }

    #[test]
    fn test_build_without_exprs_returns_none() {
        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_build_without_exprs_returns_none_");

        let metadata = test_region_metadata();
        let builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        // With nothing to collect, no applier should be built.
        assert!(builder.build(&[]).unwrap().is_none());
    }
}