catalog/system_schema/
predicate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use arrow::array::StringArray;
16use arrow::compute::kernels::comparison;
17use datafusion::common::ScalarValue;
18use datafusion::logical_expr::expr::Like;
19use datafusion::logical_expr::{Expr, Operator};
20use datatypes::value::Value;
21use store_api::storage::ScanRequest;
22
23type ColumnName = String;
24/// Predicate to filter `information_schema` tables stream,
25/// we only support these simple predicates currently.
26/// TODO(dennis): supports more predicate types.
27#[derive(Clone, PartialEq, Eq, Debug)]
28pub(crate) enum Predicate {
29    Eq(ColumnName, Value),
30    Like(ColumnName, String, bool),
31    NotEq(ColumnName, Value),
32    InList(ColumnName, Vec<Value>),
33    And(Box<Predicate>, Box<Predicate>),
34    Or(Box<Predicate>, Box<Predicate>),
35    Not(Box<Predicate>),
36}
37
38impl Predicate {
39    /// Evaluate the predicate with the row, returns:
40    /// - `None` when the predicate can't evaluate with the row.
41    /// - `Some(true)` when the predicate is satisfied,
42    /// - `Some(false)` when the predicate is not satisfied,
43    fn eval(&self, row: &[(&str, &Value)]) -> Option<bool> {
44        match self {
45            Predicate::Eq(c, v) => {
46                for (column, value) in row {
47                    if c != column {
48                        continue;
49                    }
50                    return Some(v == *value);
51                }
52            }
53            Predicate::Like(c, pattern, case_insensitive) => {
54                for (column, value) in row {
55                    if c != column {
56                        continue;
57                    }
58
59                    let Value::String(bs) = value else {
60                        continue;
61                    };
62
63                    return like_utf8(bs.as_utf8(), pattern, case_insensitive);
64                }
65            }
66            Predicate::NotEq(c, v) => {
67                for (column, value) in row {
68                    if c != column {
69                        continue;
70                    }
71                    return Some(v != *value);
72                }
73            }
74            Predicate::InList(c, values) => {
75                for (column, value) in row {
76                    if c != column {
77                        continue;
78                    }
79                    return Some(values.iter().any(|v| v == *value));
80                }
81            }
82            Predicate::And(left, right) => {
83                let left = left.eval(row);
84
85                // short-circuit
86                if matches!(left, Some(false)) {
87                    return Some(false);
88                }
89
90                return match (left, right.eval(row)) {
91                    (Some(left), Some(right)) => Some(left && right),
92                    (None, Some(false)) => Some(false),
93                    _ => None,
94                };
95            }
96            Predicate::Or(left, right) => {
97                let left = left.eval(row);
98
99                // short-circuit
100                if matches!(left, Some(true)) {
101                    return Some(true);
102                }
103
104                return match (left, right.eval(row)) {
105                    (Some(left), Some(right)) => Some(left || right),
106                    (None, Some(true)) => Some(true),
107                    _ => None,
108                };
109            }
110            Predicate::Not(p) => {
111                return Some(!p.eval(row)?);
112            }
113        }
114
115        // Can't evaluate predicate with the row
116        None
117    }
118
119    /// Try to create a predicate from datafusion [`Expr`], return None if fails.
120    fn from_expr(expr: Expr) -> Option<Predicate> {
121        match expr {
122            // NOT expr
123            Expr::Not(expr) => Some(Predicate::Not(Box::new(Self::from_expr(*expr)?))),
124            // expr LIKE pattern
125            Expr::Like(Like {
126                negated,
127                expr,
128                pattern,
129                case_insensitive,
130                ..
131            }) if is_column(&expr) && is_string_literal(&pattern) => {
132                // Safety: ensured by gurad
133                let Expr::Column(c) = *expr else {
134                    unreachable!();
135                };
136                let Expr::Literal(ScalarValue::Utf8(Some(pattern))) = *pattern else {
137                    unreachable!();
138                };
139
140                let p = Predicate::Like(c.name, pattern, case_insensitive);
141
142                if negated {
143                    Some(Predicate::Not(Box::new(p)))
144                } else {
145                    Some(p)
146                }
147            }
148            // left OP right
149            Expr::BinaryExpr(bin) => match (*bin.left, bin.op, *bin.right) {
150                // left == right
151                (Expr::Literal(scalar), Operator::Eq, Expr::Column(c))
152                | (Expr::Column(c), Operator::Eq, Expr::Literal(scalar)) => {
153                    let Ok(v) = Value::try_from(scalar) else {
154                        return None;
155                    };
156
157                    Some(Predicate::Eq(c.name, v))
158                }
159                // left != right
160                (Expr::Literal(scalar), Operator::NotEq, Expr::Column(c))
161                | (Expr::Column(c), Operator::NotEq, Expr::Literal(scalar)) => {
162                    let Ok(v) = Value::try_from(scalar) else {
163                        return None;
164                    };
165
166                    Some(Predicate::NotEq(c.name, v))
167                }
168                // left AND right
169                (left, Operator::And, right) => {
170                    let left = Self::from_expr(left)?;
171                    let right = Self::from_expr(right)?;
172
173                    Some(Predicate::And(Box::new(left), Box::new(right)))
174                }
175                // left OR right
176                (left, Operator::Or, right) => {
177                    let left = Self::from_expr(left)?;
178                    let right = Self::from_expr(right)?;
179
180                    Some(Predicate::Or(Box::new(left), Box::new(right)))
181                }
182                _ => None,
183            },
184            // [NOT] IN (LIST)
185            Expr::InList(list) => {
186                match (*list.expr, list.list, list.negated) {
187                    // column [NOT] IN (v1, v2, v3, ...)
188                    (Expr::Column(c), list, negated) if is_all_scalars(&list) => {
189                        let mut values = Vec::with_capacity(list.len());
190                        for scalar in list {
191                            // Safety: checked by `is_all_scalars`
192                            let Expr::Literal(scalar) = scalar else {
193                                unreachable!();
194                            };
195
196                            let Ok(value) = Value::try_from(scalar) else {
197                                return None;
198                            };
199
200                            values.push(value);
201                        }
202
203                        let predicate = Predicate::InList(c.name, values);
204
205                        if negated {
206                            Some(Predicate::Not(Box::new(predicate)))
207                        } else {
208                            Some(predicate)
209                        }
210                    }
211                    _ => None,
212                }
213            }
214            _ => None,
215        }
216    }
217}
218
219/// Perform SQL left LIKE right, return `None` if fail to evaluate.
220/// - `s` the target string
221/// - `pattern` the pattern just like '%abc'
222/// - `case_insensitive` whether to perform case-insensitive like or not.
223fn like_utf8(s: &str, pattern: &str, case_insensitive: &bool) -> Option<bool> {
224    let array = StringArray::from(vec![s]);
225    let patterns = StringArray::new_scalar(pattern);
226
227    let Ok(booleans) = (if *case_insensitive {
228        comparison::ilike(&array, &patterns)
229    } else {
230        comparison::like(&array, &patterns)
231    }) else {
232        return None;
233    };
234
235    // Safety: at least one value in result
236    Some(booleans.value(0))
237}
238
239fn is_string_literal(expr: &Expr) -> bool {
240    matches!(expr, Expr::Literal(ScalarValue::Utf8(Some(_))))
241}
242
243fn is_column(expr: &Expr) -> bool {
244    matches!(expr, Expr::Column(_))
245}
246
247/// A list of predicate
248pub struct Predicates {
249    predicates: Vec<Predicate>,
250}
251
252impl Predicates {
253    /// Try its best to create predicates from [`ScanRequest`].
254    pub fn from_scan_request(request: &Option<ScanRequest>) -> Predicates {
255        if let Some(request) = request {
256            let mut predicates = Vec::with_capacity(request.filters.len());
257
258            for filter in &request.filters {
259                if let Some(predicate) = Predicate::from_expr(filter.clone()) {
260                    predicates.push(predicate);
261                }
262            }
263
264            Self { predicates }
265        } else {
266            Self {
267                predicates: Vec::new(),
268            }
269        }
270    }
271
272    /// Evaluate the predicates with the row.
273    /// returns true when all the predicates are satisfied or can't be evaluated.
274    pub fn eval(&self, row: &[(&str, &Value)]) -> bool {
275        // fast path
276        if self.predicates.is_empty() {
277            return true;
278        }
279
280        self.predicates
281            .iter()
282            .filter_map(|p| p.eval(row))
283            .all(|b| b)
284    }
285}
286
287/// Returns true when the values are all [`DfExpr::Literal`].
288fn is_all_scalars(list: &[Expr]) -> bool {
289    list.iter().all(|v| matches!(v, Expr::Literal(_)))
290}
291
292#[cfg(test)]
293mod tests {
294    use datafusion::common::{Column, ScalarValue};
295    use datafusion::logical_expr::expr::InList;
296    use datafusion::logical_expr::BinaryExpr;
297
298    use super::*;
299
300    #[test]
301    fn test_predicate_eval() {
302        let a_col = "a".to_string();
303        let b_col = "b".to_string();
304        let a_value = Value::from("a_value");
305        let b_value = Value::from("b_value");
306        let wrong_value = Value::from("wrong_value");
307
308        let a_row = [(a_col.as_str(), &a_value)];
309        let b_row = [("b", &wrong_value)];
310        let wrong_row = [(a_col.as_str(), &wrong_value)];
311
312        // Predicate::Eq
313        let p = Predicate::Eq(a_col.clone(), a_value.clone());
314        assert!(p.eval(&a_row).unwrap());
315        assert!(p.eval(&b_row).is_none());
316        assert!(!p.eval(&wrong_row).unwrap());
317
318        // Predicate::NotEq
319        let p = Predicate::NotEq(a_col.clone(), a_value.clone());
320        assert!(!p.eval(&a_row).unwrap());
321        assert!(p.eval(&b_row).is_none());
322        assert!(p.eval(&wrong_row).unwrap());
323
324        // Predicate::InList
325        let p = Predicate::InList(a_col.clone(), vec![a_value.clone(), b_value.clone()]);
326        assert!(p.eval(&a_row).unwrap());
327        assert!(p.eval(&b_row).is_none());
328        assert!(!p.eval(&wrong_row).unwrap());
329        assert!(p.eval(&[(&a_col, &b_value)]).unwrap());
330
331        let p1 = Predicate::Eq(a_col.clone(), a_value.clone());
332        let p2 = Predicate::Eq(b_col.clone(), b_value.clone());
333        let row = [(a_col.as_str(), &a_value), (b_col.as_str(), &b_value)];
334        let wrong_row = [(a_col.as_str(), &a_value), (b_col.as_str(), &wrong_value)];
335
336        //Predicate::And
337        let p = Predicate::And(Box::new(p1.clone()), Box::new(p2.clone()));
338        assert!(p.eval(&row).unwrap());
339        assert!(!p.eval(&wrong_row).unwrap());
340        assert!(p.eval(&[]).is_none());
341        assert!(p.eval(&[("c", &a_value)]).is_none());
342        assert!(!p
343            .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
344            .unwrap());
345        assert!(!p
346            .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
347            .unwrap());
348        assert!(p
349            .eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
350            .is_none());
351        assert!(!p
352            .eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
353            .unwrap());
354
355        //Predicate::Or
356        let p = Predicate::Or(Box::new(p1), Box::new(p2));
357        assert!(p.eval(&row).unwrap());
358        assert!(p.eval(&wrong_row).unwrap());
359        assert!(p.eval(&[]).is_none());
360        assert!(p.eval(&[("c", &a_value)]).is_none());
361        assert!(!p
362            .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &a_value)])
363            .unwrap());
364        assert!(p
365            .eval(&[(a_col.as_str(), &b_value), (b_col.as_str(), &b_value)])
366            .unwrap());
367        assert!(p
368            .eval(&[(a_col.as_ref(), &a_value), ("c", &a_value)])
369            .unwrap());
370        assert!(p
371            .eval(&[(a_col.as_ref(), &b_value), ("c", &a_value)])
372            .is_none());
373    }
374
375    #[test]
376    fn test_predicate_like() {
377        // case insensitive
378        let expr = Expr::Like(Like {
379            negated: false,
380            expr: Box::new(column("a")),
381            pattern: Box::new(string_literal("%abc")),
382            case_insensitive: true,
383            escape_char: None,
384        });
385
386        let p = Predicate::from_expr(expr).unwrap();
387        assert!(
388            matches!(&p, Predicate::Like(c, pattern, case_insensitive) if
389                         c == "a"
390                         && pattern == "%abc"
391                         && *case_insensitive)
392        );
393
394        let match_row = [
395            ("a", &Value::from("hello AbC")),
396            ("b", &Value::from("b value")),
397        ];
398        let unmatch_row = [("a", &Value::from("bca")), ("b", &Value::from("b value"))];
399
400        assert!(p.eval(&match_row).unwrap());
401        assert!(!p.eval(&unmatch_row).unwrap());
402        assert!(p.eval(&[]).is_none());
403
404        // case sensitive
405        let expr = Expr::Like(Like {
406            negated: false,
407            expr: Box::new(column("a")),
408            pattern: Box::new(string_literal("%abc")),
409            case_insensitive: false,
410            escape_char: None,
411        });
412
413        let p = Predicate::from_expr(expr).unwrap();
414        assert!(
415            matches!(&p, Predicate::Like(c, pattern, case_insensitive) if
416                         c == "a"
417                         && pattern == "%abc"
418                         && !*case_insensitive)
419        );
420        assert!(!p.eval(&match_row).unwrap());
421        assert!(!p.eval(&unmatch_row).unwrap());
422        assert!(p.eval(&[]).is_none());
423
424        // not like
425        let expr = Expr::Like(Like {
426            negated: true,
427            expr: Box::new(column("a")),
428            pattern: Box::new(string_literal("%abc")),
429            case_insensitive: true,
430            escape_char: None,
431        });
432
433        let p = Predicate::from_expr(expr).unwrap();
434        assert!(!p.eval(&match_row).unwrap());
435        assert!(p.eval(&unmatch_row).unwrap());
436        assert!(p.eval(&[]).is_none());
437    }
438
439    fn column(name: &str) -> Expr {
440        Expr::Column(Column::from_name(name))
441    }
442
443    fn string_literal(v: &str) -> Expr {
444        Expr::Literal(ScalarValue::Utf8(Some(v.to_string())))
445    }
446
447    fn match_string_value(v: &Value, expected: &str) -> bool {
448        matches!(v, Value::String(bs) if bs.as_utf8() == expected)
449    }
450
451    fn match_string_values(vs: &[Value], expected: &[&str]) -> bool {
452        assert_eq!(vs.len(), expected.len());
453
454        let mut result = true;
455        for (i, v) in vs.iter().enumerate() {
456            result = result && match_string_value(v, expected[i]);
457        }
458
459        result
460    }
461
462    fn mock_exprs() -> (Expr, Expr) {
463        let expr1 = Expr::BinaryExpr(BinaryExpr {
464            left: Box::new(column("a")),
465            op: Operator::Eq,
466            right: Box::new(string_literal("a_value")),
467        });
468
469        let expr2 = Expr::BinaryExpr(BinaryExpr {
470            left: Box::new(column("b")),
471            op: Operator::NotEq,
472            right: Box::new(string_literal("b_value")),
473        });
474
475        (expr1, expr2)
476    }
477
478    #[test]
479    fn test_predicate_from_expr() {
480        let (expr1, expr2) = mock_exprs();
481
482        let p1 = Predicate::from_expr(expr1.clone()).unwrap();
483        assert!(matches!(&p1, Predicate::Eq(column, v) if column == "a"
484                         && match_string_value(v, "a_value")));
485
486        let p2 = Predicate::from_expr(expr2.clone()).unwrap();
487        assert!(matches!(&p2, Predicate::NotEq(column, v) if column == "b"
488                         && match_string_value(v, "b_value")));
489
490        let and_expr = Expr::BinaryExpr(BinaryExpr {
491            left: Box::new(expr1.clone()),
492            op: Operator::And,
493            right: Box::new(expr2.clone()),
494        });
495        let or_expr = Expr::BinaryExpr(BinaryExpr {
496            left: Box::new(expr1.clone()),
497            op: Operator::Or,
498            right: Box::new(expr2.clone()),
499        });
500        let not_expr = Expr::Not(Box::new(expr1.clone()));
501
502        let and_p = Predicate::from_expr(and_expr).unwrap();
503        assert!(matches!(and_p, Predicate::And(left, right) if *left == p1 && *right == p2));
504        let or_p = Predicate::from_expr(or_expr).unwrap();
505        assert!(matches!(or_p, Predicate::Or(left, right) if *left == p1 && *right == p2));
506        let not_p = Predicate::from_expr(not_expr).unwrap();
507        assert!(matches!(not_p, Predicate::Not(p) if *p == p1));
508
509        let inlist_expr = Expr::InList(InList {
510            expr: Box::new(column("a")),
511            list: vec![string_literal("a1"), string_literal("a2")],
512            negated: false,
513        });
514
515        let inlist_p = Predicate::from_expr(inlist_expr).unwrap();
516        assert!(matches!(&inlist_p, Predicate::InList(c, values) if c == "a"
517                         && match_string_values(values, &["a1", "a2"])));
518
519        let inlist_expr = Expr::InList(InList {
520            expr: Box::new(column("a")),
521            list: vec![string_literal("a1"), string_literal("a2")],
522            negated: true,
523        });
524        let inlist_p = Predicate::from_expr(inlist_expr).unwrap();
525        assert!(matches!(inlist_p, Predicate::Not(p) if
526                         matches!(&*p,
527                                  Predicate::InList(c, values) if c == "a"
528                                  && match_string_values(values, &["a1", "a2"]))));
529    }
530
531    #[test]
532    fn test_predicates_from_scan_request() {
533        let predicates = Predicates::from_scan_request(&None);
534        assert!(predicates.predicates.is_empty());
535
536        let (expr1, expr2) = mock_exprs();
537
538        let request = ScanRequest {
539            filters: vec![expr1, expr2],
540            ..Default::default()
541        };
542        let predicates = Predicates::from_scan_request(&Some(request));
543
544        assert_eq!(2, predicates.predicates.len());
545        assert!(
546            matches!(&predicates.predicates[0], Predicate::Eq(column, v) if column == "a"
547                     && match_string_value(v, "a_value"))
548        );
549        assert!(
550            matches!(&predicates.predicates[1], Predicate::NotEq(column, v) if column == "b"
551                     && match_string_value(v, "b_value"))
552        );
553    }
554
555    #[test]
556    fn test_predicates_eval_row() {
557        let wrong_row = [
558            ("a", &Value::from("a_value")),
559            ("b", &Value::from("b_value")),
560            ("c", &Value::from("c_value")),
561        ];
562        let row = [
563            ("a", &Value::from("a_value")),
564            ("b", &Value::from("not_b_value")),
565            ("c", &Value::from("c_value")),
566        ];
567        let c_row = [("c", &Value::from("c_value"))];
568
569        // test empty predicates, always returns true
570        let predicates = Predicates::from_scan_request(&None);
571        assert!(predicates.eval(&row));
572        assert!(predicates.eval(&wrong_row));
573        assert!(predicates.eval(&c_row));
574
575        let (expr1, expr2) = mock_exprs();
576        let request = ScanRequest {
577            filters: vec![expr1, expr2],
578            ..Default::default()
579        };
580        let predicates = Predicates::from_scan_request(&Some(request));
581        assert!(predicates.eval(&row));
582        assert!(!predicates.eval(&wrong_row));
583        assert!(predicates.eval(&c_row));
584    }
585}