partition/
expr.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Display, Formatter};
16use std::sync::Arc;
17
18use datafusion_common::{ScalarValue, ToDFSchema};
19use datafusion_expr::execution_props::ExecutionProps;
20use datafusion_expr::Expr;
21use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
22use datatypes::arrow;
23use datatypes::value::{
24    duration_to_scalar_value, time_to_scalar_value, timestamp_to_scalar_value, Value,
25};
26use serde::{Deserialize, Serialize};
27use snafu::ResultExt;
28use sql::statements::value_to_sql_value;
29use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
30
31use crate::error;
32
33/// Struct for partition expression. This can be converted back to sqlparser's [Expr].
34/// by [`Self::to_parser_expr`].
35///
36/// [Expr]: sqlparser::ast::Expr
37#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
38pub struct PartitionExpr {
39    pub(crate) lhs: Box<Operand>,
40    pub(crate) op: RestrictedOp,
41    pub(crate) rhs: Box<Operand>,
42}
43
44#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
45pub enum Operand {
46    Column(String),
47    Value(Value),
48    Expr(PartitionExpr),
49}
50
51pub fn col(column_name: impl Into<String>) -> Operand {
52    Operand::Column(column_name.into())
53}
54
55impl From<Value> for Operand {
56    fn from(value: Value) -> Self {
57        Operand::Value(value)
58    }
59}
60
61impl Operand {
62    pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
63        match self {
64            Self::Column(c) => Ok(datafusion_expr::col(c)),
65            Self::Value(v) => {
66                let scalar_value = match v {
67                    Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
68                    Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
69                    Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
70                    Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
71                    Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
72                    Value::Int8(v) => ScalarValue::Int8(Some(*v)),
73                    Value::Int16(v) => ScalarValue::Int16(Some(*v)),
74                    Value::Int32(v) => ScalarValue::Int32(Some(*v)),
75                    Value::Int64(v) => ScalarValue::Int64(Some(*v)),
76                    Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
77                    Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
78                    Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
79                    Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
80                    Value::Date(v) => ScalarValue::Date32(Some(v.val())),
81                    Value::Null => ScalarValue::Null,
82                    Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
83                    Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())).unwrap(),
84                    Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
85                    Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
86                    Value::IntervalMonthDayNano(v) => {
87                        ScalarValue::IntervalMonthDayNano(Some((*v).into()))
88                    }
89                    Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
90                    Value::Decimal128(d) => {
91                        let (v, p, s) = d.to_scalar_value();
92                        ScalarValue::Decimal128(v, p, s)
93                    }
94                    other => {
95                        return error::UnsupportedPartitionExprValueSnafu {
96                            value: other.clone(),
97                        }
98                        .fail()
99                    }
100                };
101                Ok(datafusion_expr::lit(scalar_value))
102            }
103            Self::Expr(e) => e.try_as_logical_expr(),
104        }
105    }
106
107    pub fn lt(self, rhs: impl Into<Self>) -> PartitionExpr {
108        PartitionExpr::new(self, RestrictedOp::Lt, rhs.into())
109    }
110
111    pub fn gt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
112        PartitionExpr::new(self, RestrictedOp::GtEq, rhs.into())
113    }
114
115    pub fn eq(self, rhs: impl Into<Self>) -> PartitionExpr {
116        PartitionExpr::new(self, RestrictedOp::Eq, rhs.into())
117    }
118}
119
120impl Display for Operand {
121    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
122        match self {
123            Self::Column(v) => write!(f, "{v}"),
124            Self::Value(v) => write!(f, "{v}"),
125            Self::Expr(v) => write!(f, "{v}"),
126        }
127    }
128}
129
130/// A restricted set of [Operator](datafusion_expr::Operator) that can be used in
131/// partition expressions.
132#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
133pub enum RestrictedOp {
134    // Evaluate to binary
135    Eq,
136    NotEq,
137    Lt,
138    LtEq,
139    Gt,
140    GtEq,
141
142    // Conjunction
143    And,
144    Or,
145}
146
147impl RestrictedOp {
148    pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
149        match op {
150            ParserBinaryOperator::Eq => Some(Self::Eq),
151            ParserBinaryOperator::NotEq => Some(Self::NotEq),
152            ParserBinaryOperator::Lt => Some(Self::Lt),
153            ParserBinaryOperator::LtEq => Some(Self::LtEq),
154            ParserBinaryOperator::Gt => Some(Self::Gt),
155            ParserBinaryOperator::GtEq => Some(Self::GtEq),
156            ParserBinaryOperator::And => Some(Self::And),
157            ParserBinaryOperator::Or => Some(Self::Or),
158            _ => None,
159        }
160    }
161
162    pub fn to_parser_op(&self) -> ParserBinaryOperator {
163        match self {
164            Self::Eq => ParserBinaryOperator::Eq,
165            Self::NotEq => ParserBinaryOperator::NotEq,
166            Self::Lt => ParserBinaryOperator::Lt,
167            Self::LtEq => ParserBinaryOperator::LtEq,
168            Self::Gt => ParserBinaryOperator::Gt,
169            Self::GtEq => ParserBinaryOperator::GtEq,
170            Self::And => ParserBinaryOperator::And,
171            Self::Or => ParserBinaryOperator::Or,
172        }
173    }
174}
175impl Display for RestrictedOp {
176    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
177        match self {
178            Self::Eq => write!(f, "="),
179            Self::NotEq => write!(f, "<>"),
180            Self::Lt => write!(f, "<"),
181            Self::LtEq => write!(f, "<="),
182            Self::Gt => write!(f, ">"),
183            Self::GtEq => write!(f, ">="),
184            Self::And => write!(f, "AND"),
185            Self::Or => write!(f, "OR"),
186        }
187    }
188}
189
190impl PartitionExpr {
191    pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
192        Self {
193            lhs: Box::new(lhs),
194            op,
195            rhs: Box::new(rhs),
196        }
197    }
198
199    /// Convert [Self] back to sqlparser's [Expr]
200    ///
201    /// [Expr]: ParserExpr
202    pub fn to_parser_expr(&self) -> ParserExpr {
203        // Safety: Partition rule won't contains unsupported value type.
204        // Otherwise it will be rejected by the parser.
205        let lhs = match &*self.lhs {
206            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
207            Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap()),
208            Operand::Expr(e) => e.to_parser_expr(),
209        };
210
211        let rhs = match &*self.rhs {
212            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
213            Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap()),
214            Operand::Expr(e) => e.to_parser_expr(),
215        };
216
217        ParserExpr::BinaryOp {
218            left: Box::new(lhs),
219            op: self.op.to_parser_op(),
220            right: Box::new(rhs),
221        }
222    }
223
224    pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
225        let lhs = self.lhs.try_as_logical_expr()?;
226        let rhs = self.rhs.try_as_logical_expr()?;
227
228        let expr = match &self.op {
229            RestrictedOp::And => datafusion_expr::and(lhs, rhs),
230            RestrictedOp::Or => datafusion_expr::or(lhs, rhs),
231            RestrictedOp::Gt => lhs.gt(rhs),
232            RestrictedOp::GtEq => lhs.gt_eq(rhs),
233            RestrictedOp::Lt => lhs.lt(rhs),
234            RestrictedOp::LtEq => lhs.lt_eq(rhs),
235            RestrictedOp::Eq => lhs.eq(rhs),
236            RestrictedOp::NotEq => lhs.not_eq(rhs),
237        };
238        Ok(expr)
239    }
240
241    pub fn try_as_physical_expr(
242        &self,
243        schema: &arrow::datatypes::SchemaRef,
244    ) -> error::Result<Arc<dyn PhysicalExpr>> {
245        let df_schema = schema
246            .clone()
247            .to_dfschema_ref()
248            .context(error::ToDFSchemaSnafu)?;
249        let execution_props = &ExecutionProps::default();
250        let expr = self.try_as_logical_expr()?;
251        create_physical_expr(&expr, &df_schema, execution_props)
252            .context(error::CreatePhysicalExprSnafu)
253    }
254
255    pub fn and(self, rhs: PartitionExpr) -> PartitionExpr {
256        PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs))
257    }
258}
259
260impl Display for PartitionExpr {
261    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
262        write!(f, "{} {} {}", self.lhs, self.op, self.rhs)
263    }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_partition_expr() {
        // Every case uses column `a` on the left; only the operator and the
        // right-hand operand vary.
        let column = |name: &str| Operand::Column(name.to_string());
        let ten = || Operand::Value(Value::UInt32(10));

        let cases = [
            (RestrictedOp::Eq, ten(), "a = 10"),
            (RestrictedOp::NotEq, ten(), "a <> 10"),
            (RestrictedOp::Lt, ten(), "a < 10"),
            (RestrictedOp::LtEq, ten(), "a <= 10"),
            (RestrictedOp::Gt, ten(), "a > 10"),
            (RestrictedOp::GtEq, ten(), "a >= 10"),
            (RestrictedOp::And, column("b"), "a AND b"),
            (RestrictedOp::Or, column("b"), "a OR b"),
            (
                RestrictedOp::Or,
                Operand::Expr(PartitionExpr::new(
                    column("c"),
                    RestrictedOp::And,
                    column("d"),
                )),
                "a OR c AND d",
            ),
        ];

        for (op, rhs, expected) in cases {
            let expr = PartitionExpr::new(column("a"), op, rhs);
            assert_eq!(expected, expr.to_string());
        }
    }
}