mito2/sst/index/inverted_index/applier/builder/
eq_list.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16
17use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
18use datatypes::data_type::ConcreteDataType;
19use index::inverted_index::search::predicate::{InListPredicate, Predicate};
20use index::Bytes;
21
22use crate::error::Result;
23use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
24
25impl InvertedIndexApplierBuilder<'_> {
26    /// Collects an eq expression in the form of `column = lit`.
27    pub(crate) fn collect_eq(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
28        let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
29            return Ok(());
30        };
31        let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
32            return Ok(());
33        };
34        let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
35            return Ok(());
36        };
37
38        let predicate = Predicate::InList(InListPredicate {
39            list: BTreeSet::from_iter([Self::encode_lit(lit, data_type)?]),
40        });
41        self.add_predicate(column_id, predicate);
42        Ok(())
43    }
44
45    /// Collects eq list in the form of `column = lit OR column = lit OR ...`.
46    pub(crate) fn collect_or_eq_list(&mut self, eq_expr: &DfExpr, or_list: &DfExpr) -> Result<()> {
47        let DfExpr::BinaryExpr(BinaryExpr {
48            left,
49            op: Operator::Eq,
50            right,
51        }) = eq_expr
52        else {
53            return Ok(());
54        };
55
56        let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
57            return Ok(());
58        };
59        let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
60            return Ok(());
61        };
62        let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
63            return Ok(());
64        };
65
66        let bytes = Self::encode_lit(lit, data_type.clone())?;
67        let mut inlist = BTreeSet::from_iter([bytes]);
68
69        if Self::collect_eq_list_inner(column_name, &data_type, or_list, &mut inlist)? {
70            let predicate = Predicate::InList(InListPredicate { list: inlist });
71            self.add_predicate(column_id, predicate);
72        }
73
74        Ok(())
75    }
76
77    /// Recursively collects eq list.
78    ///
79    /// Returns false if the expression doesn't match the form then
80    /// caller can safely ignore the expression.
81    fn collect_eq_list_inner(
82        column_name: &str,
83        data_type: &ConcreteDataType,
84        expr: &DfExpr,
85        inlist: &mut BTreeSet<Bytes>,
86    ) -> Result<bool> {
87        let DfExpr::BinaryExpr(BinaryExpr {
88            left,
89            op: op @ (Operator::Eq | Operator::Or),
90            right,
91        }) = expr
92        else {
93            return Ok(false);
94        };
95
96        if op == &Operator::Or {
97            let r = Self::collect_eq_list_inner(column_name, data_type, left, inlist)?
98                .then(|| Self::collect_eq_list_inner(column_name, data_type, right, inlist))
99                .transpose()?
100                .unwrap_or(false);
101            return Ok(r);
102        }
103
104        if op == &Operator::Eq {
105            let Some(name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
106                return Ok(false);
107            };
108            if column_name != name {
109                return Ok(false);
110            }
111            let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
112                return Ok(false);
113            };
114
115            inlist.insert(Self::encode_lit(lit, data_type.clone())?);
116            return Ok(true);
117        }
118
119        Ok(false)
120    }
121}
122
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;
    use crate::error::Error;
    use crate::sst::index::inverted_index::applier::builder::tests::{
        encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
        tag_column2, test_object_store, test_region_metadata,
    };
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Creates the applier builder shared by every test in this module: region
    /// dir `"test"`, the test object store, and the id set `{1, 2, 3}`.
    macro_rules! new_builder {
        ($metadata:expr, $factory:expr) => {
            InvertedIndexApplierBuilder::new(
                "test".to_string(),
                test_object_store(),
                $metadata,
                HashSet::from_iter([1, 2, 3]),
                $factory,
            )
        };
    }

    /// Wraps two operands and an operator into a binary expression.
    fn binary(left: DfExpr, op: Operator, right: DfExpr) -> DfExpr {
        DfExpr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op,
            right: Box::new(right),
        })
    }

    /// Builds `column = value` with a string literal on the right-hand side.
    fn eq_str(column: DfExpr, value: &str) -> DfExpr {
        binary(column, Operator::Eq, string_lit(value))
    }

    /// Builds `left OR right`.
    fn or(left: DfExpr, right: DfExpr) -> DfExpr {
        binary(left, Operator::Or, right)
    }

    /// Builds the in-list predicate expected for the given string values.
    fn in_list(values: &[&str]) -> Predicate {
        Predicate::InList(InListPredicate {
            list: values.iter().map(|value| encoded_string(*value)).collect(),
        })
    }

    /// Equalities are collected regardless of which side holds the column.
    #[test]
    fn test_collect_eq_basic() {
        let (_dir_guard, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_");
        let metadata = test_region_metadata();
        let mut builder = new_builder!(&metadata, factory);

        builder
            .collect_eq(&tag_column(), &string_lit("foo"))
            .unwrap();
        builder
            .collect_eq(&string_lit("bar"), &tag_column())
            .unwrap();

        let predicates = builder.output.get(&1).unwrap();
        assert_eq!(predicates.len(), 2);
        assert_eq!(predicates[0], in_list(&["foo"]));
        assert_eq!(predicates[1], in_list(&["bar"]));
    }

    /// An equality on the field column yields a predicate under id 3.
    #[test]
    fn test_collect_eq_field_column() {
        let (_dir_guard, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_eq_field_column_");
        let metadata = test_region_metadata();
        let mut builder = new_builder!(&metadata, factory);

        builder
            .collect_eq(&field_column(), &string_lit("abc"))
            .unwrap();

        let predicates = builder.output.get(&3).unwrap();
        assert_eq!(predicates.len(), 1);
        assert_eq!(predicates[0], in_list(&["abc"]));
    }

    /// An unknown column is reported as an error and nothing is collected.
    #[test]
    fn test_collect_eq_nonexistent_column() {
        let (_dir_guard, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_eq_nonexistent_column_");
        let metadata = test_region_metadata();
        let mut builder = new_builder!(&metadata, factory);

        let outcome = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));
        assert!(matches!(outcome, Err(Error::ColumnNotFound { .. })));
        assert!(builder.output.is_empty());
    }

    /// A literal whose type differs from the column's is rejected.
    #[test]
    fn test_collect_eq_type_mismatch() {
        let (_dir_guard, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_eq_type_mismatch_");
        let metadata = test_region_metadata();
        let mut builder = new_builder!(&metadata, factory);

        let outcome = builder.collect_eq(&tag_column(), &int64_lit(1));
        assert!(matches!(outcome, Err(Error::FieldTypeMismatch { .. })));
        assert!(builder.output.is_empty());
    }

    /// A nested OR of equalities on one column collapses into one in-list.
    #[test]
    fn test_collect_or_eq_list_basic() {
        let (_dir_guard, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_basic_");
        let metadata = test_region_metadata();
        let mut builder = new_builder!(&metadata, factory);

        let eq_expr = eq_str(tag_column(), "abc");
        let or_eq_list = or(
            eq_str(tag_column(), "foo"),
            or(eq_str(tag_column(), "bar"), eq_str(tag_column(), "baz")),
        );

        builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();

        let predicates = builder.output.get(&1).unwrap();
        assert_eq!(predicates.len(), 1);
        assert_eq!(predicates[0], in_list(&["abc", "foo", "bar", "baz"]));
    }

    /// A non-equality operator inside the OR-list discards the whole list.
    #[test]
    fn test_collect_or_eq_list_invalid_op() {
        let (_dir_guard, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_invalid_op_");
        let metadata = test_region_metadata();
        let mut builder = new_builder!(&metadata, factory);

        let eq_expr = eq_str(tag_column(), "abc");
        let or_eq_list = or(
            eq_str(tag_column(), "foo"),
            // `>` is the invalid operator here.
            binary(tag_column(), Operator::Gt, string_lit("foo")),
        );

        builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
        assert!(builder.output.is_empty());
    }

    /// An equality on a different column inside the OR-list discards the list.
    #[test]
    fn test_collect_or_eq_list_multiple_columns() {
        let (_dir_guard, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_multiple_columns_");
        let metadata = test_region_metadata();
        let mut builder = new_builder!(&metadata, factory);

        let eq_expr = eq_str(tag_column(), "abc");
        let or_eq_list = or(
            eq_str(tag_column(), "foo"),
            // The second equality targets a different column.
            eq_str(tag_column2(), "bar"),
        );

        builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
        assert!(builder.output.is_empty());
    }
}