1use std::collections::BTreeSet;
16
17use datafusion_expr::{BinaryExpr, Expr as DfExpr, Operator};
18use datatypes::data_type::ConcreteDataType;
19use index::inverted_index::search::predicate::{InListPredicate, Predicate};
20use index::Bytes;
21
22use crate::error::Result;
23use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
24
25impl InvertedIndexApplierBuilder<'_> {
26 pub(crate) fn collect_eq(&mut self, left: &DfExpr, right: &DfExpr) -> Result<()> {
28 let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
29 return Ok(());
30 };
31 let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
32 return Ok(());
33 };
34 let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
35 return Ok(());
36 };
37
38 let predicate = Predicate::InList(InListPredicate {
39 list: BTreeSet::from_iter([Self::encode_lit(lit, data_type)?]),
40 });
41 self.add_predicate(column_id, predicate);
42 Ok(())
43 }
44
45 pub(crate) fn collect_or_eq_list(&mut self, eq_expr: &DfExpr, or_list: &DfExpr) -> Result<()> {
47 let DfExpr::BinaryExpr(BinaryExpr {
48 left,
49 op: Operator::Eq,
50 right,
51 }) = eq_expr
52 else {
53 return Ok(());
54 };
55
56 let Some(column_name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
57 return Ok(());
58 };
59 let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
60 return Ok(());
61 };
62 let Some((column_id, data_type)) = self.column_id_and_type(column_name)? else {
63 return Ok(());
64 };
65
66 let bytes = Self::encode_lit(lit, data_type.clone())?;
67 let mut inlist = BTreeSet::from_iter([bytes]);
68
69 if Self::collect_eq_list_inner(column_name, &data_type, or_list, &mut inlist)? {
70 let predicate = Predicate::InList(InListPredicate { list: inlist });
71 self.add_predicate(column_id, predicate);
72 }
73
74 Ok(())
75 }
76
77 fn collect_eq_list_inner(
82 column_name: &str,
83 data_type: &ConcreteDataType,
84 expr: &DfExpr,
85 inlist: &mut BTreeSet<Bytes>,
86 ) -> Result<bool> {
87 let DfExpr::BinaryExpr(BinaryExpr {
88 left,
89 op: op @ (Operator::Eq | Operator::Or),
90 right,
91 }) = expr
92 else {
93 return Ok(false);
94 };
95
96 if op == &Operator::Or {
97 let r = Self::collect_eq_list_inner(column_name, data_type, left, inlist)?
98 .then(|| Self::collect_eq_list_inner(column_name, data_type, right, inlist))
99 .transpose()?
100 .unwrap_or(false);
101 return Ok(r);
102 }
103
104 if op == &Operator::Eq {
105 let Some(name) = Self::column_name(left).or_else(|| Self::column_name(right)) else {
106 return Ok(false);
107 };
108 if column_name != name {
109 return Ok(false);
110 }
111 let Some(lit) = Self::nonnull_lit(right).or_else(|| Self::nonnull_lit(left)) else {
112 return Ok(false);
113 };
114
115 inlist.insert(Self::encode_lit(lit, data_type.clone())?);
116 return Ok(true);
117 }
118
119 Ok(false)
120 }
121}
122
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use store_api::region_request::PathType;

    use super::*;
    use crate::error::Error;
    use crate::sst::index::inverted_index::applier::builder::tests::{
        encoded_string, field_column, int64_lit, nonexistent_column, string_lit, tag_column,
        tag_column2, test_object_store, test_region_metadata,
    };
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Builds the binary expression `lhs <op> rhs`.
    fn bin_expr(lhs: DfExpr, op: Operator, rhs: DfExpr) -> DfExpr {
        DfExpr::BinaryExpr(BinaryExpr {
            left: Box::new(lhs),
            op,
            right: Box::new(rhs),
        })
    }

    #[test]
    fn test_collect_eq_basic() {
        let (_dir, factory) = PuffinManagerFactory::new_for_test_block("test_collect_eq_basic_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        // Both operand orders are accepted: `tag = 'foo'` and `'bar' = tag`.
        let cases = [
            (tag_column(), string_lit("foo")),
            (string_lit("bar"), tag_column()),
        ];
        for (lhs, rhs) in &cases {
            builder.collect_eq(lhs, rhs).unwrap();
        }

        let preds = builder.output.get(&1).unwrap();
        assert_eq!(preds.len(), 2);
        for (pred, expected) in preds.iter().zip(["foo", "bar"]) {
            assert_eq!(
                *pred,
                Predicate::InList(InListPredicate {
                    list: BTreeSet::from_iter([encoded_string(expected)])
                })
            );
        }
    }

    #[test]
    fn test_collect_eq_field_column() {
        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_eq_field_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        let (lhs, rhs) = (field_column(), string_lit("abc"));
        builder.collect_eq(&lhs, &rhs).unwrap();

        let preds = builder.output.get(&3).unwrap();
        assert_eq!(preds.len(), 1);
        let expected = Predicate::InList(InListPredicate {
            list: BTreeSet::from_iter([encoded_string("abc")]),
        });
        assert_eq!(preds[0], expected);
    }

    #[test]
    fn test_collect_eq_nonexistent_column() {
        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_eq_nonexistent_column_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        let err = builder
            .collect_eq(&nonexistent_column(), &string_lit("abc"))
            .unwrap_err();
        assert!(matches!(err, Error::ColumnNotFound { .. }));
        assert!(builder.output.is_empty());
    }

    #[test]
    fn test_collect_eq_type_mismatch() {
        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_eq_type_mismatch_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        let err = builder
            .collect_eq(&tag_column(), &int64_lit(1))
            .unwrap_err();
        assert!(matches!(err, Error::Encode { .. }));
        assert!(builder.output.is_empty());
    }

    #[test]
    fn test_collect_or_eq_list_basic() {
        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_basic_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        // tag = 'abc'  together with  (tag = 'foo') OR ((tag = 'bar') OR (tag = 'baz'))
        let eq_expr = bin_expr(tag_column(), Operator::Eq, string_lit("abc"));
        let or_eq_list = bin_expr(
            bin_expr(tag_column(), Operator::Eq, string_lit("foo")),
            Operator::Or,
            bin_expr(
                bin_expr(tag_column(), Operator::Eq, string_lit("bar")),
                Operator::Or,
                bin_expr(tag_column(), Operator::Eq, string_lit("baz")),
            ),
        );

        builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();

        let preds = builder.output.get(&1).unwrap();
        assert_eq!(preds.len(), 1);
        let expected = Predicate::InList(InListPredicate {
            list: BTreeSet::from_iter(["abc", "foo", "bar", "baz"].map(encoded_string)),
        });
        assert_eq!(preds[0], expected);
    }

    #[test]
    fn test_collect_or_eq_list_invalid_op() {
        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_invalid_op_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        // The `>` leaf disqualifies the whole disjunction.
        let eq_expr = bin_expr(tag_column(), Operator::Eq, string_lit("abc"));
        let or_eq_list = bin_expr(
            bin_expr(tag_column(), Operator::Eq, string_lit("foo")),
            Operator::Or,
            bin_expr(tag_column(), Operator::Gt, string_lit("foo")),
        );

        builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
        assert!(builder.output.is_empty());
    }

    #[test]
    fn test_collect_or_eq_list_multiple_columns() {
        let (_dir, factory) =
            PuffinManagerFactory::new_for_test_block("test_collect_or_eq_list_multiple_columns_");
        let metadata = test_region_metadata();
        let mut builder = InvertedIndexApplierBuilder::new(
            "test".to_string(),
            PathType::Bare,
            test_object_store(),
            &metadata,
            HashSet::from_iter([1, 2, 3]),
            factory,
        );

        // Mixing `tag` and `tag2` in one disjunction must not produce a predicate.
        let eq_expr = bin_expr(tag_column(), Operator::Eq, string_lit("abc"));
        let or_eq_list = bin_expr(
            bin_expr(tag_column(), Operator::Eq, string_lit("foo")),
            Operator::Or,
            bin_expr(tag_column2(), Operator::Eq, string_lit("bar")),
        );

        builder.collect_or_eq_list(&eq_expr, &or_eq_list).unwrap();
        assert!(builder.output.is_empty());
    }
}