mito2/sst/index/inverted_index/applier/builder/
eq_list.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
18use datatypes::data_type::ConcreteDataType;
19use index::inverted_index::search::predicate::{InListPredicate, Predicate};
20use index::Bytes;
21
22use crate::error::Result;
23use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
24
25impl InvertedIndexApplierBuilder<'_> {
26    /// Collects an eq expression in the form of `column = lit`.
27    pub(crate) fn collect_eq(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
28        let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
29            return Ok(());
30        };
31        let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
32            return Ok(());
33        };
34        let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
35            return Ok(());
36        };
37
38        let predicate = Predicate::InList(InListPredicate {
39            list: HashSet::from_iter([Self::encode_lit(lit, data_type)?]),
40        });
41        self.add_predicate(column_id, predicate);
42        Ok(())
43    }
44
45    /// Collects eq list in the form of `column = lit OR column = lit OR ...`.
46    pub(crate) fn collect_or_eq_list(&mut self, eq_expr: &DfExpr, or_list: &DfExpr) -> Result<()> {
47        let DfExpr::BinaryExpr(BinaryExpr {
48            left,
49            op: Operator::Eq,
50            right,
51        }) = eq_expr
52        else {
53            return Ok(());
54        };
55
56        let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
57            return Ok(());
58        };
59        let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
60            return Ok(());
61        };
62        let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
63            return Ok(());
64        };
65
66        let bytes = Self::encode_lit(lit, data_type.clone())?;
67        let mut inlist = HashSet::from_iter([bytes]);
68
69        if Self::collect_eq_list_inner(column_name, &data_type, or_list, &mut inlist)? {
70            let predicate = Predicate::InList(InListPredicate { list: inlist });
71            self.add_predicate(column_id, predicate);
72        }
73
74        Ok(())
75    }
76
77    /// Recursively collects eq list.
78    ///
79    /// Returns false if the expression doesn't match the form then
80    /// caller can safely ignore the expression.
81    fn collect_eq_list_inner(
82        column_name: &str,
83        data_type: &ConcreteDataType,
84        expr: &DfExpr,
85        inlist: &mut HashSet<Bytes>,
86    ) -> Result<bool> {
87        let DfExpr::BinaryExpr(BinaryExpr {
88            left,
89            op: op @ (Operator::Eq | Operator::Or),
90            right,
91        }) = expr
92        else {
93            return Ok(false);
94        };
95
96        if op == &Operator::Or {
97            let r = Self::collect_eq_list_inner(column_name, data_type, left, inlist)?
98                .then(|| Self::collect_eq_list_inner(column_name, data_type, right, inlist))
99                .transpose()?
100                .unwrap_or(false);
101            return Ok(r);
102        }
103
104        if op == &Operator::Eq {
105            let Some(name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
106                return Ok(false);
107            };
108            if column_name != name {
109                return Ok(false);
110            }
111            let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
112                return Ok(false);
113            };
114
115            inlist.insert(Self::encode_lit(lit, data_type.clone())?);
116            return Ok(true);
117        }
118
119        Ok(false)
120    }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Error;
    use crate::sst::index::inverted_index::applier::builder::tests::{
        encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
        tag_column2, test_object_store, test_region_metadata,
    };
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Creates a fresh builder over the test region metadata and hands it to
    /// `check`.
    ///
    /// `prefix` is forwarded to `PuffinManagerFactory::new_for_test_block`.
    /// The first value returned by that call is kept alive until `check` has
    /// finished.
    fn with_builder(prefix: &str, check: impl FnOnce(&mut InvertedIndexApplierBuilder<'_>)) {
        let (_dir, factory) = PuffinManagerFactory::new_for_test_block(prefix);
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );
        check(&mut builder);
    }

    /// Builds the binary expression `left <op> right`.
    fn binary_expr(left: DfExpr, op: Operator, right: DfExpr) -> DfExpr {
        DfExpr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op,
            right: Box::new(right),
        })
    }

    /// Builds `left = right`.
    fn eq_expr(left: DfExpr, right: DfExpr) -> DfExpr {
        binary_expr(left, Operator::Eq, right)
    }

    /// Builds `left OR right`.
    fn or_expr(left: DfExpr, right: DfExpr) -> DfExpr {
        binary_expr(left, Operator::Or, right)
    }

    /// Builds the in-list predicate expected for the given string values.
    fn in_list(values: &[&str]) -> Predicate {
        Predicate::InList(InListPredicate {
            list: values
                .iter()
                .map(|value| encoded_string(*value))
                .collect(),
        })
    }

    #[test]
    fn test_collect_eq_basic() {
        with_builder("test_collect_eq_basic_", |builder| {
            // Both operand orders are accepted.
            builder
                .collect_eq(&tag_column(), &string_lit("foo"))
                .unwrap();
            builder
                .collect_eq(&string_lit("bar"), &tag_column())
                .unwrap();

            let predicates = builder.output.get(&1).unwrap();
            assert_eq!(predicates.len(), 2);
            assert_eq!(predicates[0], in_list(&["foo"]));
            assert_eq!(predicates[1], in_list(&["bar"]));
        });
    }

    #[test]
    fn test_collect_eq_field_column() {
        with_builder("test_collect_eq_field_column_", |builder| {
            builder
                .collect_eq(&field_column(), &string_lit("abc"))
                .unwrap();

            let predicates = builder.output.get(&3).unwrap();
            assert_eq!(predicates.len(), 1);
            assert_eq!(predicates[0], in_list(&["abc"]));
        });
    }

    #[test]
    fn test_collect_eq_nonexistent_column() {
        with_builder("test_collect_eq_nonexistent_column_", |builder| {
            let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));
            assert!(matches!(res, Err(Error::ColumnNotFound { .. })));
            assert!(builder.output.is_empty());
        });
    }

    #[test]
    fn test_collect_eq_type_mismatch() {
        with_builder("test_collect_eq_type_mismatch_", |builder| {
            let res = builder.collect_eq(&tag_column(), &int64_lit(1));
            assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
            assert!(builder.output.is_empty());
        });
    }

    #[test]
    fn test_collect_or_eq_list_basic() {
        with_builder("test_collect_or_eq_list_basic_", |builder| {
            let first = eq_expr(tag_column(), string_lit("abc"));
            // tag = 'foo' OR (tag = 'bar' OR tag = 'baz')
            let rest = or_expr(
                eq_expr(tag_column(), string_lit("foo")),
                or_expr(
                    eq_expr(tag_column(), string_lit("bar")),
                    eq_expr(tag_column(), string_lit("baz")),
                ),
            );

            builder.collect_or_eq_list(&first, &rest).unwrap();

            let predicates = builder.output.get(&1).unwrap();
            assert_eq!(predicates.len(), 1);
            assert_eq!(predicates[0], in_list(&["abc", "foo", "bar", "baz"]));
        });
    }

    #[test]
    fn test_collect_or_eq_list_invalid_op() {
        with_builder("test_collect_or_eq_list_invalid_op_", |builder| {
            let first = eq_expr(tag_column(), string_lit("abc"));
            let rest = or_expr(
                eq_expr(tag_column(), string_lit("foo")),
                // `>` is not an equality, so the whole list must be ignored.
                binary_expr(tag_column(), Operator::Gt, string_lit("foo")),
            );

            builder.collect_or_eq_list(&first, &rest).unwrap();
            assert!(builder.output.is_empty());
        });
    }

    #[test]
    fn test_collect_or_eq_list_multiple_columns() {
        with_builder("test_collect_or_eq_list_multiple_columns_", |builder| {
            let first = eq_expr(tag_column(), string_lit("abc"));
            let rest = or_expr(
                eq_expr(tag_column(), string_lit("foo")),
                // An equality on a different column invalidates the list.
                eq_expr(tag_column2(), string_lit("bar")),
            );

            builder.collect_or_eq_list(&first, &rest).unwrap();
            assert!(builder.output.is_empty());
        });
    }
}