partition/
expr.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{Debug, Display, Formatter};
16use std::sync::Arc;
17
18use api::v1::meta::Partition;
19use datafusion_common::{ScalarValue, ToDFSchema};
20use datafusion_expr::execution_props::ExecutionProps;
21use datafusion_expr::Expr;
22use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
23use datatypes::arrow;
24use datatypes::value::{
25    duration_to_scalar_value, time_to_scalar_value, timestamp_to_scalar_value, Value,
26};
27use serde::{Deserialize, Serialize};
28use snafu::ResultExt;
29use sql::statements::value_to_sql_value;
30use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
31
32use crate::error;
33use crate::partition::PartitionBound;
34
/// A binary partition expression of the form `lhs op rhs`.
///
/// Each side is an [`Operand`]: a column, a literal value, or another nested
/// [`PartitionExpr`]. The expression can be converted back to sqlparser's
/// [Expr] by [`Self::to_parser_expr`].
///
/// [Expr]: sqlparser::ast::Expr
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PartitionExpr {
    /// Left-hand side operand.
    pub lhs: Box<Operand>,
    /// Operator applied to the two operands.
    pub op: RestrictedOp,
    /// Right-hand side operand.
    pub rhs: Box<Operand>,
}
45
/// One side of a [`PartitionExpr`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Operand {
    /// A column, referenced by name.
    Column(String),
    /// A literal value.
    Value(Value),
    /// A nested partition expression.
    Expr(PartitionExpr),
}
52
53pub fn col(column_name: impl Into<String>) -> Operand {
54    Operand::Column(column_name.into())
55}
56
57impl From<Value> for Operand {
58    fn from(value: Value) -> Self {
59        Operand::Value(value)
60    }
61}
62
63impl Operand {
64    pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
65        match self {
66            Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))),
67            Self::Value(v) => {
68                let scalar_value = match v {
69                    Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
70                    Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
71                    Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
72                    Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
73                    Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
74                    Value::Int8(v) => ScalarValue::Int8(Some(*v)),
75                    Value::Int16(v) => ScalarValue::Int16(Some(*v)),
76                    Value::Int32(v) => ScalarValue::Int32(Some(*v)),
77                    Value::Int64(v) => ScalarValue::Int64(Some(*v)),
78                    Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
79                    Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
80                    Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
81                    Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
82                    Value::Date(v) => ScalarValue::Date32(Some(v.val())),
83                    Value::Null => ScalarValue::Null,
84                    Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
85                    Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())).unwrap(),
86                    Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
87                    Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
88                    Value::IntervalMonthDayNano(v) => {
89                        ScalarValue::IntervalMonthDayNano(Some((*v).into()))
90                    }
91                    Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
92                    Value::Decimal128(d) => {
93                        let (v, p, s) = d.to_scalar_value();
94                        ScalarValue::Decimal128(v, p, s)
95                    }
96                    other => {
97                        return error::UnsupportedPartitionExprValueSnafu {
98                            value: other.clone(),
99                        }
100                        .fail()
101                    }
102                };
103                Ok(datafusion_expr::lit(scalar_value))
104            }
105            Self::Expr(e) => e.try_as_logical_expr(),
106        }
107    }
108
109    pub fn lt(self, rhs: impl Into<Self>) -> PartitionExpr {
110        PartitionExpr::new(self, RestrictedOp::Lt, rhs.into())
111    }
112
113    pub fn gt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
114        PartitionExpr::new(self, RestrictedOp::GtEq, rhs.into())
115    }
116
117    pub fn eq(self, rhs: impl Into<Self>) -> PartitionExpr {
118        PartitionExpr::new(self, RestrictedOp::Eq, rhs.into())
119    }
120
121    pub fn not_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
122        PartitionExpr::new(self, RestrictedOp::NotEq, rhs.into())
123    }
124
125    pub fn gt(self, rhs: impl Into<Self>) -> PartitionExpr {
126        PartitionExpr::new(self, RestrictedOp::Gt, rhs.into())
127    }
128
129    pub fn lt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
130        PartitionExpr::new(self, RestrictedOp::LtEq, rhs.into())
131    }
132}
133
134impl Display for Operand {
135    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136        match self {
137            Self::Column(v) => write!(f, "{v}"),
138            Self::Value(v) => write!(f, "{v}"),
139            Self::Expr(v) => write!(f, "{v}"),
140        }
141    }
142}
143
/// A restricted set of [Operator](datafusion_expr::Operator) that can be used in
/// partition expressions.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RestrictedOp {
    // Comparison operators
    /// Equal, displayed as `=`.
    Eq,
    /// Not equal, displayed as `<>`.
    NotEq,
    /// Less than, displayed as `<`.
    Lt,
    /// Less than or equal, displayed as `<=`.
    LtEq,
    /// Greater than, displayed as `>`.
    Gt,
    /// Greater than or equal, displayed as `>=`.
    GtEq,

    // Logical connectives
    /// Logical conjunction, displayed as `AND`.
    And,
    /// Logical disjunction, displayed as `OR`.
    Or,
}
160
161impl RestrictedOp {
162    pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
163        match op {
164            ParserBinaryOperator::Eq => Some(Self::Eq),
165            ParserBinaryOperator::NotEq => Some(Self::NotEq),
166            ParserBinaryOperator::Lt => Some(Self::Lt),
167            ParserBinaryOperator::LtEq => Some(Self::LtEq),
168            ParserBinaryOperator::Gt => Some(Self::Gt),
169            ParserBinaryOperator::GtEq => Some(Self::GtEq),
170            ParserBinaryOperator::And => Some(Self::And),
171            ParserBinaryOperator::Or => Some(Self::Or),
172            _ => None,
173        }
174    }
175
176    pub fn to_parser_op(&self) -> ParserBinaryOperator {
177        match self {
178            Self::Eq => ParserBinaryOperator::Eq,
179            Self::NotEq => ParserBinaryOperator::NotEq,
180            Self::Lt => ParserBinaryOperator::Lt,
181            Self::LtEq => ParserBinaryOperator::LtEq,
182            Self::Gt => ParserBinaryOperator::Gt,
183            Self::GtEq => ParserBinaryOperator::GtEq,
184            Self::And => ParserBinaryOperator::And,
185            Self::Or => ParserBinaryOperator::Or,
186        }
187    }
188}
189impl Display for RestrictedOp {
190    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
191        match self {
192            Self::Eq => write!(f, "="),
193            Self::NotEq => write!(f, "<>"),
194            Self::Lt => write!(f, "<"),
195            Self::LtEq => write!(f, "<="),
196            Self::Gt => write!(f, ">"),
197            Self::GtEq => write!(f, ">="),
198            Self::And => write!(f, "AND"),
199            Self::Or => write!(f, "OR"),
200        }
201    }
202}
203
204impl PartitionExpr {
205    pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
206        Self {
207            lhs: Box::new(lhs),
208            op,
209            rhs: Box::new(rhs),
210        }
211    }
212
213    /// Convert [Self] back to sqlparser's [Expr]
214    ///
215    /// [Expr]: ParserExpr
216    pub fn to_parser_expr(&self) -> ParserExpr {
217        // Safety: Partition rule won't contains unsupported value type.
218        // Otherwise it will be rejected by the parser.
219        let lhs = match &*self.lhs {
220            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
221            Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
222            Operand::Expr(e) => e.to_parser_expr(),
223        };
224
225        let rhs = match &*self.rhs {
226            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
227            Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
228            Operand::Expr(e) => e.to_parser_expr(),
229        };
230
231        ParserExpr::BinaryOp {
232            left: Box::new(lhs),
233            op: self.op.to_parser_op(),
234            right: Box::new(rhs),
235        }
236    }
237
238    pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
239        let lhs = self.lhs.try_as_logical_expr()?;
240        let rhs = self.rhs.try_as_logical_expr()?;
241
242        let expr = match &self.op {
243            RestrictedOp::And => datafusion_expr::and(lhs, rhs),
244            RestrictedOp::Or => datafusion_expr::or(lhs, rhs),
245            RestrictedOp::Gt => lhs.gt(rhs),
246            RestrictedOp::GtEq => lhs.gt_eq(rhs),
247            RestrictedOp::Lt => lhs.lt(rhs),
248            RestrictedOp::LtEq => lhs.lt_eq(rhs),
249            RestrictedOp::Eq => lhs.eq(rhs),
250            RestrictedOp::NotEq => lhs.not_eq(rhs),
251        };
252        Ok(expr)
253    }
254
255    /// Get the left-hand side operand
256    pub fn lhs(&self) -> &Operand {
257        &self.lhs
258    }
259
260    /// Get the right-hand side operand
261    pub fn rhs(&self) -> &Operand {
262        &self.rhs
263    }
264
265    /// Get the operation
266    pub fn op(&self) -> &RestrictedOp {
267        &self.op
268    }
269
270    pub fn try_as_physical_expr(
271        &self,
272        schema: &arrow::datatypes::SchemaRef,
273    ) -> error::Result<Arc<dyn PhysicalExpr>> {
274        let df_schema = schema
275            .clone()
276            .to_dfschema_ref()
277            .context(error::ToDFSchemaSnafu)?;
278        let execution_props = &ExecutionProps::default();
279        let expr = self.try_as_logical_expr()?;
280        create_physical_expr(&expr, &df_schema, execution_props)
281            .context(error::CreatePhysicalExprSnafu)
282    }
283
284    pub fn and(self, rhs: PartitionExpr) -> PartitionExpr {
285        PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs))
286    }
287
288    /// Serializes `PartitionExpr` to json string.
289    ///
290    /// Wraps `PartitionBound::Expr` for compatibility.
291    pub fn as_json_str(&self) -> error::Result<String> {
292        serde_json::to_string(&PartitionBound::Expr(self.clone()))
293            .context(error::SerializeJsonSnafu)
294    }
295
296    /// Deserializes `PartitionExpr` from json string.
297    ///
298    /// Deserializes to `PartitionBound` for compatibility.
299    pub fn from_json_str(s: &str) -> error::Result<Option<Self>> {
300        if s.is_empty() {
301            return Ok(None);
302        }
303
304        let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
305        match bound {
306            PartitionBound::Expr(expr) => Ok(Some(expr)),
307            _ => Ok(None),
308        }
309    }
310
311    /// Converts [Self] to [Partition].
312    pub fn as_pb_partition(&self) -> error::Result<Partition> {
313        Ok(Partition {
314            expression: self.as_json_str()?,
315            ..Default::default()
316        })
317    }
318}
319
320impl Display for PartitionExpr {
321    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
322        write!(f, "{} {} {}", self.lhs, self.op, self.rhs)
323    }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a column operand named `name`.
    fn column(name: &str) -> Operand {
        Operand::Column(name.to_string())
    }

    /// Builds the `UInt32(10)` literal operand shared by the cases below.
    fn ten() -> Operand {
        Operand::Value(Value::UInt32(10))
    }

    /// Checks the `Display` output for every operator and for one nested
    /// expression.
    #[test]
    fn test_partition_expr() {
        // Each case is (lhs, op, rhs, expected `Display` output).
        let cases = [
            (column("a"), RestrictedOp::Eq, ten(), "a = 10"),
            (column("a"), RestrictedOp::NotEq, ten(), "a <> 10"),
            (column("a"), RestrictedOp::Lt, ten(), "a < 10"),
            (column("a"), RestrictedOp::LtEq, ten(), "a <= 10"),
            (column("a"), RestrictedOp::Gt, ten(), "a > 10"),
            (column("a"), RestrictedOp::GtEq, ten(), "a >= 10"),
            (column("a"), RestrictedOp::And, column("b"), "a AND b"),
            (column("a"), RestrictedOp::Or, column("b"), "a OR b"),
            (
                column("a"),
                RestrictedOp::Or,
                Operand::Expr(PartitionExpr::new(
                    column("c"),
                    RestrictedOp::And,
                    column("d"),
                )),
                "a OR c AND d",
            ),
        ];

        for (lhs, op, rhs, expected) in cases {
            let rendered = PartitionExpr::new(lhs, op, rhs).to_string();
            assert_eq!(expected, rendered);
        }
    }

    /// Checks JSON serialization, deserialization, and the inputs that decode
    /// to `None`.
    #[test]
    fn test_serde_partition_expr() {
        // Serialization wraps the expression in the `Expr` bound variant.
        let serialized = PartitionExpr::new(column("a"), RestrictedOp::Eq, ten())
            .as_json_str()
            .unwrap();
        assert_eq!(
            serialized,
            "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"Eq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        );

        // Deserialization unwraps the `Expr` bound variant.
        let input = r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
        let decoded = PartitionExpr::from_json_str(input).unwrap().unwrap();
        let expected = PartitionExpr::new(column("a"), RestrictedOp::GtEq, ten());
        assert_eq!(decoded, expected);

        // empty string
        let from_empty = PartitionExpr::from_json_str("").unwrap();
        assert!(from_empty.is_none());

        // variants other than Expr
        let from_max_value = PartitionExpr::from_json_str(r#""MaxValue""#).unwrap();
        assert!(from_max_value.is_none());

        let from_value = PartitionExpr::from_json_str(r#"{"Value":{"UInt32":10}}"#).unwrap();
        assert!(from_value.is_none());
    }
}
435}