1use std::fmt::{Debug, Display, Formatter};
16use std::sync::Arc;
17
18use datafusion_common::{ScalarValue, ToDFSchema};
19use datafusion_expr::execution_props::ExecutionProps;
20use datafusion_expr::Expr;
21use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
22use datatypes::arrow;
23use datatypes::value::{
24 duration_to_scalar_value, time_to_scalar_value, timestamp_to_scalar_value, Value,
25};
26use serde::{Deserialize, Serialize};
27use snafu::ResultExt;
28use sql::statements::value_to_sql_value;
29use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
30
31use crate::error;
32
/// A binary partition expression of the form `lhs op rhs`.
///
/// Both operands are boxed because an [`Operand`] can itself hold a nested
/// [`PartitionExpr`], which makes the type recursive.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PartitionExpr {
    /// Left-hand operand.
    pub(crate) lhs: Box<Operand>,
    /// Operator joining the two operands.
    pub(crate) op: RestrictedOp,
    /// Right-hand operand.
    pub(crate) rhs: Box<Operand>,
}
43
/// One side of a [`PartitionExpr`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Operand {
    /// A column, referenced by its name.
    Column(String),
    /// A literal value.
    Value(Value),
    /// A nested partition expression.
    Expr(PartitionExpr),
}
50
51pub fn col(column_name: impl Into<String>) -> Operand {
52 Operand::Column(column_name.into())
53}
54
55impl From<Value> for Operand {
56 fn from(value: Value) -> Self {
57 Operand::Value(value)
58 }
59}
60
61impl Operand {
62 pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
63 match self {
64 Self::Column(c) => Ok(datafusion_expr::col(c)),
65 Self::Value(v) => {
66 let scalar_value = match v {
67 Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
68 Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
69 Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
70 Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
71 Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
72 Value::Int8(v) => ScalarValue::Int8(Some(*v)),
73 Value::Int16(v) => ScalarValue::Int16(Some(*v)),
74 Value::Int32(v) => ScalarValue::Int32(Some(*v)),
75 Value::Int64(v) => ScalarValue::Int64(Some(*v)),
76 Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
77 Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
78 Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
79 Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
80 Value::Date(v) => ScalarValue::Date32(Some(v.val())),
81 Value::Null => ScalarValue::Null,
82 Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
83 Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())).unwrap(),
84 Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
85 Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
86 Value::IntervalMonthDayNano(v) => {
87 ScalarValue::IntervalMonthDayNano(Some((*v).into()))
88 }
89 Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
90 Value::Decimal128(d) => {
91 let (v, p, s) = d.to_scalar_value();
92 ScalarValue::Decimal128(v, p, s)
93 }
94 other => {
95 return error::UnsupportedPartitionExprValueSnafu {
96 value: other.clone(),
97 }
98 .fail()
99 }
100 };
101 Ok(datafusion_expr::lit(scalar_value))
102 }
103 Self::Expr(e) => e.try_as_logical_expr(),
104 }
105 }
106
107 pub fn lt(self, rhs: impl Into<Self>) -> PartitionExpr {
108 PartitionExpr::new(self, RestrictedOp::Lt, rhs.into())
109 }
110
111 pub fn gt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
112 PartitionExpr::new(self, RestrictedOp::GtEq, rhs.into())
113 }
114
115 pub fn eq(self, rhs: impl Into<Self>) -> PartitionExpr {
116 PartitionExpr::new(self, RestrictedOp::Eq, rhs.into())
117 }
118}
119
120impl Display for Operand {
121 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
122 match self {
123 Self::Column(v) => write!(f, "{v}"),
124 Self::Value(v) => write!(f, "{v}"),
125 Self::Expr(v) => write!(f, "{v}"),
126 }
127 }
128}
129
/// The binary operators that may appear in a [`PartitionExpr`].
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RestrictedOp {
    // Comparison operators.
    Eq,
    NotEq,
    Lt,
    LtEq,
    Gt,
    GtEq,

    // Logical connectives.
    And,
    Or,
}
146
147impl RestrictedOp {
148 pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
149 match op {
150 ParserBinaryOperator::Eq => Some(Self::Eq),
151 ParserBinaryOperator::NotEq => Some(Self::NotEq),
152 ParserBinaryOperator::Lt => Some(Self::Lt),
153 ParserBinaryOperator::LtEq => Some(Self::LtEq),
154 ParserBinaryOperator::Gt => Some(Self::Gt),
155 ParserBinaryOperator::GtEq => Some(Self::GtEq),
156 ParserBinaryOperator::And => Some(Self::And),
157 ParserBinaryOperator::Or => Some(Self::Or),
158 _ => None,
159 }
160 }
161
162 pub fn to_parser_op(&self) -> ParserBinaryOperator {
163 match self {
164 Self::Eq => ParserBinaryOperator::Eq,
165 Self::NotEq => ParserBinaryOperator::NotEq,
166 Self::Lt => ParserBinaryOperator::Lt,
167 Self::LtEq => ParserBinaryOperator::LtEq,
168 Self::Gt => ParserBinaryOperator::Gt,
169 Self::GtEq => ParserBinaryOperator::GtEq,
170 Self::And => ParserBinaryOperator::And,
171 Self::Or => ParserBinaryOperator::Or,
172 }
173 }
174}
175impl Display for RestrictedOp {
176 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
177 match self {
178 Self::Eq => write!(f, "="),
179 Self::NotEq => write!(f, "<>"),
180 Self::Lt => write!(f, "<"),
181 Self::LtEq => write!(f, "<="),
182 Self::Gt => write!(f, ">"),
183 Self::GtEq => write!(f, ">="),
184 Self::And => write!(f, "AND"),
185 Self::Or => write!(f, "OR"),
186 }
187 }
188}
189
190impl PartitionExpr {
191 pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
192 Self {
193 lhs: Box::new(lhs),
194 op,
195 rhs: Box::new(rhs),
196 }
197 }
198
199 pub fn to_parser_expr(&self) -> ParserExpr {
203 let lhs = match &*self.lhs {
206 Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
207 Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap()),
208 Operand::Expr(e) => e.to_parser_expr(),
209 };
210
211 let rhs = match &*self.rhs {
212 Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
213 Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap()),
214 Operand::Expr(e) => e.to_parser_expr(),
215 };
216
217 ParserExpr::BinaryOp {
218 left: Box::new(lhs),
219 op: self.op.to_parser_op(),
220 right: Box::new(rhs),
221 }
222 }
223
224 pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
225 let lhs = self.lhs.try_as_logical_expr()?;
226 let rhs = self.rhs.try_as_logical_expr()?;
227
228 let expr = match &self.op {
229 RestrictedOp::And => datafusion_expr::and(lhs, rhs),
230 RestrictedOp::Or => datafusion_expr::or(lhs, rhs),
231 RestrictedOp::Gt => lhs.gt(rhs),
232 RestrictedOp::GtEq => lhs.gt_eq(rhs),
233 RestrictedOp::Lt => lhs.lt(rhs),
234 RestrictedOp::LtEq => lhs.lt_eq(rhs),
235 RestrictedOp::Eq => lhs.eq(rhs),
236 RestrictedOp::NotEq => lhs.not_eq(rhs),
237 };
238 Ok(expr)
239 }
240
241 pub fn try_as_physical_expr(
242 &self,
243 schema: &arrow::datatypes::SchemaRef,
244 ) -> error::Result<Arc<dyn PhysicalExpr>> {
245 let df_schema = schema
246 .clone()
247 .to_dfschema_ref()
248 .context(error::ToDFSchemaSnafu)?;
249 let execution_props = &ExecutionProps::default();
250 let expr = self.try_as_logical_expr()?;
251 create_physical_expr(&expr, &df_schema, execution_props)
252 .context(error::CreatePhysicalExprSnafu)
253 }
254
255 pub fn and(self, rhs: PartitionExpr) -> PartitionExpr {
256 PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs))
257 }
258}
259
260impl Display for PartitionExpr {
261 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
262 write!(f, "{} {} {}", self.lhs, self.op, self.rhs)
263 }
264}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the textual rendering of partition expressions.
    #[test]
    fn test_partition_expr() {
        // Each comparison operator between column `a` and the literal 10.
        let comparisons = [
            (RestrictedOp::Eq, "a = 10"),
            (RestrictedOp::NotEq, "a <> 10"),
            (RestrictedOp::Lt, "a < 10"),
            (RestrictedOp::LtEq, "a <= 10"),
            (RestrictedOp::Gt, "a > 10"),
            (RestrictedOp::GtEq, "a >= 10"),
        ];
        for (op, expected) in comparisons {
            let expr = PartitionExpr::new(col("a"), op, Value::UInt32(10).into());
            assert_eq!(expected, expr.to_string());
        }

        // Logical connectives between two columns.
        let connectives = [
            (RestrictedOp::And, "a AND b"),
            (RestrictedOp::Or, "a OR b"),
        ];
        for (op, expected) in connectives {
            let expr = PartitionExpr::new(col("a"), op, col("b"));
            assert_eq!(expected, expr.to_string());
        }

        // A nested expression on the right-hand side.
        let inner = PartitionExpr::new(col("c"), RestrictedOp::And, col("d"));
        let outer = PartitionExpr::new(col("a"), RestrictedOp::Or, Operand::Expr(inner));
        assert_eq!("a OR c AND d", outer.to_string());
    }
}