1use std::fmt::{Debug, Display, Formatter};
16use std::sync::Arc;
17
18use api::v1::meta::Partition;
19use datafusion_common::{ScalarValue, ToDFSchema};
20use datafusion_expr::execution_props::ExecutionProps;
21use datafusion_expr::Expr;
22use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
23use datatypes::arrow;
24use datatypes::value::{
25 duration_to_scalar_value, time_to_scalar_value, timestamp_to_scalar_value, Value,
26};
27use serde::{Deserialize, Serialize};
28use snafu::ResultExt;
29use sql::statements::value_to_sql_value;
30use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
31
32use crate::error;
33use crate::partition::PartitionBound;
34
/// A binary partition-rule expression of the form `lhs op rhs`.
///
/// Either side may itself be an [`Operand::Expr`], so trees of comparisons joined by
/// `AND`/`OR` can be represented. The expression can be converted to a sqlparser expression
/// ([`Self::to_parser_expr`]), to DataFusion logical/physical expressions
/// ([`Self::try_as_logical_expr`], [`Self::try_as_physical_expr`]) and to/from JSON
/// ([`Self::as_json_str`], [`Self::from_json_str`]).
///
/// NOTE: field order matters — it determines the derived `PartialOrd`/`Ord` and the
/// serialized field order (`lhs`, `op`, `rhs`).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PartitionExpr {
    /// Left-hand operand.
    pub(crate) lhs: Box<Operand>,
    /// Operator joining the two operands.
    pub(crate) op: RestrictedOp,
    /// Right-hand operand.
    pub(crate) rhs: Box<Operand>,
}
45
/// One side of a [`PartitionExpr`].
///
/// NOTE: variant names are used as the JSON tags (e.g. `{"Column":"a"}`), and variant order
/// determines the derived `PartialOrd`/`Ord`, so neither should be changed casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Operand {
    /// A reference to a column by name.
    Column(String),
    /// A literal value.
    Value(Value),
    /// A nested sub-expression.
    Expr(PartitionExpr),
}
52
53pub fn col(column_name: impl Into<String>) -> Operand {
54 Operand::Column(column_name.into())
55}
56
57impl From<Value> for Operand {
58 fn from(value: Value) -> Self {
59 Operand::Value(value)
60 }
61}
62
63impl Operand {
64 pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
65 match self {
66 Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))),
67 Self::Value(v) => {
68 let scalar_value = match v {
69 Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
70 Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
71 Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
72 Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
73 Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
74 Value::Int8(v) => ScalarValue::Int8(Some(*v)),
75 Value::Int16(v) => ScalarValue::Int16(Some(*v)),
76 Value::Int32(v) => ScalarValue::Int32(Some(*v)),
77 Value::Int64(v) => ScalarValue::Int64(Some(*v)),
78 Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
79 Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
80 Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
81 Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
82 Value::Date(v) => ScalarValue::Date32(Some(v.val())),
83 Value::Null => ScalarValue::Null,
84 Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
85 Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())).unwrap(),
86 Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
87 Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
88 Value::IntervalMonthDayNano(v) => {
89 ScalarValue::IntervalMonthDayNano(Some((*v).into()))
90 }
91 Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
92 Value::Decimal128(d) => {
93 let (v, p, s) = d.to_scalar_value();
94 ScalarValue::Decimal128(v, p, s)
95 }
96 other => {
97 return error::UnsupportedPartitionExprValueSnafu {
98 value: other.clone(),
99 }
100 .fail()
101 }
102 };
103 Ok(datafusion_expr::lit(scalar_value))
104 }
105 Self::Expr(e) => e.try_as_logical_expr(),
106 }
107 }
108
109 pub fn lt(self, rhs: impl Into<Self>) -> PartitionExpr {
110 PartitionExpr::new(self, RestrictedOp::Lt, rhs.into())
111 }
112
113 pub fn gt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
114 PartitionExpr::new(self, RestrictedOp::GtEq, rhs.into())
115 }
116
117 pub fn eq(self, rhs: impl Into<Self>) -> PartitionExpr {
118 PartitionExpr::new(self, RestrictedOp::Eq, rhs.into())
119 }
120
121 pub fn not_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
122 PartitionExpr::new(self, RestrictedOp::NotEq, rhs.into())
123 }
124
125 pub fn gt(self, rhs: impl Into<Self>) -> PartitionExpr {
126 PartitionExpr::new(self, RestrictedOp::Gt, rhs.into())
127 }
128
129 pub fn lt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
130 PartitionExpr::new(self, RestrictedOp::LtEq, rhs.into())
131 }
132}
133
134impl Display for Operand {
135 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136 match self {
137 Self::Column(v) => write!(f, "{v}"),
138 Self::Value(v) => write!(f, "{v}"),
139 Self::Expr(v) => write!(f, "{v}"),
140 }
141 }
142}
143
/// The binary operators permitted in partition rules.
///
/// NOTE: variant names are the JSON encoding (e.g. `"op":"GtEq"`), and variant order
/// determines the derived `PartialOrd`/`Ord`, so neither should be changed casually.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RestrictedOp {
    // Comparison operators.
    Eq,
    NotEq,
    Lt,
    LtEq,
    Gt,
    GtEq,

    // Logical connectives.
    And,
    Or,
}
160
161impl RestrictedOp {
162 pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
163 match op {
164 ParserBinaryOperator::Eq => Some(Self::Eq),
165 ParserBinaryOperator::NotEq => Some(Self::NotEq),
166 ParserBinaryOperator::Lt => Some(Self::Lt),
167 ParserBinaryOperator::LtEq => Some(Self::LtEq),
168 ParserBinaryOperator::Gt => Some(Self::Gt),
169 ParserBinaryOperator::GtEq => Some(Self::GtEq),
170 ParserBinaryOperator::And => Some(Self::And),
171 ParserBinaryOperator::Or => Some(Self::Or),
172 _ => None,
173 }
174 }
175
176 pub fn to_parser_op(&self) -> ParserBinaryOperator {
177 match self {
178 Self::Eq => ParserBinaryOperator::Eq,
179 Self::NotEq => ParserBinaryOperator::NotEq,
180 Self::Lt => ParserBinaryOperator::Lt,
181 Self::LtEq => ParserBinaryOperator::LtEq,
182 Self::Gt => ParserBinaryOperator::Gt,
183 Self::GtEq => ParserBinaryOperator::GtEq,
184 Self::And => ParserBinaryOperator::And,
185 Self::Or => ParserBinaryOperator::Or,
186 }
187 }
188}
189impl Display for RestrictedOp {
190 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
191 match self {
192 Self::Eq => write!(f, "="),
193 Self::NotEq => write!(f, "<>"),
194 Self::Lt => write!(f, "<"),
195 Self::LtEq => write!(f, "<="),
196 Self::Gt => write!(f, ">"),
197 Self::GtEq => write!(f, ">="),
198 Self::And => write!(f, "AND"),
199 Self::Or => write!(f, "OR"),
200 }
201 }
202}
203
204impl PartitionExpr {
205 pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
206 Self {
207 lhs: Box::new(lhs),
208 op,
209 rhs: Box::new(rhs),
210 }
211 }
212
213 pub fn to_parser_expr(&self) -> ParserExpr {
217 let lhs = match &*self.lhs {
220 Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
221 Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap()),
222 Operand::Expr(e) => e.to_parser_expr(),
223 };
224
225 let rhs = match &*self.rhs {
226 Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
227 Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap()),
228 Operand::Expr(e) => e.to_parser_expr(),
229 };
230
231 ParserExpr::BinaryOp {
232 left: Box::new(lhs),
233 op: self.op.to_parser_op(),
234 right: Box::new(rhs),
235 }
236 }
237
238 pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
239 let lhs = self.lhs.try_as_logical_expr()?;
240 let rhs = self.rhs.try_as_logical_expr()?;
241
242 let expr = match &self.op {
243 RestrictedOp::And => datafusion_expr::and(lhs, rhs),
244 RestrictedOp::Or => datafusion_expr::or(lhs, rhs),
245 RestrictedOp::Gt => lhs.gt(rhs),
246 RestrictedOp::GtEq => lhs.gt_eq(rhs),
247 RestrictedOp::Lt => lhs.lt(rhs),
248 RestrictedOp::LtEq => lhs.lt_eq(rhs),
249 RestrictedOp::Eq => lhs.eq(rhs),
250 RestrictedOp::NotEq => lhs.not_eq(rhs),
251 };
252 Ok(expr)
253 }
254
255 pub fn try_as_physical_expr(
256 &self,
257 schema: &arrow::datatypes::SchemaRef,
258 ) -> error::Result<Arc<dyn PhysicalExpr>> {
259 let df_schema = schema
260 .clone()
261 .to_dfschema_ref()
262 .context(error::ToDFSchemaSnafu)?;
263 let execution_props = &ExecutionProps::default();
264 let expr = self.try_as_logical_expr()?;
265 create_physical_expr(&expr, &df_schema, execution_props)
266 .context(error::CreatePhysicalExprSnafu)
267 }
268
269 pub fn and(self, rhs: PartitionExpr) -> PartitionExpr {
270 PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs))
271 }
272
273 pub fn as_json_str(&self) -> error::Result<String> {
277 serde_json::to_string(&PartitionBound::Expr(self.clone()))
278 .context(error::SerializeJsonSnafu)
279 }
280
281 pub fn from_json_str(s: &str) -> error::Result<Option<Self>> {
285 if s.is_empty() {
286 return Ok(None);
287 }
288
289 let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
290 match bound {
291 PartitionBound::Expr(expr) => Ok(Some(expr)),
292 _ => Ok(None),
293 }
294 }
295
296 pub fn as_pb_partition(&self) -> error::Result<Partition> {
298 Ok(Partition {
299 expression: self.as_json_str()?,
300 ..Default::default()
301 })
302 }
303}
304
305impl Display for PartitionExpr {
306 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
307 write!(f, "{} {} {}", self.lhs, self.op, self.rhs)
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;

    /// Literal `UInt32` operand shared by the cases below.
    fn uint32(v: u32) -> Operand {
        Operand::Value(Value::UInt32(v))
    }

    #[test]
    fn test_partition_expr() {
        // Each case is (lhs, op, rhs, expected `Display` output).
        let cases = [
            (col("a"), RestrictedOp::Eq, uint32(10), "a = 10"),
            (col("a"), RestrictedOp::NotEq, uint32(10), "a <> 10"),
            (col("a"), RestrictedOp::Lt, uint32(10), "a < 10"),
            (col("a"), RestrictedOp::LtEq, uint32(10), "a <= 10"),
            (col("a"), RestrictedOp::Gt, uint32(10), "a > 10"),
            (col("a"), RestrictedOp::GtEq, uint32(10), "a >= 10"),
            (col("a"), RestrictedOp::And, col("b"), "a AND b"),
            (col("a"), RestrictedOp::Or, col("b"), "a OR b"),
            (
                col("a"),
                RestrictedOp::Or,
                Operand::Expr(PartitionExpr::new(col("c"), RestrictedOp::And, col("d"))),
                "a OR c AND d",
            ),
        ];

        for (lhs, op, rhs, expected) in cases {
            assert_eq!(expected, PartitionExpr::new(lhs, op, rhs).to_string());
        }
    }

    #[test]
    fn test_serde_partition_expr() {
        // Serialization wraps the expression in `PartitionBound::Expr`.
        let eq_expr = PartitionExpr::new(col("a"), RestrictedOp::Eq, uint32(10));
        assert_eq!(
            eq_expr.as_json_str().unwrap(),
            "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"Eq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        );

        let parsed = PartitionExpr::from_json_str(
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#,
        )
        .unwrap()
        .unwrap();
        assert_eq!(
            parsed,
            PartitionExpr::new(col("a"), RestrictedOp::GtEq, uint32(10))
        );

        // Empty input and non-`Expr` bounds decode to `None` rather than an error.
        for json in ["", r#""MaxValue""#, r#"{"Value":{"UInt32":10}}"#] {
            assert!(PartitionExpr::from_json_str(json).unwrap().is_none());
        }
    }
}