mito2/sst/index/inverted_index/applier/builder/
eq_list.rs1use std::collections::BTreeSet;
16
17use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
18use datatypes::data_type::ConcreteDataType;
19use index::inverted_index::search::predicate::{InListPredicate, Predicate};
20use index::Bytes;
21
22use crate::error::Result;
23use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
24
25impl InvertedIndexApplierBuilder<'_> {
26 pub(crate) fn collect_eq(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
28 let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
29 return Ok(());
30 };
31 let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
32 return Ok(());
33 };
34 let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
35 return Ok(());
36 };
37
38 let predicate = Predicate::InList(InListPredicate {
39 list: BTreeSet::from_iter([Self::encode_lit(lit, data_type)?]),
40 });
41 self.add_predicate(column_id, predicate);
42 Ok(())
43 }
44
45 pub(crate) fn collect_or_eq_list(&mut self, eq_expr: &DfExpr, or_list: &DfExpr) -> Result<()> {
47 let DfExpr::BinaryExpr(BinaryExpr {
48 left,
49 op: Operator::Eq,
50 right,
51 }) = eq_expr
52 else {
53 return Ok(());
54 };
55
56 let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
57 return Ok(());
58 };
59 let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
60 return Ok(());
61 };
62 let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
63 return Ok(());
64 };
65
66 let bytes = Self::encode_lit(lit, data_type.clone())?;
67 let mut inlist = BTreeSet::from_iter([bytes]);
68
69 if Self::collect_eq_list_inner(column_name, &data_type, or_list, &mut inlist)? {
70 let predicate = Predicate::InList(InListPredicate { list: inlist });
71 self.add_predicate(column_id, predicate);
72 }
73
74 Ok(())
75 }
76
77 fn collect_eq_list_inner(
82 column_name: &str,
83 data_type: &ConcreteDataType,
84 expr: &DfExpr,
85 inlist: &mut BTreeSet<Bytes>,
86 ) -> Result<bool> {
87 let DfExpr::BinaryExpr(BinaryExpr {
88 left,
89 op: op @ (Operator::Eq | Operator::Or),
90 right,
91 }) = expr
92 else {
93 return Ok(false);
94 };
95
96 if op == &Operator::Or {
97 let r = Self::collect_eq_list_inner(column_name, data_type, left, inlist)?
98 .then(|| Self::collect_eq_list_inner(column_name, data_type, right, inlist))
99 .transpose()?
100 .unwrap_or(false);
101 return Ok(r);
102 }
103
104 if op == &Operator::Eq {
105 let Some(name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
106 return Ok(false);
107 };
108 if column_name != name {
109 return Ok(false);
110 }
111 let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
112 return Ok(false);
113 };
114
115 inlist.insert(Self::encode_lit(lit, data_type.clone())?);
116 return Ok(true);
117 }
118
119 Ok(false)
120 }
121}
122
123#[cfg(test)]
124mod tests {
125 use std::collections::HashSet;
126
127 use super::*;
128 use crate::error::Error;
129 use crate::sst::index::inverted_index::applier::builder::tests::{
130 encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
131 tag_column2, test_object_store, test_region_metadata,
132 };
133 use crate::sst::index::puffin_manager::PuffinManagerFactory;
134
135 #[test]
136 fn test_collect_eq_basic() {
137 let (_d, facotry) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_");
138 let metadata = test_region_metadata();
139 let mut builder = InvertedIndexApplierBuilder::new(
140 "test".to_string(),
141 test_object_store(),
142 &metadata,
143 HashSet::from_iter([1, 2, 3]),
144 facotry,
145 );
146
147 builder
148 .collect_eq(&tag_column(), &string_lit("foo"))
149 .unwrap();
150 builder
151 .collect_eq(&string_lit("bar"), &tag_column())
152 .unwrap();
153
154 let predicates = builder.output.get(&1).unwrap();
155 assert_eq!(predicates.len(), 2);
156 assert_eq!(
157 predicates[0],
158 Predicate::InList(InListPredicate {
159 list: BTreeSet::from_iter([encoded_string("foo")])
160 })
161 );
162 assert_eq!(
163 predicates[1],
164 Predicate::InList(InListPredicate {
165 list: BTreeSet::from_iter([encoded_string("bar")])
166 })
167 );
168 }
169
170 #[test]
171 fn test_collect_eq_field_column() {
172 let (_d, facotry) =
173 PuffinManagerFactory::new_for_test_block("test_collect_eq_field_column_");
174 let metadata = test_region_metadata();
175 let mut builder = InvertedIndexApplierBuilder::new(
176 "test".to_string(),
177 test_object_store(),
178 &metadata,
179 HashSet::from_iter([1, 2, 3]),
180 facotry,
181 );
182
183 builder
184 .collect_eq(&field_column(), &string_lit("abc"))
185 .unwrap();
186
187 let predicates = builder.output.get(&3).unwrap();
188 assert_eq!(predicates.len(), 1);
189 assert_eq!(
190 predicates[0],
191 Predicate::InList(InListPredicate {
192 list: BTreeSet::from_iter([encoded_string("abc")])
193 })
194 );
195 }
196
197 #[test]
198 fn test_collect_eq_nonexistent_column() {
199 let (_d, facotry) =
200 PuffinManagerFactory::new_for_test_block("test_collect_eq_nonexistent_column_");
201 let metadata = test_region_metadata();
202 let mut builder = InvertedIndexApplierBuilder::new(
203 "test".to_string(),
204 test_object_store(),
205 &metadata,
206 HashSet::from_iter([1, 2, 3]),
207 facotry,
208 );
209
210 let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));
211 assert!(matches!(res, Err(Error::ColumnNotFound { .. })));
212 assert!(builder.output.is_empty());
213 }
214
215 #[test]
216 fn test_collect_eq_type_mismatch() {
217 let (_d, facotry) =
218 PuffinManagerFactory::new_for_test_block("test_collect_eq_type_mismatch_");
219 let metadata = test_region_metadata();
220 let mut builder = InvertedIndexApplierBuilder::new(
221 "test".to_string(),
222 test_object_store(),
223 &metadata,
224 HashSet::from_iter([1, 2, 3]),
225 facotry,
226 );
227
228 let res = builder.collect_eq(&tag_column(), &int64_lit(1));
229 assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
230 assert!(builder.output.is_empty());
231 }
232
233 #[test]
234 fn test_collect_or_eq_list_basic() {
235 let (_d, facotry) =
236 PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_basic_");
237 let metadata = test_region_metadata();
238 let mut builder = InvertedIndexApplierBuilder::new(
239 "test".to_string(),
240 test_object_store(),
241 &metadata,
242 HashSet::from_iter([1, 2, 3]),
243 facotry,
244 );
245
246 let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
247 left: Box::new(tag_column()),
248 op: Operator::Eq,
249 right: Box::new(string_lit("abc")),
250 });
251 let or_eq_list = DfExpr::BinaryExpr(BinaryExpr {
252 left: Box::new(DfExpr::BinaryExpr(BinaryExpr {
253 left: Box::new(tag_column()),
254 op: Operator::Eq,
255 right: Box::new(string_lit("foo")),
256 })),
257 op: Operator::Or,
258 right: Box::new(DfExpr::BinaryExpr(BinaryExpr {
259 left: Box::new(DfExpr::BinaryExpr(BinaryExpr {
260 left: Box::new(tag_column()),
261 op: Operator::Eq,
262 right: Box::new(string_lit("bar")),
263 })),
264 op: Operator::Or,
265 right: Box::new(DfExpr::BinaryExpr(BinaryExpr {
266 left: Box::new(tag_column()),
267 op: Operator::Eq,
268 right: Box::new(string_lit("baz")),
269 })),
270 })),
271 });
272
273 builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
274
275 let predicates = builder.output.get(&1).unwrap();
276 assert_eq!(predicates.len(), 1);
277 assert_eq!(
278 predicates[0],
279 Predicate::InList(InListPredicate {
280 list: BTreeSet::from_iter([
281 encoded_string("abc"),
282 encoded_string("foo"),
283 encoded_string("bar"),
284 encoded_string("baz")
285 ])
286 })
287 );
288 }
289
290 #[test]
291 fn test_collect_or_eq_list_invalid_op() {
292 let (_d, facotry) =
293 PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_invalid_op_");
294 let metadata = test_region_metadata();
295 let mut builder = InvertedIndexApplierBuilder::new(
296 "test".to_string(),
297 test_object_store(),
298 &metadata,
299 HashSet::from_iter([1, 2, 3]),
300 facotry,
301 );
302
303 let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
304 left: Box::new(tag_column()),
305 op: Operator::Eq,
306 right: Box::new(string_lit("abc")),
307 });
308 let or_eq_list = DfExpr::BinaryExpr(BinaryExpr {
309 left: Box::new(DfExpr::BinaryExpr(BinaryExpr {
310 left: Box::new(tag_column()),
311 op: Operator::Eq,
312 right: Box::new(string_lit("foo")),
313 })),
314 op: Operator::Or,
315 right: Box::new(DfExpr::BinaryExpr(BinaryExpr {
316 left: Box::new(tag_column()),
317 op: Operator::Gt, right: Box::new(string_lit("foo")),
319 })),
320 });
321
322 builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
323 assert!(builder.output.is_empty());
324 }
325
326 #[test]
327 fn test_collect_or_eq_list_multiple_columns() {
328 let (_d, facotry) =
329 PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_multiple_columns_");
330 let metadata = test_region_metadata();
331 let mut builder = InvertedIndexApplierBuilder::new(
332 "test".to_string(),
333 test_object_store(),
334 &metadata,
335 HashSet::from_iter([1, 2, 3]),
336 facotry,
337 );
338
339 let eq_expr = DfExpr::BinaryExpr(BinaryExpr {
340 left: Box::new(tag_column()),
341 op: Operator::Eq,
342 right: Box::new(string_lit("abc")),
343 });
344 let or_eq_list = DfExpr::BinaryExpr(BinaryExpr {
345 left: Box::new(DfExpr::BinaryExpr(BinaryExpr {
346 left: Box::new(tag_column()),
347 op: Operator::Eq,
348 right: Box::new(string_lit("foo")),
349 })),
350 op: Operator::Or,
351 right: Box::new(DfExpr::BinaryExpr(BinaryExpr {
352 left: Box::new(tag_column2()), op: Operator::Eq,
354 right: Box::new(string_lit("bar")),
355 })),
356 });
357
358 builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
359 assert!(builder.output.is_empty());
360 }
361}