mito2/sst/index/inverted_index/applier/builder/
eq_list.rs1use std::collections::HashSet;
16
17use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
18use datatypes::data_type::ConcreteDataType;
19use index::inverted_index::search::predicate::{InListPredicate, Predicate};
20use index::Bytes;
21
22use crate::error::Result;
23use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
24
25impl InvertedIndexApplierBuilder<'_> {
26 pub(crate) fn collect_eq(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
28 let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
29 return Ok(());
30 };
31 let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
32 return Ok(());
33 };
34 let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
35 return Ok(());
36 };
37
38 let predicate = Predicate::InList(InListPredicate {
39 list: HashSet::from_iter([Self::encode_lit(lit, data_type)?]),
40 });
41 self.add_predicate(column_id, predicate);
42 Ok(())
43 }
44
45 pub(crate) fn collect_or_eq_list(&mut self, eq_expr: &DfExpr, or_list: &DfExpr) -> Result<()> {
47 let DfExpr::BinaryExpr(BinaryExpr {
48 left,
49 op: Operator::Eq,
50 right,
51 }) = eq_expr
52 else {
53 return Ok(());
54 };
55
56 let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
57 return Ok(());
58 };
59 let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
60 return Ok(());
61 };
62 let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
63 return Ok(());
64 };
65
66 let bytes = Self::encode_lit(lit, data_type.clone())?;
67 let mut inlist = HashSet::from_iter([bytes]);
68
69 if Self::collect_eq_list_inner(column_name, &data_type, or_list, &mut inlist)? {
70 let predicate = Predicate::InList(InListPredicate { list: inlist });
71 self.add_predicate(column_id, predicate);
72 }
73
74 Ok(())
75 }
76
77 fn collect_eq_list_inner(
82 column_name: &str,
83 data_type: &ConcreteDataType,
84 expr: &DfExpr,
85 inlist: &mut HashSet<Bytes>,
86 ) -> Result<bool> {
87 let DfExpr::BinaryExpr(BinaryExpr {
88 left,
89 op: op @ (Operator::Eq | Operator::Or),
90 right,
91 }) = expr
92 else {
93 return Ok(false);
94 };
95
96 if op == &Operator::Or {
97 let r = Self::collect_eq_list_inner(column_name, data_type, left, inlist)?
98 .then(|| Self::collect_eq_list_inner(column_name, data_type, right, inlist))
99 .transpose()?
100 .unwrap_or(false);
101 return Ok(r);
102 }
103
104 if op == &Operator::Eq {
105 let Some(name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
106 return Ok(false);
107 };
108 if column_name != name {
109 return Ok(false);
110 }
111 let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
112 return Ok(false);
113 };
114
115 inlist.insert(Self::encode_lit(lit, data_type.clone())?);
116 return Ok(true);
117 }
118
119 Ok(false)
120 }
121}
122
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::Error;
    use crate::sst::index::inverted_index::applier::builder::tests::{
        encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
        tag_column2, test_object_store, test_region_metadata,
    };
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Runs `f` against a fresh builder over the shared test region metadata, constructed
    /// with the column-id set `{1, 2, 3}`.
    ///
    /// `prefix` is forwarded to `PuffinManagerFactory::new_for_test_block`. The first value
    /// it returns (presumably a temp-dir guard) and the metadata the builder borrows are
    /// kept alive until `f` returns.
    fn with_builder(prefix: &str, f: impl FnOnce(&mut InvertedIndexApplierBuilder<'_>)) {
        let (_d, factory) = PuffinManagerFactory::new_for_test_block(prefix);
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );
        f(&mut builder);
    }

    /// Shorthand for the binary expression `left <op> right`.
    fn binary_expr(left: DfExpr, op: Operator, right: DfExpr) -> DfExpr {
        DfExpr::BinaryExpr(BinaryExpr {
            left: Box::new(left),
            op,
            right: Box::new(right),
        })
    }

    /// Shorthand for an `InList` predicate over the given encoded values.
    fn in_list(values: impl IntoIterator<Item = Bytes>) -> Predicate {
        Predicate::InList(InListPredicate {
            list: HashSet::from_iter(values),
        })
    }

    #[test]
    fn test_collect_eq_basic() {
        with_builder("test_collect_eq_basic_", |builder| {
            builder
                .collect_eq(&tag_column(), &string_lit("foo"))
                .unwrap();
            builder
                .collect_eq(&string_lit("bar"), &tag_column())
                .unwrap();

            let predicates = builder.output.get(&1).unwrap();
            assert_eq!(predicates.len(), 2);
            assert_eq!(predicates[0], in_list([encoded_string("foo")]));
            assert_eq!(predicates[1], in_list([encoded_string("bar")]));
        });
    }

    #[test]
    fn test_collect_eq_field_column() {
        with_builder("test_collect_eq_field_column_", |builder| {
            builder
                .collect_eq(&field_column(), &string_lit("abc"))
                .unwrap();

            let predicates = builder.output.get(&3).unwrap();
            assert_eq!(predicates.len(), 1);
            assert_eq!(predicates[0], in_list([encoded_string("abc")]));
        });
    }

    #[test]
    fn test_collect_eq_nonexistent_column() {
        with_builder("test_collect_eq_nonexistent_column_", |builder| {
            let res = builder.collect_eq(&nonexistent_column(), &string_lit("abc"));
            assert!(matches!(res, Err(Error::ColumnNotFound { .. })));
            assert!(builder.output.is_empty());
        });
    }

    #[test]
    fn test_collect_eq_type_mismatch() {
        with_builder("test_collect_eq_type_mismatch_", |builder| {
            let res = builder.collect_eq(&tag_column(), &int64_lit(1));
            assert!(matches!(res, Err(Error::FieldTypeMismatch { .. })));
            assert!(builder.output.is_empty());
        });
    }

    #[test]
    fn test_collect_or_eq_list_basic() {
        with_builder("test_collect_or_eq_list_basic_", |builder| {
            let eq_expr = binary_expr(tag_column(), Operator::Eq, string_lit("abc"));
            let or_eq_list = binary_expr(
                binary_expr(tag_column(), Operator::Eq, string_lit("foo")),
                Operator::Or,
                binary_expr(
                    binary_expr(tag_column(), Operator::Eq, string_lit("bar")),
                    Operator::Or,
                    binary_expr(tag_column(), Operator::Eq, string_lit("baz")),
                ),
            );

            builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();

            let predicates = builder.output.get(&1).unwrap();
            assert_eq!(predicates.len(), 1);
            assert_eq!(
                predicates[0],
                in_list([
                    encoded_string("abc"),
                    encoded_string("foo"),
                    encoded_string("bar"),
                    encoded_string("baz"),
                ])
            );
        });
    }

    #[test]
    fn test_collect_or_eq_list_reversed_operands() {
        with_builder("test_collect_or_eq_list_reversed_operands_", |builder| {
            // Literal-first equalities must be accepted both in `eq_expr` and in the OR list.
            let eq_expr = binary_expr(string_lit("abc"), Operator::Eq, tag_column());
            let or_eq_list = binary_expr(
                binary_expr(string_lit("foo"), Operator::Eq, tag_column()),
                Operator::Or,
                binary_expr(tag_column(), Operator::Eq, string_lit("bar")),
            );

            builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();

            let predicates = builder.output.get(&1).unwrap();
            assert_eq!(predicates.len(), 1);
            assert_eq!(
                predicates[0],
                in_list([
                    encoded_string("abc"),
                    encoded_string("foo"),
                    encoded_string("bar"),
                ])
            );
        });
    }

    #[test]
    fn test_collect_or_eq_list_invalid_op() {
        with_builder("test_collect_or_eq_list_invalid_op_", |builder| {
            let eq_expr = binary_expr(tag_column(), Operator::Eq, string_lit("abc"));
            let or_eq_list = binary_expr(
                binary_expr(tag_column(), Operator::Eq, string_lit("foo")),
                Operator::Or,
                binary_expr(tag_column(), Operator::Gt, string_lit("foo")),
            );

            builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
            assert!(builder.output.is_empty());
        });
    }

    #[test]
    fn test_collect_or_eq_list_multiple_columns() {
        with_builder("test_collect_or_eq_list_multiple_columns_", |builder| {
            let eq_expr = binary_expr(tag_column(), Operator::Eq, string_lit("abc"));
            let or_eq_list = binary_expr(
                binary_expr(tag_column(), Operator::Eq, string_lit("foo")),
                Operator::Or,
                binary_expr(tag_column2(), Operator::Eq, string_lit("bar")),
            );

            builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
            assert!(builder.output.is_empty());
        });
    }
}