1use std::collections::HashSet;
16use std::fmt::{Debug, Display, Formatter};
17use std::sync::Arc;
18
19use api::v1::meta::Partition;
20use datafusion_common::{ScalarValue, ToDFSchema};
21use datafusion_expr::Expr;
22use datafusion_expr::execution_props::ExecutionProps;
23use datafusion_physical_expr::{PhysicalExpr, create_physical_expr};
24use datatypes::arrow;
25use datatypes::value::{
26 Value, duration_to_scalar_value, time_to_scalar_value, timestamp_to_scalar_value,
27};
28use serde::{Deserialize, Serialize};
29use snafu::ResultExt;
30use sql::statements::value_to_sql_value;
31use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
32
33use crate::error;
34use crate::partition::PartitionBound;
35
36#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
41pub struct PartitionExpr {
42 pub lhs: Box<Operand>,
43 pub op: RestrictedOp,
44 pub rhs: Box<Operand>,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
48pub enum Operand {
49 Column(String),
50 Value(Value),
51 Expr(PartitionExpr),
52}
53
54pub fn col(column_name: impl Into<String>) -> Operand {
55 Operand::Column(column_name.into())
56}
57
58impl From<Value> for Operand {
59 fn from(value: Value) -> Self {
60 Operand::Value(value)
61 }
62}
63
64impl Operand {
65 pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
66 match self {
67 Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))),
68 Self::Value(v) => {
69 let scalar_value = match v {
70 Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
71 Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
72 Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
73 Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
74 Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
75 Value::Int8(v) => ScalarValue::Int8(Some(*v)),
76 Value::Int16(v) => ScalarValue::Int16(Some(*v)),
77 Value::Int32(v) => ScalarValue::Int32(Some(*v)),
78 Value::Int64(v) => ScalarValue::Int64(Some(*v)),
79 Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
80 Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
81 Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
82 Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
83 Value::Date(v) => ScalarValue::Date32(Some(v.val())),
84 Value::Null => ScalarValue::Null,
85 Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
86 Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())).unwrap(),
87 Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
88 Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
89 Value::IntervalMonthDayNano(v) => {
90 ScalarValue::IntervalMonthDayNano(Some((*v).into()))
91 }
92 Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
93 Value::Decimal128(d) => {
94 let (v, p, s) = d.to_scalar_value();
95 ScalarValue::Decimal128(v, p, s)
96 }
97 other => {
98 return error::UnsupportedPartitionExprValueSnafu {
99 value: other.clone(),
100 }
101 .fail();
102 }
103 };
104 Ok(datafusion_expr::lit(scalar_value))
105 }
106 Self::Expr(e) => e.try_as_logical_expr(),
107 }
108 }
109
110 pub fn lt(self, rhs: impl Into<Self>) -> PartitionExpr {
111 PartitionExpr::new(self, RestrictedOp::Lt, rhs.into())
112 }
113
114 pub fn gt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
115 PartitionExpr::new(self, RestrictedOp::GtEq, rhs.into())
116 }
117
118 pub fn eq(self, rhs: impl Into<Self>) -> PartitionExpr {
119 PartitionExpr::new(self, RestrictedOp::Eq, rhs.into())
120 }
121
122 pub fn not_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
123 PartitionExpr::new(self, RestrictedOp::NotEq, rhs.into())
124 }
125
126 pub fn gt(self, rhs: impl Into<Self>) -> PartitionExpr {
127 PartitionExpr::new(self, RestrictedOp::Gt, rhs.into())
128 }
129
130 pub fn lt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
131 PartitionExpr::new(self, RestrictedOp::LtEq, rhs.into())
132 }
133}
134
135impl Display for Operand {
136 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137 match self {
138 Self::Column(v) => write!(f, "{v}"),
139 Self::Value(v) => write!(f, "{v}"),
140 Self::Expr(v) => write!(f, "{v}"),
141 }
142 }
143}
144
145#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
148pub enum RestrictedOp {
149 Eq,
151 NotEq,
152 Lt,
153 LtEq,
154 Gt,
155 GtEq,
156
157 And,
159 Or,
160}
161
162impl RestrictedOp {
163 pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
164 match op {
165 ParserBinaryOperator::Eq => Some(Self::Eq),
166 ParserBinaryOperator::NotEq => Some(Self::NotEq),
167 ParserBinaryOperator::Lt => Some(Self::Lt),
168 ParserBinaryOperator::LtEq => Some(Self::LtEq),
169 ParserBinaryOperator::Gt => Some(Self::Gt),
170 ParserBinaryOperator::GtEq => Some(Self::GtEq),
171 ParserBinaryOperator::And => Some(Self::And),
172 ParserBinaryOperator::Or => Some(Self::Or),
173 _ => None,
174 }
175 }
176
177 pub fn to_parser_op(&self) -> ParserBinaryOperator {
178 match self {
179 Self::Eq => ParserBinaryOperator::Eq,
180 Self::NotEq => ParserBinaryOperator::NotEq,
181 Self::Lt => ParserBinaryOperator::Lt,
182 Self::LtEq => ParserBinaryOperator::LtEq,
183 Self::Gt => ParserBinaryOperator::Gt,
184 Self::GtEq => ParserBinaryOperator::GtEq,
185 Self::And => ParserBinaryOperator::And,
186 Self::Or => ParserBinaryOperator::Or,
187 }
188 }
189
190 fn invert_for_swap(&self) -> Self {
191 match self {
192 Self::Eq => Self::Eq,
193 Self::NotEq => Self::NotEq,
194 Self::Lt => Self::Gt,
195 Self::LtEq => Self::GtEq,
196 Self::Gt => Self::Lt,
197 Self::GtEq => Self::LtEq,
198 Self::And => Self::And,
199 Self::Or => Self::Or,
200 }
201 }
202}
203impl Display for RestrictedOp {
204 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
205 match self {
206 Self::Eq => write!(f, "="),
207 Self::NotEq => write!(f, "<>"),
208 Self::Lt => write!(f, "<"),
209 Self::LtEq => write!(f, "<="),
210 Self::Gt => write!(f, ">"),
211 Self::GtEq => write!(f, ">="),
212 Self::And => write!(f, "AND"),
213 Self::Or => write!(f, "OR"),
214 }
215 }
216}
217
218impl PartitionExpr {
219 pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
220 Self {
221 lhs: Box::new(lhs),
222 op,
223 rhs: Box::new(rhs),
224 }
225 .canonicalize()
226 }
227
228 pub fn canonicalize(self) -> Self {
230 let lhs = Self::canonicalize_operand(*self.lhs);
231 let rhs = Self::canonicalize_operand(*self.rhs);
232 let mut expr = Self {
233 lhs: Box::new(lhs),
234 op: self.op,
235 rhs: Box::new(rhs),
236 };
237
238 if matches!(&*expr.lhs, Operand::Value(_)) && matches!(&*expr.rhs, Operand::Column(_)) {
239 std::mem::swap(&mut expr.lhs, &mut expr.rhs);
240 expr.op = expr.op.invert_for_swap();
241 }
242
243 expr
244 }
245
246 fn canonicalize_operand(operand: Operand) -> Operand {
247 match operand {
248 Operand::Expr(expr) => Operand::Expr(expr.canonicalize()),
249 other => other,
250 }
251 }
252
253 pub fn to_parser_expr(&self) -> ParserExpr {
257 let lhs = match &*self.lhs {
260 Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
261 Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
262 Operand::Expr(e) => e.to_parser_expr(),
263 };
264
265 let rhs = match &*self.rhs {
266 Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
267 Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
268 Operand::Expr(e) => e.to_parser_expr(),
269 };
270
271 ParserExpr::BinaryOp {
272 left: Box::new(lhs),
273 op: self.op.to_parser_op(),
274 right: Box::new(rhs),
275 }
276 }
277
278 pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
279 let lhs_is_null = matches!(self.lhs.as_ref(), Operand::Value(Value::Null));
282 let rhs_is_null = matches!(self.rhs.as_ref(), Operand::Value(Value::Null));
283
284 match (self.op.clone(), lhs_is_null, rhs_is_null) {
285 (RestrictedOp::Eq, _, true) => {
286 return Ok(self.lhs.try_as_logical_expr()?.is_null());
287 }
288 (RestrictedOp::Eq, true, _) => {
289 return Ok(self.rhs.try_as_logical_expr()?.is_null());
290 }
291 (RestrictedOp::NotEq, _, true) => {
292 return Ok(self.lhs.try_as_logical_expr()?.is_not_null());
293 }
294 (RestrictedOp::NotEq, true, _) => {
295 return Ok(self.rhs.try_as_logical_expr()?.is_not_null());
296 }
297 _ => {}
298 }
299
300 if matches!(
301 self.op,
302 RestrictedOp::Lt | RestrictedOp::LtEq | RestrictedOp::Gt | RestrictedOp::GtEq
303 ) {
304 if matches!(self.lhs.as_ref(), Operand::Column(_)) {
305 let column_expr = self.lhs.try_as_logical_expr()?;
306 let other_expr = self.rhs.try_as_logical_expr()?;
307 let base = match self.op {
308 RestrictedOp::Lt => column_expr.clone().lt(other_expr),
309 RestrictedOp::LtEq => column_expr.clone().lt_eq(other_expr),
310 RestrictedOp::Gt => column_expr.clone().gt(other_expr),
311 RestrictedOp::GtEq => column_expr.clone().gt_eq(other_expr),
312 _ => unreachable!(),
313 };
314 return Ok(datafusion_expr::or(base, column_expr.is_null()));
315 } else if matches!(self.rhs.as_ref(), Operand::Column(_)) {
316 let other_expr = self.lhs.try_as_logical_expr()?;
317 let column_expr = self.rhs.try_as_logical_expr()?;
318 let base = match self.op {
319 RestrictedOp::Lt => other_expr.lt(column_expr.clone()),
320 RestrictedOp::LtEq => other_expr.lt_eq(column_expr.clone()),
321 RestrictedOp::Gt => other_expr.gt(column_expr.clone()),
322 RestrictedOp::GtEq => other_expr.gt_eq(column_expr.clone()),
323 _ => unreachable!(),
324 };
325 return Ok(datafusion_expr::or(base, column_expr.is_null()));
326 }
327 }
328
329 let lhs = self.lhs.try_as_logical_expr()?;
331 let rhs = self.rhs.try_as_logical_expr()?;
332
333 let expr = match &self.op {
334 RestrictedOp::And => datafusion_expr::and(lhs, rhs),
335 RestrictedOp::Or => datafusion_expr::or(lhs, rhs),
336 RestrictedOp::Gt => lhs.gt(rhs),
337 RestrictedOp::GtEq => lhs.gt_eq(rhs),
338 RestrictedOp::Lt => lhs.lt(rhs),
339 RestrictedOp::LtEq => lhs.lt_eq(rhs),
340 RestrictedOp::Eq => lhs.eq(rhs),
341 RestrictedOp::NotEq => lhs.not_eq(rhs),
342 };
343 Ok(expr)
344 }
345
346 pub fn lhs(&self) -> &Operand {
348 &self.lhs
349 }
350
351 pub fn rhs(&self) -> &Operand {
353 &self.rhs
354 }
355
356 pub fn op(&self) -> &RestrictedOp {
358 &self.op
359 }
360
361 pub fn try_as_physical_expr(
362 &self,
363 schema: &arrow::datatypes::SchemaRef,
364 ) -> error::Result<Arc<dyn PhysicalExpr>> {
365 let df_schema = schema
366 .clone()
367 .to_dfschema_ref()
368 .context(error::ToDFSchemaSnafu)?;
369 let execution_props = &ExecutionProps::default();
370 let expr = self.try_as_logical_expr()?;
371 create_physical_expr(&expr, &df_schema, execution_props)
372 .context(error::CreatePhysicalExprSnafu)
373 }
374
375 pub fn and(self, rhs: PartitionExpr) -> PartitionExpr {
376 PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs))
377 }
378
379 pub fn as_json_str(&self) -> error::Result<String> {
383 serde_json::to_string(&PartitionBound::Expr(self.clone()))
384 .context(error::SerializeJsonSnafu)
385 }
386
387 pub fn from_json_str(s: &str) -> error::Result<Option<Self>> {
391 if s.is_empty() {
392 return Ok(None);
393 }
394
395 let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
396 match bound {
397 PartitionBound::Expr(expr) => Ok(Some(expr.canonicalize())),
398 _ => Ok(None),
399 }
400 }
401
402 pub fn as_pb_partition(&self) -> error::Result<Partition> {
404 Ok(Partition {
405 expression: self.as_json_str()?,
406 ..Default::default()
407 })
408 }
409
410 pub fn collect_column_names(&self, columns: &mut HashSet<String>) {
412 Self::collect_operand_columns(&self.lhs, columns);
413 Self::collect_operand_columns(&self.rhs, columns);
414 }
415
416 fn collect_operand_columns(operand: &Operand, columns: &mut HashSet<String>) {
417 match operand {
418 Operand::Column(c) => {
419 columns.insert(c.clone());
420 }
421 Operand::Expr(e) => {
422 e.collect_column_names(columns);
423 }
424 Operand::Value(_) => {}
425 }
426 }
427}
428
429impl Display for PartitionExpr {
430 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
431 write!(f, "{} {} {}", self.lhs, self.op, self.rhs)
432 }
433}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// Collects the column names referenced by `expr` into a fresh set.
    fn referenced_columns(expr: &PartitionExpr) -> HashSet<String> {
        let mut columns = HashSet::new();
        expr.collect_column_names(&mut columns);
        columns
    }

    /// Builds an owned set of names from string slices.
    fn names(items: &[&str]) -> HashSet<String> {
        items.iter().map(|name| name.to_string()).collect()
    }

    #[test]
    fn test_partition_expr() {
        let comparisons = [
            (RestrictedOp::Eq, "a = 10"),
            (RestrictedOp::NotEq, "a <> 10"),
            (RestrictedOp::Lt, "a < 10"),
            (RestrictedOp::LtEq, "a <= 10"),
            (RestrictedOp::Gt, "a > 10"),
            (RestrictedOp::GtEq, "a >= 10"),
        ];
        for (op, expected) in comparisons {
            let expr = PartitionExpr::new(col("a"), op, Value::UInt32(10).into());
            assert_eq!(expected, expr.to_string());
        }

        let connectives = [(RestrictedOp::And, "a AND b"), (RestrictedOp::Or, "a OR b")];
        for (op, expected) in connectives {
            let expr = PartitionExpr::new(col("a"), op, col("b"));
            assert_eq!(expected, expr.to_string());
        }

        let inner = PartitionExpr::new(col("c"), RestrictedOp::And, col("d"));
        let outer = PartitionExpr::new(col("a"), RestrictedOp::Or, Operand::Expr(inner));
        assert_eq!("a OR c AND d", outer.to_string());
    }

    #[test]
    fn test_try_as_logical_expr_null_equality() {
        let is_null = col("a").eq(Value::Null).try_as_logical_expr().unwrap();
        assert_eq!(is_null.to_string(), "a IS NULL");

        let is_not_null = col("a").not_eq(Value::Null).try_as_logical_expr().unwrap();
        assert_eq!(is_not_null.to_string(), "a IS NOT NULL");
    }

    #[test]
    fn test_try_as_logical_expr_null_range_comparison() {
        // Column on the left: the operator is kept as written.
        let column_first = [
            (RestrictedOp::Lt, "a < Int64(10) OR a IS NULL"),
            (RestrictedOp::Gt, "a > Int64(10) OR a IS NULL"),
            (RestrictedOp::GtEq, "a >= Int64(10) OR a IS NULL"),
            (RestrictedOp::LtEq, "a <= Int64(10) OR a IS NULL"),
        ];
        for (op, expected) in column_first {
            let expr = PartitionExpr::new(col("a"), op, Value::Int64(10).into());
            assert_eq!(expr.try_as_logical_expr().unwrap().to_string(), expected);
        }

        // Value on the left: canonicalization mirrors the operator.
        let value_first = [
            (RestrictedOp::Lt, "a > Int64(10) OR a IS NULL"),
            (RestrictedOp::Gt, "a < Int64(10) OR a IS NULL"),
        ];
        for (op, expected) in value_first {
            let expr = PartitionExpr::new(Value::Int64(10).into(), op, col("a"));
            assert_eq!(expr.try_as_logical_expr().unwrap().to_string(), expected);
        }
    }

    #[test]
    fn test_serde_partition_expr() {
        let expr = col("a").eq(Value::UInt32(10));
        assert_eq!(
            expr.as_json_str().unwrap(),
            "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"Eq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        );

        let json = r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
        let decoded = PartitionExpr::from_json_str(json).unwrap().unwrap();
        assert_eq!(decoded, col("a").gt_eq(Value::UInt32(10)));

        // Inputs that do not describe an expression decode to `None`.
        for json in ["", r#""MaxValue""#, r#"{"Value":{"UInt32":10}}"#] {
            assert!(PartitionExpr::from_json_str(json).unwrap().is_none());
        }
    }

    #[test]
    fn test_collect_column_names() {
        let single = col("a").eq(Value::Int64(1));
        assert_eq!(referenced_columns(&single), names(&["a"]));

        let same_column = col("a")
            .gt_eq(Value::Int64(0))
            .and(col("a").lt(Value::Int64(10)));
        assert_eq!(referenced_columns(&same_column), names(&["a"]));

        let two_columns = col("a")
            .gt_eq(Value::Int64(0))
            .and(col("b").lt(Value::Int64(10)));
        assert_eq!(referenced_columns(&two_columns), names(&["a", "b"]));

        let three_columns = col("a")
            .gt_eq(Value::Int64(0))
            .and(col("b").lt(Value::Int64(10)))
            .and(col("c").eq(Value::Int64(5)));
        assert_eq!(referenced_columns(&three_columns), names(&["a", "b", "c"]));
    }
}