Skip to main content

mito2/sst/index/inverted_index/applier/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod between;
16mod comparison;
17mod eq_list;
18mod in_list;
19mod regex_match;
20
21use std::collections::{BTreeMap, HashSet};
22
23use common_telemetry::warn;
24use datafusion_common::ScalarValue;
25use datafusion_expr::{BinaryExpr, Expr, Operator};
26use datatypes::data_type::ConcreteDataType;
27use datatypes::value::Value;
28use index::inverted_index::search::predicate::Predicate;
29use mito_codec::index::IndexValueCodec;
30use mito_codec::row_converter::SortField;
31use object_store::ObjectStore;
32use puffin::puffin_manager::cache::PuffinMetadataCacheRef;
33use snafu::{OptionExt, ResultExt};
34use store_api::metadata::RegionMetadata;
35use store_api::region_request::PathType;
36use store_api::storage::ColumnId;
37
38use crate::cache::file_cache::FileCacheRef;
39use crate::cache::index::inverted_index::InvertedIndexCacheRef;
40use crate::error::{ColumnNotFoundSnafu, ConvertValueSnafu, EncodeSnafu, Result};
41use crate::sst::index::inverted_index::applier::InvertedIndexApplier;
42use crate::sst::index::puffin_manager::PuffinManagerFactory;
43
/// Constructs an [`InvertedIndexApplier`] which applies predicates to SST files during scan.
///
/// The builder walks the filter expressions passed to `build`, gathers the
/// predicates it can express per column into `output`, and finally hands them,
/// together with the storage handles and optional caches below, to
/// [`InvertedIndexApplier::new`].
pub(crate) struct InvertedIndexApplierBuilder<'a> {
    /// Directory of the table, required argument for constructing [`InvertedIndexApplier`].
    table_dir: String,

    /// Path type for generating file paths.
    path_type: PathType,

    /// Object store, required argument for constructing [`InvertedIndexApplier`].
    object_store: ObjectStore,

    /// Optional file cache forwarded to the [`InvertedIndexApplier`].
    /// `None` unless set through `with_file_cache`.
    file_cache: Option<FileCacheRef>,

    /// Metadata of the region, used to look up columns by name or id and to
    /// read their data types.
    metadata: &'a RegionMetadata,

    /// Column ids of the columns that are indexed. `column_id_and_type`
    /// returns `None` for any column outside this set.
    indexed_column_ids: HashSet<ColumnId>,

    /// Predicates gathered during traversal of the `Expr` tree, keyed by column id.
    output: BTreeMap<ColumnId, Vec<Predicate>>,

    /// The puffin manager factory, forwarded to the [`InvertedIndexApplier`].
    puffin_manager_factory: PuffinManagerFactory,

    /// Optional cache for inverted index. `None` unless set through
    /// `with_inverted_index_cache`.
    inverted_index_cache: Option<InvertedIndexCacheRef>,

    /// Optional cache for puffin metadata. `None` unless set through
    /// `with_puffin_metadata_cache`.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
}
76
77impl<'a> InvertedIndexApplierBuilder<'a> {
78    /// Creates a new [`InvertedIndexApplierBuilder`].
79    pub fn new(
80        table_dir: String,
81        path_type: PathType,
82        object_store: ObjectStore,
83        metadata: &'a RegionMetadata,
84        indexed_column_ids: HashSet<ColumnId>,
85        puffin_manager_factory: PuffinManagerFactory,
86    ) -> Self {
87        Self {
88            table_dir,
89            path_type,
90            object_store,
91            metadata,
92            indexed_column_ids,
93            output: BTreeMap::default(),
94            puffin_manager_factory,
95            file_cache: None,
96            inverted_index_cache: None,
97            puffin_metadata_cache: None,
98        }
99    }
100
101    /// Sets the file cache.
102    pub fn with_file_cache(mut self, file_cache: Option<FileCacheRef>) -> Self {
103        self.file_cache = file_cache;
104        self
105    }
106
107    /// Sets the puffin metadata cache.
108    pub fn with_puffin_metadata_cache(
109        mut self,
110        puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
111    ) -> Self {
112        self.puffin_metadata_cache = puffin_metadata_cache;
113        self
114    }
115
116    /// Sets the inverted index cache.
117    pub fn with_inverted_index_cache(
118        mut self,
119        inverted_index_cache: Option<InvertedIndexCacheRef>,
120    ) -> Self {
121        self.inverted_index_cache = inverted_index_cache;
122        self
123    }
124
125    /// Consumes the builder to construct an [`InvertedIndexApplier`], optionally returned based on
126    /// the expressions provided. If no predicates match, returns `None`.
127    pub fn build(mut self, exprs: &[Expr]) -> Result<Option<InvertedIndexApplier>> {
128        for expr in exprs {
129            self.traverse_and_collect(expr);
130        }
131
132        if self.output.is_empty() {
133            return Ok(None);
134        }
135
136        let expected_predicate_column_types = self.expected_predicate_column_types();
137
138        Ok(Some(
139            InvertedIndexApplier::new(
140                self.table_dir,
141                self.path_type,
142                self.object_store,
143                self.puffin_manager_factory,
144                self.output,
145                expected_predicate_column_types,
146            )?
147            .with_file_cache(self.file_cache)
148            .with_puffin_metadata_cache(self.puffin_metadata_cache)
149            .with_index_cache(self.inverted_index_cache),
150        ))
151    }
152
153    /// Returns `(column_id, data_type)` pairs for predicate columns
154    /// collected in `self.output`.
155    ///
156    /// The data types are resolved from the latest region manifest. Columns
157    /// that no longer exist in the latest metadata are skipped.
158    fn expected_predicate_column_types(&self) -> BTreeMap<ColumnId, ConcreteDataType> {
159        self.output
160            .keys()
161            .filter_map(|col_id| {
162                let col = self.metadata.column_by_id(*col_id)?;
163                Some((*col_id, col.column_schema.data_type.clone()))
164            })
165            .collect()
166    }
167
168    /// Recursively traverses expressions to collect predicates.
169    /// Results are stored in `self.output`.
170    fn traverse_and_collect(&mut self, expr: &Expr) {
171        let res = match expr {
172            Expr::Between(between) => self.collect_between(between),
173
174            Expr::InList(in_list) => self.collect_inlist(in_list),
175            Expr::BinaryExpr(BinaryExpr { left, op, right }) => match op {
176                Operator::And => {
177                    self.traverse_and_collect(left);
178                    self.traverse_and_collect(right);
179                    Ok(())
180                }
181                Operator::Or => self.collect_or_eq_list(left, right),
182                Operator::Eq => self.collect_eq(left, right),
183                Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
184                    self.collect_comparison_expr(left, op, right)
185                }
186                Operator::RegexMatch => self.collect_regex_match(left, right),
187                _ => Ok(()),
188            },
189
190            // TODO(zhongzc): support more expressions, e.g. IsNull, IsNotNull, ...
191            _ => Ok(()),
192        };
193
194        if let Err(err) = res {
195            warn!(err; "Failed to collect predicates, ignore it. expr: {expr}");
196        }
197    }
198
199    /// Helper function to add a predicate to the output.
200    fn add_predicate(&mut self, column_id: ColumnId, predicate: Predicate) {
201        self.output.entry(column_id).or_default().push(predicate);
202    }
203
204    /// Helper function to get the column id and the column type of a column.
205    /// Returns `None` if the column is not a tag column or if the column is ignored.
206    fn column_id_and_type(
207        &self,
208        column_name: &str,
209    ) -> Result<Option<(ColumnId, ConcreteDataType)>> {
210        let column = self
211            .metadata
212            .column_by_name(column_name)
213            .context(ColumnNotFoundSnafu {
214                column: column_name,
215            })?;
216
217        if !self.indexed_column_ids.contains(&column.column_id) {
218            return Ok(None);
219        }
220
221        Ok(Some((
222            column.column_id,
223            column.column_schema.data_type.clone(),
224        )))
225    }
226
227    /// Helper function to get a non-null literal.
228    fn nonnull_lit(expr: &Expr) -> Option<&ScalarValue> {
229        match expr {
230            Expr::Literal(lit, _) if !lit.is_null() => Some(lit),
231            _ => None,
232        }
233    }
234
235    /// Helper function to get the column name of a column expression.
236    fn column_name(expr: &Expr) -> Option<&str> {
237        match expr {
238            Expr::Column(column) => Some(&column.name),
239            _ => None,
240        }
241    }
242
243    /// Helper function to encode a literal into bytes.
244    fn encode_lit(lit: &ScalarValue, data_type: ConcreteDataType) -> Result<Vec<u8>> {
245        let value = Value::try_from(lit.clone()).context(ConvertValueSnafu)?;
246        let mut bytes = vec![];
247        let field = SortField::new(data_type);
248        IndexValueCodec::encode_nonnull_value(value.as_value_ref(), &field, &mut bytes)
249            .context(EncodeSnafu)?;
250        Ok(bytes)
251    }
252}
253
254#[cfg(test)]
255mod tests {
256    use api::v1::SemanticType;
257    use datafusion_common::Column;
258    use datafusion_expr::{Between, Literal};
259    use datatypes::data_type::ConcreteDataType;
260    use datatypes::schema::ColumnSchema;
261    use index::inverted_index::search::predicate::{
262        Bound, Range, RangePredicate, RegexMatchPredicate,
263    };
264    use object_store::ObjectStore;
265    use object_store::services::Memory;
266    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
267    use store_api::storage::RegionId;
268
269    use super::*;
270
271    pub(crate) fn test_region_metadata() -> RegionMetadata {
272        let mut builder = RegionMetadataBuilder::new(RegionId::new(1234, 5678));
273        builder
274            .push_column_metadata(ColumnMetadata {
275                column_schema: ColumnSchema::new("a", ConcreteDataType::string_datatype(), false),
276                semantic_type: SemanticType::Tag,
277                column_id: 1,
278            })
279            .push_column_metadata(ColumnMetadata {
280                column_schema: ColumnSchema::new("b", ConcreteDataType::int64_datatype(), false),
281                semantic_type: SemanticType::Tag,
282                column_id: 2,
283            })
284            .push_column_metadata(ColumnMetadata {
285                column_schema: ColumnSchema::new("c", ConcreteDataType::string_datatype(), false),
286                semantic_type: SemanticType::Field,
287                column_id: 3,
288            })
289            .push_column_metadata(ColumnMetadata {
290                column_schema: ColumnSchema::new(
291                    "d",
292                    ConcreteDataType::timestamp_millisecond_datatype(),
293                    false,
294                ),
295                semantic_type: SemanticType::Timestamp,
296                column_id: 4,
297            })
298            .primary_key(vec![1, 2]);
299        builder.build().unwrap()
300    }
301
302    pub(crate) fn test_object_store() -> ObjectStore {
303        ObjectStore::new(Memory::default()).unwrap().finish()
304    }
305
306    pub(crate) fn tag_column() -> Expr {
307        Expr::Column(Column::from_name("a"))
308    }
309
310    pub(crate) fn tag_column2() -> Expr {
311        Expr::Column(Column::from_name("b"))
312    }
313
314    pub(crate) fn field_column() -> Expr {
315        Expr::Column(Column::from_name("c"))
316    }
317
318    pub(crate) fn nonexistent_column() -> Expr {
319        Expr::Column(Column::from_name("nonexistence"))
320    }
321
322    pub(crate) fn string_lit(s: impl Into<String>) -> Expr {
323        s.into().lit()
324    }
325
326    pub(crate) fn int64_lit(i: impl Into<i64>) -> Expr {
327        i.into().lit()
328    }
329
330    pub(crate) fn encoded_string(s: impl Into<String>) -> Vec<u8> {
331        let mut bytes = vec![];
332        IndexValueCodec::encode_nonnull_value(
333            Value::from(s.into()).as_value_ref(),
334            &SortField::new(ConcreteDataType::string_datatype()),
335            &mut bytes,
336        )
337        .unwrap();
338        bytes
339    }
340
341    pub(crate) fn encoded_int64(s: impl Into<i64>) -> Vec<u8> {
342        let mut bytes = vec![];
343        IndexValueCodec::encode_nonnull_value(
344            Value::from(s.into()).as_value_ref(),
345            &SortField::new(ConcreteDataType::int64_datatype()),
346            &mut bytes,
347        )
348        .unwrap();
349        bytes
350    }
351
352    #[test]
353    fn test_collect_and_basic() {
354        let (_d, factory) = PuffinManagerFactory::new_for_test_block("test_collect_and_basic_");
355
356        let metadata = test_region_metadata();
357        let mut builder = InvertedIndexApplierBuilder::new(
358            "test".to_string(),
359            PathType::Bare,
360            test_object_store(),
361            &metadata,
362            HashSet::from_iter([1, 2, 3]),
363            factory,
364        );
365
366        let expr = Expr::BinaryExpr(BinaryExpr {
367            left: Box::new(Expr::BinaryExpr(BinaryExpr {
368                left: Box::new(tag_column()),
369                op: Operator::RegexMatch,
370                right: Box::new(string_lit("bar")),
371            })),
372            op: Operator::And,
373            right: Box::new(Expr::Between(Between {
374                expr: Box::new(tag_column2()),
375                negated: false,
376                low: Box::new(int64_lit(123)),
377                high: Box::new(int64_lit(456)),
378            })),
379        });
380
381        builder.traverse_and_collect(&expr);
382        let predicates = builder.output.get(&1).unwrap();
383        assert_eq!(predicates.len(), 1);
384        assert_eq!(
385            predicates[0],
386            Predicate::RegexMatch(RegexMatchPredicate {
387                pattern: "bar".to_string()
388            })
389        );
390        let predicates = builder.output.get(&2).unwrap();
391        assert_eq!(predicates.len(), 1);
392        assert_eq!(
393            predicates[0],
394            Predicate::Range(RangePredicate {
395                range: Range {
396                    lower: Some(Bound {
397                        inclusive: true,
398                        value: encoded_int64(123),
399                    }),
400                    upper: Some(Bound {
401                        inclusive: true,
402                        value: encoded_int64(456),
403                    }),
404                }
405            })
406        );
407    }
408}