partition/
expr.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Display, Formatter};
16use std::sync::Arc;
17
18use api::v1::meta::Partition;
19use datafusion_common::{ScalarValue, ToDFSchema};
20use datafusion_expr::execution_props::ExecutionProps;
21use datafusion_expr::Expr;
22use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
23use datatypes::arrow;
24use datatypes::value::{
25    duration_to_scalar_value, time_to_scalar_value, timestamp_to_scalar_value, Value,
26};
27use serde::{Deserialize, Serialize};
28use snafu::ResultExt;
29use sql::statements::value_to_sql_value;
30use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
31
32use crate::error;
33use crate::partition::PartitionBound;
34
35/// Struct for partition expression. This can be converted back to sqlparser's [Expr].
36/// by [`Self::to_parser_expr`].
37///
38/// [Expr]: sqlparser::ast::Expr
39#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
40pub struct PartitionExpr {
41    pub(crate) lhs: Box<Operand>,
42    pub(crate) op: RestrictedOp,
43    pub(crate) rhs: Box<Operand>,
44}
45
46#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
47pub enum Operand {
48    Column(String),
49    Value(Value),
50    Expr(PartitionExpr),
51}
52
53pub fn col(column_name: impl Into<String>) -> Operand {
54    Operand::Column(column_name.into())
55}
56
57impl From<Value> for Operand {
58    fn from(value: Value) -> Self {
59        Operand::Value(value)
60    }
61}
62
63impl Operand {
64    pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
65        match self {
66            Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))),
67            Self::Value(v) => {
68                let scalar_value = match v {
69                    Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
70                    Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
71                    Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
72                    Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
73                    Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
74                    Value::Int8(v) => ScalarValue::Int8(Some(*v)),
75                    Value::Int16(v) => ScalarValue::Int16(Some(*v)),
76                    Value::Int32(v) => ScalarValue::Int32(Some(*v)),
77                    Value::Int64(v) => ScalarValue::Int64(Some(*v)),
78                    Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
79                    Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
80                    Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
81                    Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
82                    Value::Date(v) => ScalarValue::Date32(Some(v.val())),
83                    Value::Null => ScalarValue::Null,
84                    Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
85                    Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())).unwrap(),
86                    Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
87                    Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
88                    Value::IntervalMonthDayNano(v) => {
89                        ScalarValue::IntervalMonthDayNano(Some((*v).into()))
90                    }
91                    Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
92                    Value::Decimal128(d) => {
93                        let (v, p, s) = d.to_scalar_value();
94                        ScalarValue::Decimal128(v, p, s)
95                    }
96                    other => {
97                        return error::UnsupportedPartitionExprValueSnafu {
98                            value: other.clone(),
99                        }
100                        .fail()
101                    }
102                };
103                Ok(datafusion_expr::lit(scalar_value))
104            }
105            Self::Expr(e) => e.try_as_logical_expr(),
106        }
107    }
108
109    pub fn lt(self, rhs: impl Into<Self>) -> PartitionExpr {
110        PartitionExpr::new(self, RestrictedOp::Lt, rhs.into())
111    }
112
113    pub fn gt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
114        PartitionExpr::new(self, RestrictedOp::GtEq, rhs.into())
115    }
116
117    pub fn eq(self, rhs: impl Into<Self>) -> PartitionExpr {
118        PartitionExpr::new(self, RestrictedOp::Eq, rhs.into())
119    }
120
121    pub fn not_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
122        PartitionExpr::new(self, RestrictedOp::NotEq, rhs.into())
123    }
124
125    pub fn gt(self, rhs: impl Into<Self>) -> PartitionExpr {
126        PartitionExpr::new(self, RestrictedOp::Gt, rhs.into())
127    }
128
129    pub fn lt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
130        PartitionExpr::new(self, RestrictedOp::LtEq, rhs.into())
131    }
132}
133
134impl Display for Operand {
135    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136        match self {
137            Self::Column(v) => write!(f, "{v}"),
138            Self::Value(v) => write!(f, "{v}"),
139            Self::Expr(v) => write!(f, "{v}"),
140        }
141    }
142}
143
144/// A restricted set of [Operator](datafusion_expr::Operator) that can be used in
145/// partition expressions.
146#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
147pub enum RestrictedOp {
148    // Evaluate to binary
149    Eq,
150    NotEq,
151    Lt,
152    LtEq,
153    Gt,
154    GtEq,
155
156    // Conjunction
157    And,
158    Or,
159}
160
161impl RestrictedOp {
162    pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
163        match op {
164            ParserBinaryOperator::Eq => Some(Self::Eq),
165            ParserBinaryOperator::NotEq => Some(Self::NotEq),
166            ParserBinaryOperator::Lt => Some(Self::Lt),
167            ParserBinaryOperator::LtEq => Some(Self::LtEq),
168            ParserBinaryOperator::Gt => Some(Self::Gt),
169            ParserBinaryOperator::GtEq => Some(Self::GtEq),
170            ParserBinaryOperator::And => Some(Self::And),
171            ParserBinaryOperator::Or => Some(Self::Or),
172            _ => None,
173        }
174    }
175
176    pub fn to_parser_op(&self) -> ParserBinaryOperator {
177        match self {
178            Self::Eq => ParserBinaryOperator::Eq,
179            Self::NotEq => ParserBinaryOperator::NotEq,
180            Self::Lt => ParserBinaryOperator::Lt,
181            Self::LtEq => ParserBinaryOperator::LtEq,
182            Self::Gt => ParserBinaryOperator::Gt,
183            Self::GtEq => ParserBinaryOperator::GtEq,
184            Self::And => ParserBinaryOperator::And,
185            Self::Or => ParserBinaryOperator::Or,
186        }
187    }
188}
189impl Display for RestrictedOp {
190    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
191        match self {
192            Self::Eq => write!(f, "="),
193            Self::NotEq => write!(f, "<>"),
194            Self::Lt => write!(f, "<"),
195            Self::LtEq => write!(f, "<="),
196            Self::Gt => write!(f, ">"),
197            Self::GtEq => write!(f, ">="),
198            Self::And => write!(f, "AND"),
199            Self::Or => write!(f, "OR"),
200        }
201    }
202}
203
204impl PartitionExpr {
205    pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
206        Self {
207            lhs: Box::new(lhs),
208            op,
209            rhs: Box::new(rhs),
210        }
211    }
212
213    /// Convert [Self] back to sqlparser's [Expr]
214    ///
215    /// [Expr]: ParserExpr
216    pub fn to_parser_expr(&self) -> ParserExpr {
217        // Safety: Partition rule won't contains unsupported value type.
218        // Otherwise it will be rejected by the parser.
219        let lhs = match &*self.lhs {
220            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
221            Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap()),
222            Operand::Expr(e) => e.to_parser_expr(),
223        };
224
225        let rhs = match &*self.rhs {
226            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
227            Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap()),
228            Operand::Expr(e) => e.to_parser_expr(),
229        };
230
231        ParserExpr::BinaryOp {
232            left: Box::new(lhs),
233            op: self.op.to_parser_op(),
234            right: Box::new(rhs),
235        }
236    }
237
238    pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
239        let lhs = self.lhs.try_as_logical_expr()?;
240        let rhs = self.rhs.try_as_logical_expr()?;
241
242        let expr = match &self.op {
243            RestrictedOp::And => datafusion_expr::and(lhs, rhs),
244            RestrictedOp::Or => datafusion_expr::or(lhs, rhs),
245            RestrictedOp::Gt => lhs.gt(rhs),
246            RestrictedOp::GtEq => lhs.gt_eq(rhs),
247            RestrictedOp::Lt => lhs.lt(rhs),
248            RestrictedOp::LtEq => lhs.lt_eq(rhs),
249            RestrictedOp::Eq => lhs.eq(rhs),
250            RestrictedOp::NotEq => lhs.not_eq(rhs),
251        };
252        Ok(expr)
253    }
254
255    pub fn try_as_physical_expr(
256        &self,
257        schema: &arrow::datatypes::SchemaRef,
258    ) -> error::Result<Arc<dyn PhysicalExpr>> {
259        let df_schema = schema
260            .clone()
261            .to_dfschema_ref()
262            .context(error::ToDFSchemaSnafu)?;
263        let execution_props = &ExecutionProps::default();
264        let expr = self.try_as_logical_expr()?;
265        create_physical_expr(&expr, &df_schema, execution_props)
266            .context(error::CreatePhysicalExprSnafu)
267    }
268
269    pub fn and(self, rhs: PartitionExpr) -> PartitionExpr {
270        PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs))
271    }
272
273    /// Serializes `PartitionExpr` to json string.
274    ///
275    /// Wraps `PartitionBound::Expr` for compatibility.
276    pub fn as_json_str(&self) -> error::Result<String> {
277        serde_json::to_string(&PartitionBound::Expr(self.clone()))
278            .context(error::SerializeJsonSnafu)
279    }
280
281    /// Deserializes `PartitionExpr` from json string.
282    ///
283    /// Deserializes to `PartitionBound` for compatibility.
284    pub fn from_json_str(s: &str) -> error::Result<Option<Self>> {
285        if s.is_empty() {
286            return Ok(None);
287        }
288
289        let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
290        match bound {
291            PartitionBound::Expr(expr) => Ok(Some(expr)),
292            _ => Ok(None),
293        }
294    }
295
296    /// Converts [Self] to [Partition].
297    pub fn as_pb_partition(&self) -> error::Result<Partition> {
298        Ok(Partition {
299            expression: self.as_json_str()?,
300            ..Default::default()
301        })
302    }
303}
304
305impl Display for PartitionExpr {
306    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
307        write!(f, "{} {} {}", self.lhs, self.op, self.rhs)
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_partition_expr() {
        let column = |name: &str| Operand::Column(name.to_string());
        let ten = || Operand::Value(Value::UInt32(10));

        // Column-vs-literal comparisons.
        let comparisons = [
            (RestrictedOp::Eq, "a = 10"),
            (RestrictedOp::NotEq, "a <> 10"),
            (RestrictedOp::Lt, "a < 10"),
            (RestrictedOp::LtEq, "a <= 10"),
            (RestrictedOp::Gt, "a > 10"),
            (RestrictedOp::GtEq, "a >= 10"),
        ];
        for (op, expected) in comparisons {
            let expr = PartitionExpr::new(column("a"), op, ten());
            assert_eq!(expected, expr.to_string());
        }

        // Logical connectives between two columns.
        let connectives = [(RestrictedOp::And, "a AND b"), (RestrictedOp::Or, "a OR b")];
        for (op, expected) in connectives {
            let expr = PartitionExpr::new(column("a"), op, column("b"));
            assert_eq!(expected, expr.to_string());
        }

        // A nested expression on the right-hand side.
        let nested = PartitionExpr::new(
            column("a"),
            RestrictedOp::Or,
            Operand::Expr(PartitionExpr::new(column("c"), RestrictedOp::And, column("d"))),
        );
        assert_eq!("a OR c AND d", nested.to_string());
    }

    #[test]
    fn test_serde_partition_expr() {
        let eq_expr = PartitionExpr::new(col("a"), RestrictedOp::Eq, Value::UInt32(10).into());
        let serialized = eq_expr.as_json_str().unwrap();
        assert_eq!(
            serialized,
            "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"Eq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        );

        let parsed = PartitionExpr::from_json_str(
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#,
        )
        .unwrap()
        .unwrap();
        assert_eq!(parsed, col("a").gt_eq(Value::UInt32(10)));

        // An empty string, and bounds other than `Expr`, all deserialize to `None`.
        for input in ["", r#""MaxValue""#, r#"{"Value":{"UInt32":10}}"#] {
            assert!(PartitionExpr::from_json_str(input).unwrap().is_none());
        }
    }
}