1use std::fmt::{Debug, Display, Formatter};
16use std::sync::Arc;
17
18use api::v1::meta::Partition;
19use datafusion_common::{ScalarValue, ToDFSchema};
20use datafusion_expr::execution_props::ExecutionProps;
21use datafusion_expr::Expr;
22use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
23use datatypes::arrow;
24use datatypes::value::{
25 duration_to_scalar_value, time_to_scalar_value, timestamp_to_scalar_value, Value,
26};
27use serde::{Deserialize, Serialize};
28use snafu::ResultExt;
29use sql::statements::value_to_sql_value;
30use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
31
32use crate::error;
33use crate::partition::PartitionBound;
34
35#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
40pub struct PartitionExpr {
41 pub lhs: Box<Operand>,
42 pub op: RestrictedOp,
43 pub rhs: Box<Operand>,
44}
45
46#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
47pub enum Operand {
48 Column(String),
49 Value(Value),
50 Expr(PartitionExpr),
51}
52
53pub fn col(column_name: impl Into<String>) -> Operand {
54 Operand::Column(column_name.into())
55}
56
57impl From<Value> for Operand {
58 fn from(value: Value) -> Self {
59 Operand::Value(value)
60 }
61}
62
63impl Operand {
64 pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
65 match self {
66 Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))),
67 Self::Value(v) => {
68 let scalar_value = match v {
69 Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
70 Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
71 Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
72 Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
73 Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
74 Value::Int8(v) => ScalarValue::Int8(Some(*v)),
75 Value::Int16(v) => ScalarValue::Int16(Some(*v)),
76 Value::Int32(v) => ScalarValue::Int32(Some(*v)),
77 Value::Int64(v) => ScalarValue::Int64(Some(*v)),
78 Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
79 Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
80 Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
81 Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
82 Value::Date(v) => ScalarValue::Date32(Some(v.val())),
83 Value::Null => ScalarValue::Null,
84 Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
85 Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())).unwrap(),
86 Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
87 Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
88 Value::IntervalMonthDayNano(v) => {
89 ScalarValue::IntervalMonthDayNano(Some((*v).into()))
90 }
91 Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
92 Value::Decimal128(d) => {
93 let (v, p, s) = d.to_scalar_value();
94 ScalarValue::Decimal128(v, p, s)
95 }
96 other => {
97 return error::UnsupportedPartitionExprValueSnafu {
98 value: other.clone(),
99 }
100 .fail()
101 }
102 };
103 Ok(datafusion_expr::lit(scalar_value))
104 }
105 Self::Expr(e) => e.try_as_logical_expr(),
106 }
107 }
108
109 pub fn lt(self, rhs: impl Into<Self>) -> PartitionExpr {
110 PartitionExpr::new(self, RestrictedOp::Lt, rhs.into())
111 }
112
113 pub fn gt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
114 PartitionExpr::new(self, RestrictedOp::GtEq, rhs.into())
115 }
116
117 pub fn eq(self, rhs: impl Into<Self>) -> PartitionExpr {
118 PartitionExpr::new(self, RestrictedOp::Eq, rhs.into())
119 }
120
121 pub fn not_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
122 PartitionExpr::new(self, RestrictedOp::NotEq, rhs.into())
123 }
124
125 pub fn gt(self, rhs: impl Into<Self>) -> PartitionExpr {
126 PartitionExpr::new(self, RestrictedOp::Gt, rhs.into())
127 }
128
129 pub fn lt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
130 PartitionExpr::new(self, RestrictedOp::LtEq, rhs.into())
131 }
132}
133
134impl Display for Operand {
135 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136 match self {
137 Self::Column(v) => write!(f, "{v}"),
138 Self::Value(v) => write!(f, "{v}"),
139 Self::Expr(v) => write!(f, "{v}"),
140 }
141 }
142}
143
144#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
147pub enum RestrictedOp {
148 Eq,
150 NotEq,
151 Lt,
152 LtEq,
153 Gt,
154 GtEq,
155
156 And,
158 Or,
159}
160
161impl RestrictedOp {
162 pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
163 match op {
164 ParserBinaryOperator::Eq => Some(Self::Eq),
165 ParserBinaryOperator::NotEq => Some(Self::NotEq),
166 ParserBinaryOperator::Lt => Some(Self::Lt),
167 ParserBinaryOperator::LtEq => Some(Self::LtEq),
168 ParserBinaryOperator::Gt => Some(Self::Gt),
169 ParserBinaryOperator::GtEq => Some(Self::GtEq),
170 ParserBinaryOperator::And => Some(Self::And),
171 ParserBinaryOperator::Or => Some(Self::Or),
172 _ => None,
173 }
174 }
175
176 pub fn to_parser_op(&self) -> ParserBinaryOperator {
177 match self {
178 Self::Eq => ParserBinaryOperator::Eq,
179 Self::NotEq => ParserBinaryOperator::NotEq,
180 Self::Lt => ParserBinaryOperator::Lt,
181 Self::LtEq => ParserBinaryOperator::LtEq,
182 Self::Gt => ParserBinaryOperator::Gt,
183 Self::GtEq => ParserBinaryOperator::GtEq,
184 Self::And => ParserBinaryOperator::And,
185 Self::Or => ParserBinaryOperator::Or,
186 }
187 }
188}
189impl Display for RestrictedOp {
190 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
191 match self {
192 Self::Eq => write!(f, "="),
193 Self::NotEq => write!(f, "<>"),
194 Self::Lt => write!(f, "<"),
195 Self::LtEq => write!(f, "<="),
196 Self::Gt => write!(f, ">"),
197 Self::GtEq => write!(f, ">="),
198 Self::And => write!(f, "AND"),
199 Self::Or => write!(f, "OR"),
200 }
201 }
202}
203
204impl PartitionExpr {
205 pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
206 Self {
207 lhs: Box::new(lhs),
208 op,
209 rhs: Box::new(rhs),
210 }
211 }
212
213 pub fn to_parser_expr(&self) -> ParserExpr {
217 let lhs = match &*self.lhs {
220 Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
221 Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
222 Operand::Expr(e) => e.to_parser_expr(),
223 };
224
225 let rhs = match &*self.rhs {
226 Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
227 Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
228 Operand::Expr(e) => e.to_parser_expr(),
229 };
230
231 ParserExpr::BinaryOp {
232 left: Box::new(lhs),
233 op: self.op.to_parser_op(),
234 right: Box::new(rhs),
235 }
236 }
237
238 pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
239 let lhs = self.lhs.try_as_logical_expr()?;
240 let rhs = self.rhs.try_as_logical_expr()?;
241
242 let expr = match &self.op {
243 RestrictedOp::And => datafusion_expr::and(lhs, rhs),
244 RestrictedOp::Or => datafusion_expr::or(lhs, rhs),
245 RestrictedOp::Gt => lhs.gt(rhs),
246 RestrictedOp::GtEq => lhs.gt_eq(rhs),
247 RestrictedOp::Lt => lhs.lt(rhs),
248 RestrictedOp::LtEq => lhs.lt_eq(rhs),
249 RestrictedOp::Eq => lhs.eq(rhs),
250 RestrictedOp::NotEq => lhs.not_eq(rhs),
251 };
252 Ok(expr)
253 }
254
255 pub fn lhs(&self) -> &Operand {
257 &self.lhs
258 }
259
260 pub fn rhs(&self) -> &Operand {
262 &self.rhs
263 }
264
265 pub fn op(&self) -> &RestrictedOp {
267 &self.op
268 }
269
270 pub fn try_as_physical_expr(
271 &self,
272 schema: &arrow::datatypes::SchemaRef,
273 ) -> error::Result<Arc<dyn PhysicalExpr>> {
274 let df_schema = schema
275 .clone()
276 .to_dfschema_ref()
277 .context(error::ToDFSchemaSnafu)?;
278 let execution_props = &ExecutionProps::default();
279 let expr = self.try_as_logical_expr()?;
280 create_physical_expr(&expr, &df_schema, execution_props)
281 .context(error::CreatePhysicalExprSnafu)
282 }
283
284 pub fn and(self, rhs: PartitionExpr) -> PartitionExpr {
285 PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs))
286 }
287
288 pub fn as_json_str(&self) -> error::Result<String> {
292 serde_json::to_string(&PartitionBound::Expr(self.clone()))
293 .context(error::SerializeJsonSnafu)
294 }
295
296 pub fn from_json_str(s: &str) -> error::Result<Option<Self>> {
300 if s.is_empty() {
301 return Ok(None);
302 }
303
304 let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
305 match bound {
306 PartitionBound::Expr(expr) => Ok(Some(expr)),
307 _ => Ok(None),
308 }
309 }
310
311 pub fn as_pb_partition(&self) -> error::Result<Partition> {
313 Ok(Partition {
314 expression: self.as_json_str()?,
315 ..Default::default()
316 })
317 }
318}
319
320impl Display for PartitionExpr {
321 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
322 write!(f, "{} {} {}", self.lhs, self.op, self.rhs)
323 }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// The literal operand `10` (as `UInt32`) shared by the test cases.
    fn ten() -> Operand {
        Operand::Value(Value::UInt32(10))
    }

    /// Checks the `Display` output for every operator and for one nested
    /// expression.
    #[test]
    fn test_partition_expr() {
        let nested = PartitionExpr::new(col("c"), RestrictedOp::And, col("d"));
        let cases = [
            (col("a"), RestrictedOp::Eq, ten(), "a = 10"),
            (col("a"), RestrictedOp::NotEq, ten(), "a <> 10"),
            (col("a"), RestrictedOp::Lt, ten(), "a < 10"),
            (col("a"), RestrictedOp::LtEq, ten(), "a <= 10"),
            (col("a"), RestrictedOp::Gt, ten(), "a > 10"),
            (col("a"), RestrictedOp::GtEq, ten(), "a >= 10"),
            (col("a"), RestrictedOp::And, col("b"), "a AND b"),
            (col("a"), RestrictedOp::Or, col("b"), "a OR b"),
            (
                col("a"),
                RestrictedOp::Or,
                Operand::Expr(nested),
                "a OR c AND d",
            ),
        ];

        for (lhs, op, rhs, expected) in cases {
            let expr = PartitionExpr::new(lhs, op, rhs);
            assert_eq!(expected, expr.to_string());
        }
    }

    /// Checks JSON serialization and the `None` cases of `from_json_str`.
    #[test]
    fn test_serde_partition_expr() {
        // Serialization wraps the expression in the `Expr` bound variant.
        let serialized = PartitionExpr::new(col("a"), RestrictedOp::Eq, ten())
            .as_json_str()
            .unwrap();
        assert_eq!(
            serialized,
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"Eq","rhs":{"Value":{"UInt32":10}}}}"#
        );

        // An `Expr` bound deserializes back into the expression.
        let parsed = PartitionExpr::from_json_str(
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#,
        )
        .unwrap()
        .unwrap();
        let expected = PartitionExpr::new(col("a"), RestrictedOp::GtEq, ten());
        assert_eq!(parsed, expected);

        // An empty string yields no expression.
        let from_empty = PartitionExpr::from_json_str("").unwrap();
        assert!(from_empty.is_none());

        // A bound that is not an expression yields no expression.
        let from_max_value = PartitionExpr::from_json_str(r#""MaxValue""#).unwrap();
        assert!(from_max_value.is_none());

        let from_value = PartitionExpr::from_json_str(r#"{"Value":{"UInt32":10}}"#).unwrap();
        assert!(from_value.is_none());
    }
}