mito2/sst/index/inverted_index/applier/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod between;
16mod comparison;
17mod eq_list;
18mod in_list;
19mod regex_match;
20
21use std::collections::{BTreeMap, HashSet};
22
23use common_telemetry::warn;
24use datafusion_common::ScalarValue;
25use datafusion_expr::{BinaryExpr, Expr, Operator};
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use index::inverted_index::search::index_apply::PredicatesIndexApplier;
29use index::inverted_index::search::predicate::Predicate;
30use index::target::IndexTarget;
31use mito_codec::index::IndexValueCodec;
32use mito_codec::row_converter::SortField;
33use object_store::ObjectStore;
34use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::RegionMetadata;
37use store_api::region_request::PathType;
38use store_api::storage::ColumnId;
39
40use crate::cache::file_cache::FileCacheRef;
41use crate::cache::index::inverted_index::InvertedIndexCacheRef;
42use crate::error::{
43    BuildIndexApplierSnafu, ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result,
44};
45use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
46use crate::sst::index::puffin_manager::PuffinManagerFactory;
47
48/// Constructs an [`InvertedIndexApplier`] which applies predicates to SST files during scan.
49pub(crate) struct InvertedIndexApplierBuilder<'a> {
50    /// Directory of the table, required argument for constructing [`InvertedIndexApplier`].
51    table_dir: String,
52
53    /// Path type for generating file paths.
54    path_type: PathType,
55
56    /// Object store, required argument for constructing [`InvertedIndexApplier`].
57    object_store: ObjectStore,
58
59    /// File cache, required argument for constructing [`InvertedIndexApplier`].
60    file_cache: Option<FileCacheRef>,
61
62    /// Metadata of the region, used to get metadata like column type.
63    metadata: &'a RegionMetadata,
64
65    /// Column ids of the columns that are indexed.
66    indexed_column_ids: HashSet<ColumnId>,
67
68    /// Stores predicates during traversal on the Expr tree.
69    output: BTreeMap<ColumnId, Vec<Predicate>>,
70
71    /// The puffin manager factory.
72    puffin_manager_factory: PuffinManagerFactory,
73
74    /// Cache for inverted index.
75    inverted_index_cache: Option<InvertedIndexCacheRef>,
76
77    /// Cache for puffin metadata.
78    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
79}
80
81impl<'a> InvertedIndexApplierBuilder<'a> {
82    /// Creates a new [`InvertedIndexApplierBuilder`].
83    pub fn new(
84        table_dir: String,
85        path_type: PathType,
86        object_store: ObjectStore,
87        metadata: &'a RegionMetadata,
88        indexed_column_ids: HashSet<ColumnId>,
89        puffin_manager_factory: PuffinManagerFactory,
90    ) -> Self {
91        Self {
92            table_dir,
93            path_type,
94            object_store,
95            metadata,
96            indexed_column_ids,
97            output: BTreeMap::default(),
98            puffin_manager_factory,
99            file_cache: None,
100            inverted_index_cache: None,
101            puffin_metadata_cache: None,
102        }
103    }
104
105    /// Sets the file cache.
106    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
107        self.file_cache = file_cache;
108        self
109    }
110
111    /// Sets the puffin metadata cache.
112    pub fn with_puffin_metadata_cache(
113        mut self,
114        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
115    ) -> Self {
116        self.puffin_metadata_cache = puffin_metadata_cache;
117        self
118    }
119
120    /// Sets the inverted index cache.
121    pub fn with_inverted_index_cache(
122        mut self,
123        inverted_index_cache: Option<InvertedIndexCacheRef>,
124    ) -> Self {
125        self.inverted_index_cache = inverted_index_cache;
126        self
127    }
128
129    /// Consumes the builder to construct an [`InvertedIndexApplier`], optionally returned based on
130    /// the expressions provided. If no predicates match, returns `None`.
131    pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
132        for expr in exprs {
133            self.traverse_and_collect(expr);
134        }
135
136        if self.output.is_empty() {
137            return Ok(None);
138        }
139
140        let predicates = self
141            .output
142            .iter()
143            .map(|(column_id, predicates)| {
144                (
145                    format!("{}", IndexTarget::ColumnId(*column_id)),
146                    predicates.clone(),
147                )
148            })
149            .collect::<Vec<_>>();
150        let applier = PredicatesIndexApplier::try_from(predicates);
151
152        Ok(Some(
153            InvertedIndexApplier::new(
154                self.table_dir,
155                self.path_type,
156                self.object_store,
157                Box::new(applier.context(BuildIndexApplierSnafu)?),
158                self.puffin_manager_factory,
159                self.output,
160            )
161            .with_file_cache(self.file_cache)
162            .with_puffin_metadata_cache(self.puffin_metadata_cache)
163            .with_index_cache(self.inverted_index_cache),
164        ))
165    }
166
167    /// Recursively traverses expressions to collect predicates.
168    /// Results are stored in `self.output`.
169    fn traverse_and_collect(&mut self, expr: &Expr) {
170        let res = match expr {
171            Expr::Between(between) => self.collect_between(between),
172
173            Expr::InList(in_list) => self.collect_inlist(in_list),
174            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
175                Operator::And => {
176                    self.traverse_and_collect(left);
177                    self.traverse_and_collect(right);
178                    Ok(())
179                }
180                Operator::Or => self.collect_or_eq_list(left, right),
181                Operator::Eq => self.collect_eq(left, right),
182                Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
183                    self.collect_comparison_expr(left, op, right)
184                }
185                Operator::RegexMatch => self.collect_regex_match(left, right),
186                _ => Ok(()),
187            },
188
189            // TODO(zhongzc): support more expressions, e.g. IsNull, IsNotNull, ...
190            _ => Ok(()),
191        };
192
193        if let Err(err) = res {
194            warn!(err; "Failed to collect predicates, ignore it. expr: {expr}");
195        }
196    }
197
198    /// Helper function to add a predicate to the output.
199    fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) {
200        self.output.entry(column_id).or_default().push(predicate);
201    }
202
203    /// Helper function to get the column id and the column type of a column.
204    /// Returns `None` if the column is not a tag column or if the column is ignored.
205    fn column_id_and_type(
206        &self,
207        column_name: &str,
208    ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
209        let column = self
210            .metadata
211            .column_by_name(column_name)
212            .context(ColumnNotFoundSnafu {
213                column: column_name,
214            })?;
215
216        if !self.indexed_column_ids.contains(&column.column_id) {
217            return Ok(None);
218        }
219
220        Ok(Some((
221            column.column_id,
222            column.column_schema.data_type.clone(),
223        )))
224    }
225
226    /// Helper function to get a non-null literal.
227    fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
228        match expr {
229            Expr::Literal(lit, _) if !lit.is_null() => Some(lit),
230            _ => None,
231        }
232    }
233
234    /// Helper function to get the column name of a column expression.
235    fn column_name(expr: &Expr) -> Option<&str> {
236        match expr {
237            Expr::Column(column) => Some(&column.name),
238            _ => None,
239        }
240    }
241
242    /// Helper function to encode a literal into bytes.
243    fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
244        let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
245        let mut bytes = vec![];
246        let field = SortField::new(data_type);
247        IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
248            .context(EncodeSnafu)?;
249        Ok(bytes)
250    }
251}
252
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datafusion_common::Column;
    use datafusion_expr::{Between, Literal};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use index::inverted_index::search::predicate::{
        Bound, Range, RangePredicate, RegexMatchPredicate,
    };
    use object_store::ObjectStore;
    use object_store::services::Memory;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    /// Builds region metadata with two tag columns (`a`: string, id 1; `b`: int64, id 2),
    /// one field column (`c`: string, id 3) and a timestamp column (`d`, id 4).
    /// The primary key is `[1, 2]`.
    pub(crate) fn test_region_metadata() -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("b", ConcreteDataType::int64_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("c", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "d",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 4,
            })
            .primary_key(vec![1, 2]);
        builder.build().unwrap()
    }

    /// Creates an in-memory object store for tests.
    pub(crate) fn test_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Column expression for the string tag column `a`.
    pub(crate) fn tag_column() -> Expr {
        Expr::Column(Column::from_name("a"))
    }

    /// Column expression for the int64 tag column `b`.
    pub(crate) fn tag_column2() -> Expr {
        Expr::Column(Column::from_name("b"))
    }

    /// Column expression for the field column `c`.
    pub(crate) fn field_column() -> Expr {
        Expr::Column(Column::from_name("c"))
    }

    /// Column expression for a column that is not part of the test region metadata.
    pub(crate) fn nonexistent_column() -> Expr {
        Expr::Column(Column::from_name("nonexistence"))
    }

    /// Literal expression holding the given string.
    pub(crate) fn string_lit(s: impl Into<String>) -> Expr {
        s.into().lit()
    }

    /// Literal expression holding the given `i64`.
    pub(crate) fn int64_lit(i: impl Into<i64>) -> Expr {
        i.into().lit()
    }

    /// Encodes a string with `IndexValueCodec` using a string-typed `SortField`.
    pub(crate) fn encoded_string(s: impl Into<String>) -> Vec<u8> {
        let mut bytes = vec![];
        IndexValueCodec::encode_nonnull_value(
            Value::from(s.into()).as_value_ref(),
            &SortField::new(ConcreteDataType::string_datatype()),
            &mut bytes,
        )
        .unwrap();
        bytes
    }

    /// Encodes an `i64` with `IndexValueCodec` using an int64-typed `SortField`.
    pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
        let mut bytes = vec![];
        IndexValueCodec::encode_nonnull_value(
            Value::from(s.into()).as_value_ref(),
            &SortField::new(ConcreteDataType::int64_datatype()),
            &mut bytes,
        )
        .unwrap();
        bytes
    }

    /// `a =~ "bar" AND b BETWEEN 123 AND 456` must yield one regex predicate for column 1
    /// and one inclusive range predicate for column 2.
    #[test]
    fn test_collect_and_basic() {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");

        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        let regex_expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(tag_column()),
            op: Operator::RegexMatch,
            right: Box::new(string_lit("bar")),
        });
        let between_expr = Expr::Between(Between {
            expr: Box::new(tag_column2()),
            negated: false,
            low: Box::new(int64_lit(123)),
            high: Box::new(int64_lit(456)),
        });
        let expr = Expr::BinaryExpr(BinaryExpr {
            left: Box::new(regex_expr),
            op: Operator::And,
            right: Box::new(between_expr),
        });

        builder.traverse_and_collect(&expr);

        let predicates = builder.output.get(&1).unwrap();
        assert_eq!(predicates.len(), 1);
        assert_eq!(
            predicates[0],
            Predicate::RegexMatch(RegexMatchPredicate {
                pattern: "bar".to_string()
            })
        );

        let predicates = builder.output.get(&2).unwrap();
        assert_eq!(predicates.len(), 1);
        assert_eq!(
            predicates[0],
            Predicate::Range(RangePredicate {
                range: Range {
                    lower: Some(Bound {
                        inclusive: true,
                        value: encoded_int64(123),
                    }),
                    upper: Some(Bound {
                        inclusive: true,
                        value: encoded_int64(456),
                    }),
                }
            })
        );
    }
}