mito2/sst/index/inverted_index/applier/
builder.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod between;
16mod comparison;
17mod eq_list;
18mod in_list;
19mod regex_match;
20
21use std::collections::{BTreeMap, HashSet};
22
23use common_telemetry::warn;
24use datafusion_common::ScalarValue;
25use datafusion_expr::{BinaryExpr, Expr, Operator};
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use index::inverted_index::search::index_apply::PredicatesIndexApplier;
29use index::inverted_index::search::predicate::Predicate;
30use mito_codec::index::IndexValueCodec;
31use mito_codec::row_converter::SortField;
32use object_store::ObjectStore;
33use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
34use snafu::{OptionExt, ResultExt};
35use store_api::metadata::RegionMetadata;
36use store_api::region_request::PathType;
37use store_api::storage::ColumnId;
38
39use crate::cache::file_cache::FileCacheRef;
40use crate::cache::index::inverted_index::InvertedIndexCacheRef;
41use crate::error::{
42    BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result,
43};
44use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
45use crate::sst::index::puffin_manager::PuffinManagerFactory;
46
47/// Constructs an [`InvertedIndexApplier`] which applies predicates to SST files during scan.
48pub(crate) struct InvertedIndexApplierBuilder<'a> {
49    /// Directory of the table, required argument for constructing [`InvertedIndexApplier`].
50    table_dir: String,
51
52    /// Path type for generating file paths.
53    path_type: PathType,
54
55    /// Object store, required argument for constructing [`InvertedIndexApplier`].
56    object_store: ObjectStore,
57
58    /// File cache, required argument for constructing [`InvertedIndexApplier`].
59    file_cache: Option<FileCacheRef>,
60
61    /// Metadata of the region, used to get metadata like column type.
62    metadata: &'a RegionMetadata,
63
64    /// Column ids of the columns that are indexed.
65    indexed_column_ids: HashSet<ColumnId>,
66
67    /// Stores predicates during traversal on the Expr tree.
68    output: BTreeMap<ColumnId, Vec<Predicate>>,
69
70    /// The puffin manager factory.
71    puffin_manager_factory: PuffinManagerFactory,
72
73    /// Cache for inverted index.
74    inverted_index_cache: Option<InvertedIndexCacheRef>,
75
76    /// Cache for puffin metadata.
77    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
78}
79
80impl<'a> InvertedIndexApplierBuilder<'a> {
81    /// Creates a new [`InvertedIndexApplierBuilder`].
82    pub fn new(
83        table_dir: String,
84        path_type: PathType,
85        object_store: ObjectStore,
86        metadata: &'a RegionMetadata,
87        indexed_column_ids: HashSet<ColumnId>,
88        puffin_manager_factory: PuffinManagerFactory,
89    ) -> Self {
90        Self {
91            table_dir,
92            path_type,
93            object_store,
94            metadata,
95            indexed_column_ids,
96            output: BTreeMap::default(),
97            puffin_manager_factory,
98            file_cache: None,
99            inverted_index_cache: None,
100            puffin_metadata_cache: None,
101        }
102    }
103
104    /// Sets the file cache.
105    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
106        self.file_cache = file_cache;
107        self
108    }
109
110    /// Sets the puffin metadata cache.
111    pub fn with_puffin_metadata_cache(
112        mut self,
113        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
114    ) -> Self {
115        self.puffin_metadata_cache = puffin_metadata_cache;
116        self
117    }
118
119    /// Sets the inverted index cache.
120    pub fn with_inverted_index_cache(
121        mut self,
122        inverted_index_cache: Option<InvertedIndexCacheRef>,
123    ) -> Self {
124        self.inverted_index_cache = inverted_index_cache;
125        self
126    }
127
128    /// Consumes the builder to construct an [`InvertedIndexApplier`], optionally returned based on
129    /// the expressions provided. If no predicates match, returns `None`.
130    pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
131        for expr in exprs {
132            self.traverse_and_collect(expr);
133        }
134
135        if self.output.is_empty() {
136            return Ok(None);
137        }
138
139        let predicates = self
140            .output
141            .iter()
142            .map(|(column_id, predicates)| (column_id.to_string(), predicates.clone()))
143            .collect();
144        let applier = PredicatesIndexApplier::try_from(predicates);
145
146        Ok(Some(
147            InvertedIndexApplier::new(
148                self.table_dir,
149                self.path_type,
150                self.object_store,
151                Box::new(applier.context(BuildIndexApplierSnafu)?),
152                self.puffin_manager_factory,
153                self.output,
154            )
155            .with_file_cache(self.file_cache)
156            .with_puffin_metadata_cache(self.puffin_metadata_cache)
157            .with_index_cache(self.inverted_index_cache),
158        ))
159    }
160
161    /// Recursively traverses expressions to collect predicates.
162    /// Results are stored in `self.output`.
163    fn traverse_and_collect(&mut self, expr: &Expr) {
164        let res = match expr {
165            Expr::Between(between) => self.collect_between(between),
166
167            Expr::InList(in_list) => self.collect_inlist(in_list),
168            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
169                Operator::And => {
170                    self.traverse_and_collect(left);
171                    self.traverse_and_collect(right);
172                    Ok(())
173                }
174                Operator::Or => self.collect_or_eq_list(left, right),
175                Operator::Eq => self.collect_eq(left, right),
176                Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
177                    self.collect_comparison_expr(left, op, right)
178                }
179                Operator::RegexMatch => self.collect_regex_match(left, right),
180                _ => Ok(()),
181            },
182
183            // TODO(zhongzc): support more expressions, e.g. IsNull, IsNotNull, ...
184            _ => Ok(()),
185        };
186
187        if let Err(err) = res {
188            warn!(err; "Failed to collect predicates, ignore it. expr: {expr}");
189        }
190    }
191
192    /// Helper function to add a predicate to the output.
193    fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) {
194        self.output.entry(column_id).or_default().push(predicate);
195    }
196
197    /// Helper function to get the column id and the column type of a column.
198    /// Returns `None` if the column is not a tag column or if the column is ignored.
199    fn column_id_and_type(
200        &self,
201        column_name: &str,
202    ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
203        let column = self
204            .metadata
205            .column_by_name(column_name)
206            .context(ColumnNotFoundSnafu {
207                column: column_name,
208            })?;
209
210        if !self.indexed_column_ids.contains(&column.column_id) {
211            return Ok(None);
212        }
213
214        Ok(Some((
215            column.column_id,
216            column.column_schema.data_type.clone(),
217        )))
218    }
219
220    /// Helper function to get a non-null literal.
221    fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
222        match expr {
223            Expr::Literal(lit) if !lit.is_null() => Some(lit),
224            _ => None,
225        }
226    }
227
228    /// Helper function to get the column name of a column expression.
229    fn column_name(expr: &Expr) -> Option<&str> {
230        match expr {
231            Expr::Column(column) => Some(&column.name),
232            _ => None,
233        }
234    }
235
236    /// Helper function to encode a literal into bytes.
237    fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
238        let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
239        let mut bytes = vec![];
240        let field = SortField::new(data_type);
241        IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
242            .context(EncodeSnafu)?;
243        Ok(bytes)
244    }
245}
246
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::Between;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use index::inverted_index::search::predicate::{
        Bound, Range, RangePredicate, RegexMatchPredicate,
    };
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Region with tags `a` (string, id 1) and `b` (int64, id 2), field `c` (string, id 3)
    /// and timestamp `d` (id 4).
    pub(crate) fn test_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("b", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("c", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "d",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);
        builder.build().unwrap()
    }

    pub(crate) fn test_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    pub(crate) fn tag_column() -> Expr {
        Expr::Column(Column::from_name("a"))
    }

    pub(crate) fn tag_column2() -> Expr {
        Expr::Column(Column::from_name("b"))
    }

    pub(crate) fn field_column() -> Expr {
        Expr::Column(Column::from_name("c"))
    }

    pub(crate) fn nonexistent_column() -> Expr {
        Expr::Column(Column::from_name("nonexistence"))
    }

    pub(crate) fn string_lit(s: impl Into<String>) -> Expr {
        Expr::Literal(ScalarValue::Utf8(Some(s.into())))
    }

    pub(crate) fn int64_lit(i: impl Into<i64>) -> Expr {
        Expr::Literal(ScalarValue::Int64(Some(i.into())))
    }

    pub(crate) fn encoded_string(s: impl Into<String>) -> Vec<u8> {
        let mut bytes = vec![];
        IndexValueCodec::encode_nonnull_value(
            Value::from(s.into()).as_value_ref(),
            &SortField::new(ConcreteDataType::string_datatype()),
            &mut bytes,
        )
        .unwrap();
        bytes
    }

    pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
        let mut bytes = vec![];
        IndexValueCodec::encode_nonnull_value(
            Value::from(s.into()).as_value_ref(),
            &SortField::new(ConcreteDataType::int64_datatype()),
            &mut bytes,
        )
        .unwrap();
        bytes
    }

    #[test]
    fn test_collect_and_basic() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");

        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(Expr::BinaryExpr(BinaryExpr {
                left: Box::new(tag_column()),
                op: Operator::RegexMatch,
                right: Box::new(string_lit("bar")),
            })),
            op: Operator::And,
            right: Box::new(Expr::Between(Between {
                expr: Box::new(tag_column2()),
                negated: false,
                low: Box::new(int64_lit(123)),
                high: Box::new(int64_lit(456)),
            })),
        });

        builder.traverse_and_collect(&expr);
        let predicates = builder.output.get(&1).unwrap();
        assert_eq!(predicates.len(), 1);
        assert_eq!(
            predicates[0],
            Predicate::RegexMatch(RegexMatchPredicate {
                pattern: "bar".to_string()
            })
        );
        let predicates = builder.output.get(&2).unwrap();
        assert_eq!(predicates.len(), 1);
        assert_eq!(
            predicates[0],
            Predicate::Range(RangePredicate {
                range: Range {
                    lower: Some(Bound {
                        inclusive: true,
                        value: encoded_int64(123),
                    }),
                    upper: Some(Bound {
                        inclusive: true,
                        value: encoded_int64(456),
                    }),
                }
            })
        );
    }

    #[test]
    fn test_build_without_predicates() {
        let (_d, factory) =
            PuffinManagerFactory::new_for_test_block("test_build_without_predicates_");

        let metadata = test_region_metadata();
        let builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        // Operators the builder does not handle yield no predicates, so no applier is built.
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(tag_column()),
            op: Operator::Plus,
            right: Box::new(int64_lit(1)),
        });
        assert!(builder.build(&[expr]).unwrap().is_none());
    }
}