1use arrow::array::StringArray;
16use arrow::compute::kernels::comparison;
17use datafusion::common::ScalarValue;
18use datafusion::logical_expr::expr::Like;
19use datafusion::logical_expr::{Expr, Operator};
20use datatypes::value::Value;
21use store_api::storage::ScanRequest;
22
23type ColumnName = String;
24#[derive(Clone, PartialEq, Eq, Debug)]
28pub(crate) enum Predicate {
29 Eq(ColumnName, Value),
30 Like(ColumnName, String, bool),
31 NotEq(ColumnName, Value),
32 InList(ColumnName, Vec<Value>),
33 And(Box<Predicate>, Box<Predicate>),
34 Or(Box<Predicate>, Box<Predicate>),
35 Not(Box<Predicate>),
36}
37
38impl Predicate {
39 fn eval(&self, row: &[(&str, &Value)]) -> Option<bool> {
44 match self {
45 Predicate::Eq(c, v) => {
46 for (column, value) in row {
47 if c != column {
48 continue;
49 }
50 return Some(v == *value);
51 }
52 }
53 Predicate::Like(c, pattern, case_insensitive) => {
54 for (column, value) in row {
55 if c != column {
56 continue;
57 }
58
59 let Value::String(bs) = value else {
60 continue;
61 };
62
63 return like_utf8(bs.as_utf8(), pattern, case_insensitive);
64 }
65 }
66 Predicate::NotEq(c, v) => {
67 for (column, value) in row {
68 if c != column {
69 continue;
70 }
71 return Some(v != *value);
72 }
73 }
74 Predicate::InList(c, values) => {
75 for (column, value) in row {
76 if c != column {
77 continue;
78 }
79 return Some(values.iter().any(|v| v == *value));
80 }
81 }
82 Predicate::And(left, right) => {
83 let left = left.eval(row);
84
85 if matches!(left, Some(false)) {
87 return Some(false);
88 }
89
90 return match (left, right.eval(row)) {
91 (Some(left), Some(right)) => Some(left && right),
92 (None, Some(false)) => Some(false),
93 _ => None,
94 };
95 }
96 Predicate::Or(left, right) => {
97 let left = left.eval(row);
98
99 if matches!(left, Some(true)) {
101 return Some(true);
102 }
103
104 return match (left, right.eval(row)) {
105 (Some(left), Some(right)) => Some(left || right),
106 (None, Some(true)) => Some(true),
107 _ => None,
108 };
109 }
110 Predicate::Not(p) => {
111 return Some(!p.eval(row)?);
112 }
113 }
114
115 None
117 }
118
119 fn from_expr(expr: Expr) -> Option<Predicate> {
121 match expr {
122 Expr::Not(expr) => Some(Predicate::Not(Box::new(Self::from_expr(*expr)?))),
124 Expr::Like(Like {
126 negated,
127 expr,
128 pattern,
129 case_insensitive,
130 ..
131 }) if is_column(&expr) && is_string_literal(&pattern) => {
132 let Expr::Column(c) = *expr else {
134 unreachable!();
135 };
136 let Expr::Literal(ScalarValue::Utf8(Some(pattern))) = *pattern else {
137 unreachable!();
138 };
139
140 let p = Predicate::Like(c.name, pattern, case_insensitive);
141
142 if negated {
143 Some(Predicate::Not(Box::new(p)))
144 } else {
145 Some(p)
146 }
147 }
148 Expr::BinaryExpr(bin) => match (*bin.left, bin.op, *bin.right) {
150 (Expr::Literal(scalar), Operator::Eq, Expr::Column(c))
152 | (Expr::Column(c), Operator::Eq, Expr::Literal(scalar)) => {
153 let Ok(v) = Value::try_from(scalar) else {
154 return None;
155 };
156
157 Some(Predicate::Eq(c.name, v))
158 }
159 (Expr::Literal(scalar), Operator::NotEq, Expr::Column(c))
161 | (Expr::Column(c), Operator::NotEq, Expr::Literal(scalar)) => {
162 let Ok(v) = Value::try_from(scalar) else {
163 return None;
164 };
165
166 Some(Predicate::NotEq(c.name, v))
167 }
168 (left, Operator::And, right) => {
170 let left = Self::from_expr(left)?;
171 let right = Self::from_expr(right)?;
172
173 Some(Predicate::And(Box::new(left), Box::new(right)))
174 }
175 (left, Operator::Or, right) => {
177 let left = Self::from_expr(left)?;
178 let right = Self::from_expr(right)?;
179
180 Some(Predicate::Or(Box::new(left), Box::new(right)))
181 }
182 _ => None,
183 },
184 Expr::InList(list) => {
186 match (*list.expr, list.list, list.negated) {
187 (Expr::Column(c), list, negated) if is_all_scalars(&list) => {
189 let mut values = Vec::with_capacity(list.len());
190 for scalar in list {
191 let Expr::Literal(scalar) = scalar else {
193 unreachable!();
194 };
195
196 let Ok(value) = Value::try_from(scalar) else {
197 return None;
198 };
199
200 values.push(value);
201 }
202
203 let predicate = Predicate::InList(c.name, values);
204
205 if negated {
206 Some(Predicate::Not(Box::new(predicate)))
207 } else {
208 Some(predicate)
209 }
210 }
211 _ => None,
212 }
213 }
214 _ => None,
215 }
216 }
217}
218
219fn like_utf8(s: &str, pattern: &str, case_insensitive: &bool) -> Option<bool> {
224 let array = StringArray::from(vec![s]);
225 let patterns = StringArray::new_scalar(pattern);
226
227 let Ok(booleans) = (if *case_insensitive {
228 comparison::ilike(&array, &patterns)
229 } else {
230 comparison::like(&array, &patterns)
231 }) else {
232 return None;
233 };
234
235 Some(booleans.value(0))
237}
238
239fn is_string_literal(expr: &Expr) -> bool {
240 matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(_))))
241}
242
243fn is_column(expr: &Expr) -> bool {
244 matches!(expr, Expr::Column(_))
245}
246
247pub struct Predicates {
249 predicates: Vec<Predicate>,
250}
251
252impl Predicates {
253 pub fn from_scan_request(request: &Option<ScanRequest>) -> Predicates {
255 if let Some(request) = request {
256 let mut predicates = Vec::with_capacity(request.filters.len());
257
258 for filter in &request.filters {
259 if let Some(predicate) = Predicate::from_expr(filter.clone()) {
260 predicates.push(predicate);
261 }
262 }
263
264 Self { predicates }
265 } else {
266 Self {
267 predicates: Vec::new(),
268 }
269 }
270 }
271
272 pub fn eval(&self, row: &[(&str, &Value)]) -> bool {
275 if self.predicates.is_empty() {
277 return true;
278 }
279
280 self.predicates
281 .iter()
282 .filter_map(|p| p.eval(row))
283 .all(|b| b)
284 }
285}
286
287fn is_all_scalars(list: &[Expr]) -> bool {
289 list.iter().all(|v| matches!(v, Expr::Literal(_)))
290}
291
292#[cfg(test)]
293mod tests {
294 use datafusion::common::{Column, ScalarValue};
295 use datafusion::logical_expr::expr::InList;
296 use datafusion::logical_expr::BinaryExpr;
297
298 use super::*;
299
300 #[test]
301 fn test_predicate_eval() {
302 let a_col = "a".to_string();
303 let b_col = "b".to_string();
304 let a_value = Value::from("a_value");
305 let b_value = Value::from("b_value");
306 let wrong_value = Value::from("wrong_value");
307
308 let a_row = [(a_col.as_str(), &a_value)];
309 let b_row = [("b", &wrong_value)];
310 let wrong_row = [(a_col.as_str(), &wrong_value)];
311
312 let p = Predicate::Eq(a_col.clone(), a_value.clone());
314 assert!(p.eval(&a_row).unwrap());
315 assert!(p.eval(&b_row).is_none());
316 assert!(!p.eval(&wrong_row).unwrap());
317
318 let p = Predicate::NotEq(a_col.clone(), a_value.clone());
320 assert!(!p.eval(&a_row).unwrap());
321 assert!(p.eval(&b_row).is_none());
322 assert!(p.eval(&wrong_row).unwrap());
323
324 let p = Predicate::InList(a_col.clone(), vec![a_value.clone(), b_value.clone()]);
326 assert!(p.eval(&a_row).unwrap());
327 assert!(p.eval(&b_row).is_none());
328 assert!(!p.eval(&wrong_row).unwrap());
329 assert!(p.eval(&[(&a_col, &b_value)]).unwrap());
330
331 let p1 = Predicate::Eq(a_col.clone(), a_value.clone());
332 let p2 = Predicate::Eq(b_col.clone(), b_value.clone());
333 let row = [(a_col.as_str(), &a_value), (b_col.as_str(), &b_value)];
334 let wrong_row = [(a_col.as_str(), &a_value), (b_col.as_str(), &wrong_value)];
335
336 let p = Predicate::And(Box::new(p1.clone()), Box::new(p2.clone()));
338 assert!(p.eval(&row).unwrap());
339 assert!(!p.eval(&wrong_row).unwrap());
340 assert!(p.eval(&[]).is_none());
341 assert!(p.eval(&[("c", &a_value)]).is_none());
342 assert!(!p
343 .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
344 .unwrap());
345 assert!(!p
346 .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
347 .unwrap());
348 assert!(p
349 .eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
350 .is_none());
351 assert!(!p
352 .eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
353 .unwrap());
354
355 let p = Predicate::Or(Box::new(p1), Box::new(p2));
357 assert!(p.eval(&row).unwrap());
358 assert!(p.eval(&wrong_row).unwrap());
359 assert!(p.eval(&[]).is_none());
360 assert!(p.eval(&[("c", &a_value)]).is_none());
361 assert!(!p
362 .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
363 .unwrap());
364 assert!(p
365 .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
366 .unwrap());
367 assert!(p
368 .eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
369 .unwrap());
370 assert!(p
371 .eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
372 .is_none());
373 }
374
375 #[test]
376 fn test_predicate_like() {
377 let expr = Expr::Like(Like {
379 negated: false,
380 expr: Box::new(column("a")),
381 pattern: Box::new(string_literal("%abc")),
382 case_insensitive: true,
383 escape_char: None,
384 });
385
386 let p = Predicate::from_expr(expr).unwrap();
387 assert!(
388 matches!(&p, Predicate::Like(c, pattern, case_insensitive) if
389 c == "a"
390 && pattern == "%abc"
391 && *case_insensitive)
392 );
393
394 let match_row = [
395 ("a", &Value::from("hello AbC")),
396 ("b", &Value::from("b value")),
397 ];
398 let unmatch_row = [("a", &Value::from("bca")), ("b", &Value::from("b value"))];
399
400 assert!(p.eval(&match_row).unwrap());
401 assert!(!p.eval(&unmatch_row).unwrap());
402 assert!(p.eval(&[]).is_none());
403
404 let expr = Expr::Like(Like {
406 negated: false,
407 expr: Box::new(column("a")),
408 pattern: Box::new(string_literal("%abc")),
409 case_insensitive: false,
410 escape_char: None,
411 });
412
413 let p = Predicate::from_expr(expr).unwrap();
414 assert!(
415 matches!(&p, Predicate::Like(c, pattern, case_insensitive) if
416 c == "a"
417 && pattern == "%abc"
418 && !*case_insensitive)
419 );
420 assert!(!p.eval(&match_row).unwrap());
421 assert!(!p.eval(&unmatch_row).unwrap());
422 assert!(p.eval(&[]).is_none());
423
424 let expr = Expr::Like(Like {
426 negated: true,
427 expr: Box::new(column("a")),
428 pattern: Box::new(string_literal("%abc")),
429 case_insensitive: true,
430 escape_char: None,
431 });
432
433 let p = Predicate::from_expr(expr).unwrap();
434 assert!(!p.eval(&match_row).unwrap());
435 assert!(p.eval(&unmatch_row).unwrap());
436 assert!(p.eval(&[]).is_none());
437 }
438
439 fn column(name: &str) -> Expr {
440 Expr::Column(Column::from_name(name))
441 }
442
443 fn string_literal(v: &str) -> Expr {
444 Expr::Literal(ScalarValue::Utf8(Some(v.to_string())))
445 }
446
447 fn match_string_value(v: &Value, expected: &str) -> bool {
448 matches!(v, Value::String(bs) if bs.as_utf8() == expected)
449 }
450
451 fn match_string_values(vs: &[Value], expected: &[&str]) -> bool {
452 assert_eq!(vs.len(), expected.len());
453
454 let mut result = true;
455 for (i, v) in vs.iter().enumerate() {
456 result = result && match_string_value(v, expected[i]);
457 }
458
459 result
460 }
461
462 fn mock_exprs() -> (Expr, Expr) {
463 let expr1 = Expr::BinaryExpr(BinaryExpr {
464 left: Box::new(column("a")),
465 op: Operator::Eq,
466 right: Box::new(string_literal("a_value")),
467 });
468
469 let expr2 = Expr::BinaryExpr(BinaryExpr {
470 left: Box::new(column("b")),
471 op: Operator::NotEq,
472 right: Box::new(string_literal("b_value")),
473 });
474
475 (expr1, expr2)
476 }
477
478 #[test]
479 fn test_predicate_from_expr() {
480 let (expr1, expr2) = mock_exprs();
481
482 let p1 = Predicate::from_expr(expr1.clone()).unwrap();
483 assert!(matches!(&p1, Predicate::Eq(column, v) if column == "a"
484 && match_string_value(v, "a_value")));
485
486 let p2 = Predicate::from_expr(expr2.clone()).unwrap();
487 assert!(matches!(&p2, Predicate::NotEq(column, v) if column == "b"
488 && match_string_value(v, "b_value")));
489
490 let and_expr = Expr::BinaryExpr(BinaryExpr {
491 left: Box::new(expr1.clone()),
492 op: Operator::And,
493 right: Box::new(expr2.clone()),
494 });
495 let or_expr = Expr::BinaryExpr(BinaryExpr {
496 left: Box::new(expr1.clone()),
497 op: Operator::Or,
498 right: Box::new(expr2.clone()),
499 });
500 let not_expr = Expr::Not(Box::new(expr1.clone()));
501
502 let and_p = Predicate::from_expr(and_expr).unwrap();
503 assert!(matches!(and_p, Predicate::And(left, right) if *left == p1 && *right == p2));
504 let or_p = Predicate::from_expr(or_expr).unwrap();
505 assert!(matches!(or_p, Predicate::Or(left, right) if *left == p1 && *right == p2));
506 let not_p = Predicate::from_expr(not_expr).unwrap();
507 assert!(matches!(not_p, Predicate::Not(p) if *p == p1));
508
509 let inlist_expr = Expr::InList(InList {
510 expr: Box::new(column("a")),
511 list: vec![string_literal("a1"), string_literal("a2")],
512 negated: false,
513 });
514
515 let inlist_p = Predicate::from_expr(inlist_expr).unwrap();
516 assert!(matches!(&inlist_p, Predicate::InList(c, values) if c == "a"
517 && match_string_values(values, &["a1", "a2"])));
518
519 let inlist_expr = Expr::InList(InList {
520 expr: Box::new(column("a")),
521 list: vec![string_literal("a1"), string_literal("a2")],
522 negated: true,
523 });
524 let inlist_p = Predicate::from_expr(inlist_expr).unwrap();
525 assert!(matches!(inlist_p, Predicate::Not(p) if
526 matches!(&*p,
527 Predicate::InList(c, values) if c == "a"
528 && match_string_values(values, &["a1", "a2"]))));
529 }
530
531 #[test]
532 fn test_predicates_from_scan_request() {
533 let predicates = Predicates::from_scan_request(&None);
534 assert!(predicates.predicates.is_empty());
535
536 let (expr1, expr2) = mock_exprs();
537
538 let request = ScanRequest {
539 filters: vec![expr1, expr2],
540 ..Default::default()
541 };
542 let predicates = Predicates::from_scan_request(&Some(request));
543
544 assert_eq!(2, predicates.predicates.len());
545 assert!(
546 matches!(&predicates.predicates[0], Predicate::Eq(column, v) if column == "a"
547 && match_string_value(v, "a_value"))
548 );
549 assert!(
550 matches!(&predicates.predicates[1], Predicate::NotEq(column, v) if column == "b"
551 && match_string_value(v, "b_value"))
552 );
553 }
554
555 #[test]
556 fn test_predicates_eval_row() {
557 let wrong_row = [
558 ("a", &Value::from("a_value")),
559 ("b", &Value::from("b_value")),
560 ("c", &Value::from("c_value")),
561 ];
562 let row = [
563 ("a", &Value::from("a_value")),
564 ("b", &Value::from("not_b_value")),
565 ("c", &Value::from("c_value")),
566 ];
567 let c_row = [("c", &Value::from("c_value"))];
568
569 let predicates = Predicates::from_scan_request(&None);
571 assert!(predicates.eval(&row));
572 assert!(predicates.eval(&wrong_row));
573 assert!(predicates.eval(&c_row));
574
575 let (expr1, expr2) = mock_exprs();
576 let request = ScanRequest {
577 filters: vec![expr1, expr2],
578 ..Default::default()
579 };
580 let predicates = Predicates::from_scan_request(&Some(request));
581 assert!(predicates.eval(&row));
582 assert!(!predicates.eval(&wrong_row));
583 assert!(predicates.eval(&c_row));
584 }
585}