mito2/sst/index/inverted_index/applier/builder/
eq_list.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeSet;
16
17use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
18use datatypes::data_type::ConcreteDataType;
19use index::inverted_index::search::predicate::{InListPredicate, Predicate};
20use index::Bytes;
21
22use crate::error::Result;
23use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
24
25impl InvertedIndexApplierBuilder<'_> {
26    /// Collects an eq expression in the form of `column = lit`.
27    pub(crate) fn collect_eq(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
28        let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
29            return Ok(());
30        };
31        let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
32            return Ok(());
33        };
34        let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
35            return Ok(());
36        };
37
38        let predicate = Predicate::InList(InListPredicate {
39            list: BTreeSet::from_iter([Self::encode_lit(lit, data_type)?]),
40        });
41        self.add_predicate(column_id, predicate);
42        Ok(())
43    }
44
45    /// Collects eq list in the form of `column = lit OR column = lit OR ...`.
46    pub(crate) fn collect_or_eq_list(&mut self, eq_expr: &DfExpr, or_list: &DfExpr) -> Result<()> {
47        let DfExpr::BinaryExpr(BinaryExpr {
48            left,
49            op: Operator::Eq,
50            right,
51        }) = eq_expr
52        else {
53            return Ok(());
54        };
55
56        let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
57            return Ok(());
58        };
59        let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
60            return Ok(());
61        };
62        let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
63            return Ok(());
64        };
65
66        let bytes = Self::encode_lit(lit, data_type.clone())?;
67        let mut inlist = BTreeSet::from_iter([bytes]);
68
69        if Self::collect_eq_list_inner(column_name, &data_type, or_list, &mut inlist)? {
70            let predicate = Predicate::InList(InListPredicate { list: inlist });
71            self.add_predicate(column_id, predicate);
72        }
73
74        Ok(())
75    }
76
77    /// Recursively collects eq list.
78    ///
79    /// Returns false if the expression doesn't match the form then
80    /// caller can safely ignore the expression.
81    fn collect_eq_list_inner(
82        column_name: &str,
83        data_type: &ConcreteDataType,
84        expr: &DfExpr,
85        inlist: &mut BTreeSet<Bytes>,
86    ) -> Result<bool> {
87        let DfExpr::BinaryExpr(BinaryExpr {
88            left,
89            op: op @ (Operator::Eq | Operator::Or),
90            right,
91        }) = expr
92        else {
93            return Ok(false);
94        };
95
96        if op == &Operator::Or {
97            let r = Self::collect_eq_list_inner(column_name, data_type, left, inlist)?
98                .then(|| Self::collect_eq_list_inner(column_name, data_type, right, inlist))
99                .transpose()?
100                .unwrap_or(false);
101            return Ok(r);
102        }
103
104        if op == &Operator::Eq {
105            let Some(name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
106                return Ok(false);
107            };
108            if column_name != name {
109                return Ok(false);
110            }
111            let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
112                return Ok(false);
113            };
114
115            inlist.insert(Self::encode_lit(lit, data_type.clone())?);
116            return Ok(true);
117        }
118
119        Ok(false)
120    }
121}
122
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use store_api::region_request::PathType;

    use super::*;
    use crate::error::Error;
    use crate::sst::index::inverted_index::applier::builder::tests::{
        encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
        tag_column2, test_object_store, test_region_metadata,
    };
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Builds a fresh applier builder over the test region metadata and hands
    /// it to `check`.
    ///
    /// The guard returned by `new_for_test_block` is kept alive until `check`
    /// has returned.
    fn with_builder(
        prefix: &'static str,
        check: impl FnOnce(&mut InvertedIndexApplierBuilder<'_>),
    ) {
        let (_guard, factory) = PuffinManagerFactory::new_for_test_block(prefix);
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        check(&mut builder);
    }

    /// Builds the binary expression `left <op> right`.
    fn binary(left: DfExpr, op: Operator, right: DfExpr) -> DfExpr {
        DfExpr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op,
            right: Box::new(right),
        })
    }

    /// Builds `tag_column() = value`.
    fn tag_eq(value: &str) -> DfExpr {
        binary(tag_column(), Operator::Eq, string_lit(value))
    }

    /// Builds `left OR right`.
    fn or(left: DfExpr, right: DfExpr) -> DfExpr {
        binary(left, Operator::Or, right)
    }

    /// Builds the in-list predicate expected for the given string values.
    fn in_list_of(values: &[&str]) -> Predicate {
        Predicate::InList(InListPredicate {
            list: values.iter().map(|value| encoded_string(*value)).collect(),
        })
    }

    #[test]
    fn test_collect_eq_basic() {
        with_builder("test_collect_eq_basic_", |builder| {
            // The column may sit on either side of the comparison.
            builder
                .collect_eq(&tag_column(), &string_lit("foo"))
                .unwrap();
            builder
                .collect_eq(&string_lit("bar"), &tag_column())
                .unwrap();

            let collected = builder.output.get(&1).unwrap();
            assert_eq!(collected.len(), 2);
            assert_eq!(collected[0], in_list_of(&["foo"]));
            assert_eq!(collected[1], in_list_of(&["bar"]));
        });
    }

    #[test]
    fn test_collect_eq_field_column() {
        with_builder("test_collect_eq_field_column_", |builder| {
            builder
                .collect_eq(&field_column(), &string_lit("abc"))
                .unwrap();

            let collected = builder.output.get(&3).unwrap();
            assert_eq!(collected.len(), 1);
            assert_eq!(collected[0], in_list_of(&["abc"]));
        });
    }

    #[test]
    fn test_collect_eq_nonexistent_column() {
        with_builder("test_collect_eq_nonexistent_column_", |builder| {
            let outcome = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));
            assert!(matches!(outcome, Err(Error::ColumnNotFound { .. })));
            assert!(builder.output.is_empty());
        });
    }

    #[test]
    fn test_collect_eq_type_mismatch() {
        with_builder("test_collect_eq_type_mismatch_", |builder| {
            let outcome = builder.collect_eq(&tag_column(), &int64_lit(1));
            assert!(matches!(outcome, Err(Error::Encode { .. })));
            assert!(builder.output.is_empty());
        });
    }

    #[test]
    fn test_collect_or_eq_list_basic() {
        with_builder("test_collect_or_eq_list_basic_", |builder| {
            let head = tag_eq("abc");
            // tag = 'foo' OR (tag = 'bar' OR tag = 'baz')
            let tail = or(tag_eq("foo"), or(tag_eq("bar"), tag_eq("baz")));

            builder.collect_or_eq_list(&head, &tail).unwrap();

            let collected = builder.output.get(&1).unwrap();
            assert_eq!(collected.len(), 1);
            assert_eq!(collected[0], in_list_of(&["abc", "foo", "bar", "baz"]));
        });
    }

    #[test]
    fn test_collect_or_eq_list_invalid_op() {
        with_builder("test_collect_or_eq_list_invalid_op_", |builder| {
            let head = tag_eq("abc");
            let tail = or(
                tag_eq("foo"),
                binary(tag_column(), Operator::Gt, string_lit("foo")), // invalid op
            );

            builder.collect_or_eq_list(&head, &tail).unwrap();
            assert!(builder.output.is_empty());
        });
    }

    #[test]
    fn test_collect_or_eq_list_multiple_columns() {
        with_builder("test_collect_or_eq_list_multiple_columns_", |builder| {
            let head = tag_eq("abc");
            let tail = or(
                tag_eq("foo"),
                binary(tag_column2(), Operator::Eq, string_lit("bar")), // different column
            );

            builder.collect_or_eq_list(&head, &tail).unwrap();
            assert!(builder.output.is_empty());
        });
    }
}
370}