partition/
expr.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Display, Formatter};
16use std::sync::Arc;
17
18use api::v1::meta::Partition;
19use datafusion_common::{ScalarValue, ToDFSchema};
20use datafusion_expr::Expr;
21use datafusion_expr::execution_props::ExecutionProps;
22use datafusion_physical_expr::{PhysicalExpr, create_physical_expr};
23use datatypes::arrow;
24use datatypes::value::{
25    Value, duration_to_scalar_value, time_to_scalar_value, timestamp_to_scalar_value,
26};
27use serde::{Deserialize, Serialize};
28use snafu::ResultExt;
29use sql::statements::value_to_sql_value;
30use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
31
32use crate::error;
33use crate::partition::PartitionBound;
34
/// Struct for partition expression, in the binary form `lhs op rhs`.
///
/// Either operand may itself be a nested expression (see [`Operand::Expr`]).
/// This can be converted back to sqlparser's [Expr] by
/// [`Self::to_parser_expr`].
///
/// [Expr]: sqlparser::ast::Expr
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PartitionExpr {
    /// Left-hand side operand.
    pub lhs: Box<Operand>,
    /// Operator applied to `lhs` and `rhs`.
    pub op: RestrictedOp,
    /// Right-hand side operand.
    pub rhs: Box<Operand>,
}
45
/// One side of a [`PartitionExpr`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Operand {
    /// A column, referenced by its name.
    Column(String),
    /// A literal value.
    Value(Value),
    /// A nested partition expression.
    Expr(PartitionExpr),
}
52
53pub fn col(column_name: impl Into<String>) -> Operand {
54    Operand::Column(column_name.into())
55}
56
57impl From<Value> for Operand {
58    fn from(value: Value) -> Self {
59        Operand::Value(value)
60    }
61}
62
63impl Operand {
64    pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
65        match self {
66            Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))),
67            Self::Value(v) => {
68                let scalar_value = match v {
69                    Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
70                    Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
71                    Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
72                    Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
73                    Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
74                    Value::Int8(v) => ScalarValue::Int8(Some(*v)),
75                    Value::Int16(v) => ScalarValue::Int16(Some(*v)),
76                    Value::Int32(v) => ScalarValue::Int32(Some(*v)),
77                    Value::Int64(v) => ScalarValue::Int64(Some(*v)),
78                    Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
79                    Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
80                    Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
81                    Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
82                    Value::Date(v) => ScalarValue::Date32(Some(v.val())),
83                    Value::Null => ScalarValue::Null,
84                    Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
85                    Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())).unwrap(),
86                    Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
87                    Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
88                    Value::IntervalMonthDayNano(v) => {
89                        ScalarValue::IntervalMonthDayNano(Some((*v).into()))
90                    }
91                    Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
92                    Value::Decimal128(d) => {
93                        let (v, p, s) = d.to_scalar_value();
94                        ScalarValue::Decimal128(v, p, s)
95                    }
96                    other => {
97                        return error::UnsupportedPartitionExprValueSnafu {
98                            value: other.clone(),
99                        }
100                        .fail();
101                    }
102                };
103                Ok(datafusion_expr::lit(scalar_value))
104            }
105            Self::Expr(e) => e.try_as_logical_expr(),
106        }
107    }
108
109    pub fn lt(self, rhs: impl Into<Self>) -> PartitionExpr {
110        PartitionExpr::new(self, RestrictedOp::Lt, rhs.into())
111    }
112
113    pub fn gt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
114        PartitionExpr::new(self, RestrictedOp::GtEq, rhs.into())
115    }
116
117    pub fn eq(self, rhs: impl Into<Self>) -> PartitionExpr {
118        PartitionExpr::new(self, RestrictedOp::Eq, rhs.into())
119    }
120
121    pub fn not_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
122        PartitionExpr::new(self, RestrictedOp::NotEq, rhs.into())
123    }
124
125    pub fn gt(self, rhs: impl Into<Self>) -> PartitionExpr {
126        PartitionExpr::new(self, RestrictedOp::Gt, rhs.into())
127    }
128
129    pub fn lt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
130        PartitionExpr::new(self, RestrictedOp::LtEq, rhs.into())
131    }
132}
133
134impl Display for Operand {
135    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136        match self {
137            Self::Column(v) => write!(f, "{v}"),
138            Self::Value(v) => write!(f, "{v}"),
139            Self::Expr(v) => write!(f, "{v}"),
140        }
141    }
142}
143
/// A restricted set of [Operator](datafusion_expr::Operator) that can be used in
/// partition expressions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RestrictedOp {
    // Comparison operators
    /// Equality, displayed as `=`.
    Eq,
    /// Inequality, displayed as `<>`.
    NotEq,
    /// Less than, displayed as `<`.
    Lt,
    /// Less than or equal, displayed as `<=`.
    LtEq,
    /// Greater than, displayed as `>`.
    Gt,
    /// Greater than or equal, displayed as `>=`.
    GtEq,

    // Conjunction
    /// Logical conjunction, displayed as `AND`.
    And,
    /// Logical disjunction, displayed as `OR`.
    Or,
}
160
161impl RestrictedOp {
162    pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
163        match op {
164            ParserBinaryOperator::Eq => Some(Self::Eq),
165            ParserBinaryOperator::NotEq => Some(Self::NotEq),
166            ParserBinaryOperator::Lt => Some(Self::Lt),
167            ParserBinaryOperator::LtEq => Some(Self::LtEq),
168            ParserBinaryOperator::Gt => Some(Self::Gt),
169            ParserBinaryOperator::GtEq => Some(Self::GtEq),
170            ParserBinaryOperator::And => Some(Self::And),
171            ParserBinaryOperator::Or => Some(Self::Or),
172            _ => None,
173        }
174    }
175
176    pub fn to_parser_op(&self) -> ParserBinaryOperator {
177        match self {
178            Self::Eq => ParserBinaryOperator::Eq,
179            Self::NotEq => ParserBinaryOperator::NotEq,
180            Self::Lt => ParserBinaryOperator::Lt,
181            Self::LtEq => ParserBinaryOperator::LtEq,
182            Self::Gt => ParserBinaryOperator::Gt,
183            Self::GtEq => ParserBinaryOperator::GtEq,
184            Self::And => ParserBinaryOperator::And,
185            Self::Or => ParserBinaryOperator::Or,
186        }
187    }
188}
189impl Display for RestrictedOp {
190    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
191        match self {
192            Self::Eq => write!(f, "="),
193            Self::NotEq => write!(f, "<>"),
194            Self::Lt => write!(f, "<"),
195            Self::LtEq => write!(f, "<="),
196            Self::Gt => write!(f, ">"),
197            Self::GtEq => write!(f, ">="),
198            Self::And => write!(f, "AND"),
199            Self::Or => write!(f, "OR"),
200        }
201    }
202}
203
204impl PartitionExpr {
205    pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
206        Self {
207            lhs: Box::new(lhs),
208            op,
209            rhs: Box::new(rhs),
210        }
211    }
212
213    /// Convert [Self] back to sqlparser's [Expr]
214    ///
215    /// [Expr]: ParserExpr
216    pub fn to_parser_expr(&self) -> ParserExpr {
217        // Safety: Partition rule won't contains unsupported value type.
218        // Otherwise it will be rejected by the parser.
219        let lhs = match &*self.lhs {
220            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
221            Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
222            Operand::Expr(e) => e.to_parser_expr(),
223        };
224
225        let rhs = match &*self.rhs {
226            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
227            Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
228            Operand::Expr(e) => e.to_parser_expr(),
229        };
230
231        ParserExpr::BinaryOp {
232            left: Box::new(lhs),
233            op: self.op.to_parser_op(),
234            right: Box::new(rhs),
235        }
236    }
237
238    pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
239        // Special handling for null equality.
240        // `col = NULL` -> `col IS NULL` to match SQL (DataFusion) semantics.
241        let lhs_is_null = matches!(self.lhs.as_ref(), Operand::Value(Value::Null));
242        let rhs_is_null = matches!(self.rhs.as_ref(), Operand::Value(Value::Null));
243
244        match (self.op.clone(), lhs_is_null, rhs_is_null) {
245            (RestrictedOp::Eq, _, true) => {
246                return Ok(self.lhs.try_as_logical_expr()?.is_null());
247            }
248            (RestrictedOp::Eq, true, _) => {
249                return Ok(self.rhs.try_as_logical_expr()?.is_null());
250            }
251            (RestrictedOp::NotEq, _, true) => {
252                return Ok(self.lhs.try_as_logical_expr()?.is_not_null());
253            }
254            (RestrictedOp::NotEq, true, _) => {
255                return Ok(self.rhs.try_as_logical_expr()?.is_not_null());
256            }
257            _ => {}
258        }
259
260        if matches!(
261            self.op,
262            RestrictedOp::Lt | RestrictedOp::LtEq | RestrictedOp::Gt | RestrictedOp::GtEq
263        ) {
264            if matches!(self.lhs.as_ref(), Operand::Column(_)) {
265                let column_expr = self.lhs.try_as_logical_expr()?;
266                let other_expr = self.rhs.try_as_logical_expr()?;
267                let base = match self.op {
268                    RestrictedOp::Lt => column_expr.clone().lt(other_expr),
269                    RestrictedOp::LtEq => column_expr.clone().lt_eq(other_expr),
270                    RestrictedOp::Gt => column_expr.clone().gt(other_expr),
271                    RestrictedOp::GtEq => column_expr.clone().gt_eq(other_expr),
272                    _ => unreachable!(),
273                };
274                return Ok(datafusion_expr::or(base, column_expr.is_null()));
275            } else if matches!(self.rhs.as_ref(), Operand::Column(_)) {
276                let other_expr = self.lhs.try_as_logical_expr()?;
277                let column_expr = self.rhs.try_as_logical_expr()?;
278                let base = match self.op {
279                    RestrictedOp::Lt => other_expr.lt(column_expr.clone()),
280                    RestrictedOp::LtEq => other_expr.lt_eq(column_expr.clone()),
281                    RestrictedOp::Gt => other_expr.gt(column_expr.clone()),
282                    RestrictedOp::GtEq => other_expr.gt_eq(column_expr.clone()),
283                    _ => unreachable!(),
284                };
285                return Ok(datafusion_expr::or(base, column_expr.is_null()));
286            }
287        }
288
289        // Normal cases handling, without NULL
290        let lhs = self.lhs.try_as_logical_expr()?;
291        let rhs = self.rhs.try_as_logical_expr()?;
292
293        let expr = match &self.op {
294            RestrictedOp::And => datafusion_expr::and(lhs, rhs),
295            RestrictedOp::Or => datafusion_expr::or(lhs, rhs),
296            RestrictedOp::Gt => lhs.gt(rhs),
297            RestrictedOp::GtEq => lhs.gt_eq(rhs),
298            RestrictedOp::Lt => lhs.lt(rhs),
299            RestrictedOp::LtEq => lhs.lt_eq(rhs),
300            RestrictedOp::Eq => lhs.eq(rhs),
301            RestrictedOp::NotEq => lhs.not_eq(rhs),
302        };
303        Ok(expr)
304    }
305
306    /// Get the left-hand side operand
307    pub fn lhs(&self) -> &Operand {
308        &self.lhs
309    }
310
311    /// Get the right-hand side operand
312    pub fn rhs(&self) -> &Operand {
313        &self.rhs
314    }
315
316    /// Get the operation
317    pub fn op(&self) -> &RestrictedOp {
318        &self.op
319    }
320
321    pub fn try_as_physical_expr(
322        &self,
323        schema: &arrow::datatypes::SchemaRef,
324    ) -> error::Result<Arc<dyn PhysicalExpr>> {
325        let df_schema = schema
326            .clone()
327            .to_dfschema_ref()
328            .context(error::ToDFSchemaSnafu)?;
329        let execution_props = &ExecutionProps::default();
330        let expr = self.try_as_logical_expr()?;
331        create_physical_expr(&expr, &df_schema, execution_props)
332            .context(error::CreatePhysicalExprSnafu)
333    }
334
335    pub fn and(self, rhs: PartitionExpr) -> PartitionExpr {
336        PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs))
337    }
338
339    /// Serializes `PartitionExpr` to json string.
340    ///
341    /// Wraps `PartitionBound::Expr` for compatibility.
342    pub fn as_json_str(&self) -> error::Result<String> {
343        serde_json::to_string(&PartitionBound::Expr(self.clone()))
344            .context(error::SerializeJsonSnafu)
345    }
346
347    /// Deserializes `PartitionExpr` from json string.
348    ///
349    /// Deserializes to `PartitionBound` for compatibility.
350    pub fn from_json_str(s: &str) -> error::Result<Option<Self>> {
351        if s.is_empty() {
352            return Ok(None);
353        }
354
355        let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
356        match bound {
357            PartitionBound::Expr(expr) => Ok(Some(expr)),
358            _ => Ok(None),
359        }
360    }
361
362    /// Converts [Self] to [Partition].
363    pub fn as_pb_partition(&self) -> error::Result<Partition> {
364        Ok(Partition {
365            expression: self.as_json_str()?,
366            ..Default::default()
367        })
368    }
369}
370
371impl Display for PartitionExpr {
372    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
373        write!(f, "{} {} {}", self.lhs, self.op, self.rhs)
374    }
375}
376
#[cfg(test)]
mod tests {
    use super::*;

    /// `Display` output for every operator, plus one nested expression.
    #[test]
    fn test_partition_expr() {
        let column = |name: &str| Operand::Column(name.to_string());
        let ten = || Operand::Value(Value::UInt32(10));

        let cases = [
            (column("a"), RestrictedOp::Eq, ten(), "a = 10"),
            (column("a"), RestrictedOp::NotEq, ten(), "a <> 10"),
            (column("a"), RestrictedOp::Lt, ten(), "a < 10"),
            (column("a"), RestrictedOp::LtEq, ten(), "a <= 10"),
            (column("a"), RestrictedOp::Gt, ten(), "a > 10"),
            (column("a"), RestrictedOp::GtEq, ten(), "a >= 10"),
            (column("a"), RestrictedOp::And, column("b"), "a AND b"),
            (column("a"), RestrictedOp::Or, column("b"), "a OR b"),
            (
                column("a"),
                RestrictedOp::Or,
                Operand::Expr(PartitionExpr::new(
                    column("c"),
                    RestrictedOp::And,
                    column("d"),
                )),
                "a OR c AND d",
            ),
        ];

        for (lhs, op, rhs, expected) in cases {
            let expr = PartitionExpr::new(lhs, op, rhs);
            assert_eq!(expected, expr.to_string());
        }
    }

    /// Equality against a NULL literal is rewritten to `IS [NOT] NULL`.
    #[test]
    fn test_try_as_logical_expr_null_equality() {
        let cases = [
            (RestrictedOp::Eq, "a IS NULL"),
            (RestrictedOp::NotEq, "a IS NOT NULL"),
        ];

        for (op, expected) in cases {
            let expr = PartitionExpr::new(
                Operand::Column("a".to_string()),
                op,
                Operand::Value(Value::Null),
            );
            assert_eq!(expr.try_as_logical_expr().unwrap().to_string(), expected);
        }
    }

    /// Range comparisons involving a column gain an `OR <column> IS NULL` arm.
    #[test]
    fn test_try_as_logical_expr_null_range_comparison() {
        let column = || Operand::Column("a".to_string());
        let ten = || Operand::Value(Value::Int64(10));

        let cases = [
            // Lt with column on LHS
            (column(), RestrictedOp::Lt, ten(), "a < Int64(10) OR a IS NULL"),
            // Lt with column on RHS
            (ten(), RestrictedOp::Lt, column(), "Int64(10) < a OR a IS NULL"),
            // Gt with column on LHS
            (column(), RestrictedOp::Gt, ten(), "a > Int64(10) OR a IS NULL"),
            // Gt with column on RHS
            (ten(), RestrictedOp::Gt, column(), "Int64(10) > a OR a IS NULL"),
            // GtEq with column on LHS
            (
                column(),
                RestrictedOp::GtEq,
                ten(),
                "a >= Int64(10) OR a IS NULL",
            ),
            // LtEq with column on LHS
            (
                column(),
                RestrictedOp::LtEq,
                ten(),
                "a <= Int64(10) OR a IS NULL",
            ),
        ];

        for (lhs, op, rhs, expected) in cases {
            let expr = PartitionExpr::new(lhs, op, rhs);
            assert_eq!(expr.try_as_logical_expr().unwrap().to_string(), expected);
        }
    }

    /// JSON round trip through the `PartitionBound` wrapper.
    #[test]
    fn test_serde_partition_expr() {
        let column_vs_ten = |op: RestrictedOp| {
            PartitionExpr::new(
                Operand::Column("a".to_string()),
                op,
                Operand::Value(Value::UInt32(10)),
            )
        };

        // Serialization wraps the expression in the `Expr` variant.
        let serialized = column_vs_ten(RestrictedOp::Eq).as_json_str().unwrap();
        assert_eq!(
            serialized,
            "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"Eq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        );

        // Deserialization unwraps the `Expr` variant.
        let input = r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
        let parsed = PartitionExpr::from_json_str(input).unwrap().unwrap();
        assert_eq!(parsed, column_vs_ten(RestrictedOp::GtEq));

        // empty string
        let from_empty = PartitionExpr::from_json_str("").unwrap();
        assert!(from_empty.is_none());

        // variants other than Expr
        let from_max_value = PartitionExpr::from_json_str(r#""MaxValue""#).unwrap();
        assert!(from_max_value.is_none());

        let from_value = PartitionExpr::from_json_str(r#"{"Value":{"UInt32":10}}"#).unwrap();
        assert!(from_value.is_none());
    }
}