1use std::fmt::{Debug, Display, Formatter};
16use std::sync::Arc;
17
18use api::v1::meta::Partition;
19use datafusion_common::{ScalarValue, ToDFSchema};
20use datafusion_expr::Expr;
21use datafusion_expr::execution_props::ExecutionProps;
22use datafusion_physical_expr::{PhysicalExpr, create_physical_expr};
23use datatypes::arrow;
24use datatypes::value::{
25 Value, duration_to_scalar_value, time_to_scalar_value, timestamp_to_scalar_value,
26};
27use serde::{Deserialize, Serialize};
28use snafu::ResultExt;
29use sql::statements::value_to_sql_value;
30use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
31
32use crate::error;
33use crate::partition::PartitionBound;
34
/// A binary partition expression of the form `lhs op rhs`.
///
/// Operands are boxed because an [`Operand`] can itself wrap a nested
/// `PartitionExpr`, which makes the type recursive.
///
/// Field order matters: the derived `PartialOrd`/`Ord` compare `lhs`, then
/// `op`, then `rhs`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PartitionExpr {
    /// Left-hand operand.
    pub lhs: Box<Operand>,
    /// Operator applied between the two operands.
    pub op: RestrictedOp,
    /// Right-hand operand.
    pub rhs: Box<Operand>,
}
45
/// One side of a [`PartitionExpr`].
///
/// Variant order matters for the derived `PartialOrd`/`Ord`, and variant
/// names are part of the serde representation.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Operand {
    /// A column, referenced by name.
    Column(String),
    /// A literal value.
    Value(Value),
    /// A nested partition expression.
    Expr(PartitionExpr),
}
52
53pub fn col(column_name: impl Into<String>) -> Operand {
54 Operand::Column(column_name.into())
55}
56
57impl From<Value> for Operand {
58 fn from(value: Value) -> Self {
59 Operand::Value(value)
60 }
61}
62
63impl Operand {
64 pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
65 match self {
66 Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))),
67 Self::Value(v) => {
68 let scalar_value = match v {
69 Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
70 Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
71 Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
72 Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
73 Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
74 Value::Int8(v) => ScalarValue::Int8(Some(*v)),
75 Value::Int16(v) => ScalarValue::Int16(Some(*v)),
76 Value::Int32(v) => ScalarValue::Int32(Some(*v)),
77 Value::Int64(v) => ScalarValue::Int64(Some(*v)),
78 Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
79 Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
80 Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
81 Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
82 Value::Date(v) => ScalarValue::Date32(Some(v.val())),
83 Value::Null => ScalarValue::Null,
84 Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
85 Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())).unwrap(),
86 Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
87 Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
88 Value::IntervalMonthDayNano(v) => {
89 ScalarValue::IntervalMonthDayNano(Some((*v).into()))
90 }
91 Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
92 Value::Decimal128(d) => {
93 let (v, p, s) = d.to_scalar_value();
94 ScalarValue::Decimal128(v, p, s)
95 }
96 other => {
97 return error::UnsupportedPartitionExprValueSnafu {
98 value: other.clone(),
99 }
100 .fail();
101 }
102 };
103 Ok(datafusion_expr::lit(scalar_value))
104 }
105 Self::Expr(e) => e.try_as_logical_expr(),
106 }
107 }
108
109 pub fn lt(self, rhs: impl Into<Self>) -> PartitionExpr {
110 PartitionExpr::new(self, RestrictedOp::Lt, rhs.into())
111 }
112
113 pub fn gt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
114 PartitionExpr::new(self, RestrictedOp::GtEq, rhs.into())
115 }
116
117 pub fn eq(self, rhs: impl Into<Self>) -> PartitionExpr {
118 PartitionExpr::new(self, RestrictedOp::Eq, rhs.into())
119 }
120
121 pub fn not_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
122 PartitionExpr::new(self, RestrictedOp::NotEq, rhs.into())
123 }
124
125 pub fn gt(self, rhs: impl Into<Self>) -> PartitionExpr {
126 PartitionExpr::new(self, RestrictedOp::Gt, rhs.into())
127 }
128
129 pub fn lt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
130 PartitionExpr::new(self, RestrictedOp::LtEq, rhs.into())
131 }
132}
133
134impl Display for Operand {
135 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136 match self {
137 Self::Column(v) => write!(f, "{v}"),
138 Self::Value(v) => write!(f, "{v}"),
139 Self::Expr(v) => write!(f, "{v}"),
140 }
141 }
142}
143
/// The binary operators allowed in a [`PartitionExpr`]: six comparisons and
/// the two logical connectives.
///
/// Variant order matters for the derived `PartialOrd`/`Ord`, and variant
/// names are part of the serde representation.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RestrictedOp {
    // Comparison operators.
    /// `=`
    Eq,
    /// `<>`
    NotEq,
    /// `<`
    Lt,
    /// `<=`
    LtEq,
    /// `>`
    Gt,
    /// `>=`
    GtEq,

    // Logical connectives.
    /// `AND`
    And,
    /// `OR`
    Or,
}
160
161impl RestrictedOp {
162 pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
163 match op {
164 ParserBinaryOperator::Eq => Some(Self::Eq),
165 ParserBinaryOperator::NotEq => Some(Self::NotEq),
166 ParserBinaryOperator::Lt => Some(Self::Lt),
167 ParserBinaryOperator::LtEq => Some(Self::LtEq),
168 ParserBinaryOperator::Gt => Some(Self::Gt),
169 ParserBinaryOperator::GtEq => Some(Self::GtEq),
170 ParserBinaryOperator::And => Some(Self::And),
171 ParserBinaryOperator::Or => Some(Self::Or),
172 _ => None,
173 }
174 }
175
176 pub fn to_parser_op(&self) -> ParserBinaryOperator {
177 match self {
178 Self::Eq => ParserBinaryOperator::Eq,
179 Self::NotEq => ParserBinaryOperator::NotEq,
180 Self::Lt => ParserBinaryOperator::Lt,
181 Self::LtEq => ParserBinaryOperator::LtEq,
182 Self::Gt => ParserBinaryOperator::Gt,
183 Self::GtEq => ParserBinaryOperator::GtEq,
184 Self::And => ParserBinaryOperator::And,
185 Self::Or => ParserBinaryOperator::Or,
186 }
187 }
188}
189impl Display for RestrictedOp {
190 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
191 match self {
192 Self::Eq => write!(f, "="),
193 Self::NotEq => write!(f, "<>"),
194 Self::Lt => write!(f, "<"),
195 Self::LtEq => write!(f, "<="),
196 Self::Gt => write!(f, ">"),
197 Self::GtEq => write!(f, ">="),
198 Self::And => write!(f, "AND"),
199 Self::Or => write!(f, "OR"),
200 }
201 }
202}
203
204impl PartitionExpr {
205 pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
206 Self {
207 lhs: Box::new(lhs),
208 op,
209 rhs: Box::new(rhs),
210 }
211 }
212
213 pub fn to_parser_expr(&self) -> ParserExpr {
217 let lhs = match &*self.lhs {
220 Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
221 Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
222 Operand::Expr(e) => e.to_parser_expr(),
223 };
224
225 let rhs = match &*self.rhs {
226 Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
227 Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
228 Operand::Expr(e) => e.to_parser_expr(),
229 };
230
231 ParserExpr::BinaryOp {
232 left: Box::new(lhs),
233 op: self.op.to_parser_op(),
234 right: Box::new(rhs),
235 }
236 }
237
238 pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
239 let lhs_is_null = matches!(self.lhs.as_ref(), Operand::Value(Value::Null));
242 let rhs_is_null = matches!(self.rhs.as_ref(), Operand::Value(Value::Null));
243
244 match (self.op.clone(), lhs_is_null, rhs_is_null) {
245 (RestrictedOp::Eq, _, true) => {
246 return Ok(self.lhs.try_as_logical_expr()?.is_null());
247 }
248 (RestrictedOp::Eq, true, _) => {
249 return Ok(self.rhs.try_as_logical_expr()?.is_null());
250 }
251 (RestrictedOp::NotEq, _, true) => {
252 return Ok(self.lhs.try_as_logical_expr()?.is_not_null());
253 }
254 (RestrictedOp::NotEq, true, _) => {
255 return Ok(self.rhs.try_as_logical_expr()?.is_not_null());
256 }
257 _ => {}
258 }
259
260 if matches!(
261 self.op,
262 RestrictedOp::Lt | RestrictedOp::LtEq | RestrictedOp::Gt | RestrictedOp::GtEq
263 ) {
264 if matches!(self.lhs.as_ref(), Operand::Column(_)) {
265 let column_expr = self.lhs.try_as_logical_expr()?;
266 let other_expr = self.rhs.try_as_logical_expr()?;
267 let base = match self.op {
268 RestrictedOp::Lt => column_expr.clone().lt(other_expr),
269 RestrictedOp::LtEq => column_expr.clone().lt_eq(other_expr),
270 RestrictedOp::Gt => column_expr.clone().gt(other_expr),
271 RestrictedOp::GtEq => column_expr.clone().gt_eq(other_expr),
272 _ => unreachable!(),
273 };
274 return Ok(datafusion_expr::or(base, column_expr.is_null()));
275 } else if matches!(self.rhs.as_ref(), Operand::Column(_)) {
276 let other_expr = self.lhs.try_as_logical_expr()?;
277 let column_expr = self.rhs.try_as_logical_expr()?;
278 let base = match self.op {
279 RestrictedOp::Lt => other_expr.lt(column_expr.clone()),
280 RestrictedOp::LtEq => other_expr.lt_eq(column_expr.clone()),
281 RestrictedOp::Gt => other_expr.gt(column_expr.clone()),
282 RestrictedOp::GtEq => other_expr.gt_eq(column_expr.clone()),
283 _ => unreachable!(),
284 };
285 return Ok(datafusion_expr::or(base, column_expr.is_null()));
286 }
287 }
288
289 let lhs = self.lhs.try_as_logical_expr()?;
291 let rhs = self.rhs.try_as_logical_expr()?;
292
293 let expr = match &self.op {
294 RestrictedOp::And => datafusion_expr::and(lhs, rhs),
295 RestrictedOp::Or => datafusion_expr::or(lhs, rhs),
296 RestrictedOp::Gt => lhs.gt(rhs),
297 RestrictedOp::GtEq => lhs.gt_eq(rhs),
298 RestrictedOp::Lt => lhs.lt(rhs),
299 RestrictedOp::LtEq => lhs.lt_eq(rhs),
300 RestrictedOp::Eq => lhs.eq(rhs),
301 RestrictedOp::NotEq => lhs.not_eq(rhs),
302 };
303 Ok(expr)
304 }
305
306 pub fn lhs(&self) -> &Operand {
308 &self.lhs
309 }
310
311 pub fn rhs(&self) -> &Operand {
313 &self.rhs
314 }
315
316 pub fn op(&self) -> &RestrictedOp {
318 &self.op
319 }
320
321 pub fn try_as_physical_expr(
322 &self,
323 schema: &arrow::datatypes::SchemaRef,
324 ) -> error::Result<Arc<dyn PhysicalExpr>> {
325 let df_schema = schema
326 .clone()
327 .to_dfschema_ref()
328 .context(error::ToDFSchemaSnafu)?;
329 let execution_props = &ExecutionProps::default();
330 let expr = self.try_as_logical_expr()?;
331 create_physical_expr(&expr, &df_schema, execution_props)
332 .context(error::CreatePhysicalExprSnafu)
333 }
334
335 pub fn and(self, rhs: PartitionExpr) -> PartitionExpr {
336 PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs))
337 }
338
339 pub fn as_json_str(&self) -> error::Result<String> {
343 serde_json::to_string(&PartitionBound::Expr(self.clone()))
344 .context(error::SerializeJsonSnafu)
345 }
346
347 pub fn from_json_str(s: &str) -> error::Result<Option<Self>> {
351 if s.is_empty() {
352 return Ok(None);
353 }
354
355 let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
356 match bound {
357 PartitionBound::Expr(expr) => Ok(Some(expr)),
358 _ => Ok(None),
359 }
360 }
361
362 pub fn as_pb_partition(&self) -> error::Result<Partition> {
364 Ok(Partition {
365 expression: self.as_json_str()?,
366 ..Default::default()
367 })
368 }
369}
370
371impl Display for PartitionExpr {
372 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
373 write!(f, "{} {} {}", self.lhs, self.op, self.rhs)
374 }
375}
376
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `a <op> <rhs>` with column `a` on the left-hand side.
    fn a_op(op: RestrictedOp, rhs: Operand) -> PartitionExpr {
        PartitionExpr::new(col("a"), op, rhs)
    }

    /// Renders the logical expression of `expr` as a string.
    fn logical(expr: &PartitionExpr) -> String {
        expr.try_as_logical_expr().unwrap().to_string()
    }

    #[test]
    fn test_partition_expr() {
        let ten = || Operand::Value(Value::UInt32(10));
        let nested = Operand::Expr(PartitionExpr::new(
            col("c"),
            RestrictedOp::And,
            col("d"),
        ));

        // (expression, expected `Display` output)
        let cases = [
            (a_op(RestrictedOp::Eq, ten()), "a = 10"),
            (a_op(RestrictedOp::NotEq, ten()), "a <> 10"),
            (a_op(RestrictedOp::Lt, ten()), "a < 10"),
            (a_op(RestrictedOp::LtEq, ten()), "a <= 10"),
            (a_op(RestrictedOp::Gt, ten()), "a > 10"),
            (a_op(RestrictedOp::GtEq, ten()), "a >= 10"),
            (a_op(RestrictedOp::And, col("b")), "a AND b"),
            (a_op(RestrictedOp::Or, col("b")), "a OR b"),
            (a_op(RestrictedOp::Or, nested), "a OR c AND d"),
        ];

        for (expr, expected) in cases {
            assert_eq!(expected, expr.to_string());
        }
    }

    #[test]
    fn test_try_as_logical_expr_null_equality() {
        let null = || Operand::Value(Value::Null);

        // `a = NULL` is rewritten to an IS NULL test.
        let eq_expr = a_op(RestrictedOp::Eq, null());
        assert_eq!(logical(&eq_expr), "a IS NULL");

        // `a <> NULL` is rewritten to an IS NOT NULL test.
        let neq_expr = a_op(RestrictedOp::NotEq, null());
        assert_eq!(logical(&neq_expr), "a IS NOT NULL");
    }

    #[test]
    fn test_try_as_logical_expr_null_range_comparison() {
        let ten = || Operand::Value(Value::Int64(10));

        // (expression, expected logical expression)
        let cases = [
            // Lt, column on the left.
            (a_op(RestrictedOp::Lt, ten()), "a < Int64(10) OR a IS NULL"),
            // Lt, column on the right.
            (
                PartitionExpr::new(ten(), RestrictedOp::Lt, col("a")),
                "Int64(10) < a OR a IS NULL",
            ),
            // Gt, column on the left.
            (a_op(RestrictedOp::Gt, ten()), "a > Int64(10) OR a IS NULL"),
            // Gt, column on the right.
            (
                PartitionExpr::new(ten(), RestrictedOp::Gt, col("a")),
                "Int64(10) > a OR a IS NULL",
            ),
            // GtEq, column on the left.
            (
                a_op(RestrictedOp::GtEq, ten()),
                "a >= Int64(10) OR a IS NULL",
            ),
            // LtEq, column on the left.
            (
                a_op(RestrictedOp::LtEq, ten()),
                "a <= Int64(10) OR a IS NULL",
            ),
        ];

        for (expr, expected) in cases {
            assert_eq!(logical(&expr), expected);
        }
    }

    #[test]
    fn test_serde_partition_expr() {
        let ten = || Operand::Value(Value::UInt32(10));

        // Serialization wraps the expression in the `Expr` bound variant.
        let serialized = a_op(RestrictedOp::Eq, ten()).as_json_str().unwrap();
        assert_eq!(
            serialized,
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"Eq","rhs":{"Value":{"UInt32":10}}}}"#
        );

        // Deserializing an `Expr` bound yields the expression back.
        let input = r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
        let parsed = PartitionExpr::from_json_str(input).unwrap().unwrap();
        assert_eq!(parsed, a_op(RestrictedOp::GtEq, ten()));

        // Empty input is treated as "no expression".
        let from_empty = PartitionExpr::from_json_str("").unwrap();
        assert!(from_empty.is_none());

        // Bounds that are not expressions yield `None`.
        let from_max_value = PartitionExpr::from_json_str(r#""MaxValue""#).unwrap();
        assert!(from_max_value.is_none());

        let from_value = PartitionExpr::from_json_str(r#"{"Value":{"UInt32":10}}"#).unwrap();
        assert!(from_value.is_none());
    }
}