1use std::collections::HashSet;
16use std::fmt::{Debug, Display, Formatter};
17use std::sync::Arc;
18
19use api::v1::meta::Partition;
20use datafusion_common::{ScalarValue, ToDFSchema};
21use datafusion_expr::Expr;
22use datafusion_expr::execution_props::ExecutionProps;
23use datafusion_physical_expr::{PhysicalExpr, create_physical_expr};
24use datatypes::arrow;
25use datatypes::value::{
26 Value, duration_to_scalar_value, time_to_scalar_value, timestamp_to_scalar_value,
27};
28use serde::{Deserialize, Serialize};
29use snafu::ResultExt;
30use sql::statements::value_to_sql_value;
31use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
32
33use crate::error;
34use crate::partition::PartitionBound;
35
/// A binary partition rule expression of the form `lhs op rhs`.
///
/// Either side may itself be a nested [`PartitionExpr`] (through [`Operand::Expr`]), so a value
/// of this type is the root of an expression tree. The field names are part of the serialized
/// (serde) representation.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PartitionExpr {
    /// Left-hand operand.
    pub lhs: Box<Operand>,
    /// Operator applied to the two operands.
    pub op: RestrictedOp,
    /// Right-hand operand.
    pub rhs: Box<Operand>,
}
46
/// One side of a [`PartitionExpr`].
///
/// The variant names are part of the serialized (serde) representation, and the declaration
/// order determines the derived ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum Operand {
    /// A column, identified by its name.
    Column(String),
    /// A literal value.
    Value(Value),
    /// A nested sub-expression.
    Expr(PartitionExpr),
}
53
54pub fn col(column_name: impl Into<String>) -> Operand {
55 Operand::Column(column_name.into())
56}
57
58impl From<Value> for Operand {
59 fn from(value: Value) -> Self {
60 Operand::Value(value)
61 }
62}
63
64impl Operand {
65 pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
66 match self {
67 Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))),
68 Self::Value(v) => {
69 let scalar_value = match v {
70 Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
71 Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
72 Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
73 Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
74 Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
75 Value::Int8(v) => ScalarValue::Int8(Some(*v)),
76 Value::Int16(v) => ScalarValue::Int16(Some(*v)),
77 Value::Int32(v) => ScalarValue::Int32(Some(*v)),
78 Value::Int64(v) => ScalarValue::Int64(Some(*v)),
79 Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
80 Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
81 Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
82 Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
83 Value::Date(v) => ScalarValue::Date32(Some(v.val())),
84 Value::Null => ScalarValue::Null,
85 Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
86 Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value()))
87 .context(error::ConvertPartitionExprValueSnafu { value: v.clone() })?,
88 Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
89 Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
90 Value::IntervalMonthDayNano(v) => {
91 ScalarValue::IntervalMonthDayNano(Some((*v).into()))
92 }
93 Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
94 Value::Decimal128(d) => {
95 let (v, p, s) = d.to_scalar_value();
96 ScalarValue::Decimal128(v, p, s)
97 }
98 other => {
99 return error::UnsupportedPartitionExprValueSnafu {
100 value: other.clone(),
101 }
102 .fail();
103 }
104 };
105 Ok(datafusion_expr::lit(scalar_value))
106 }
107 Self::Expr(e) => e.try_as_logical_expr(),
108 }
109 }
110
111 pub fn lt(self, rhs: impl Into<Self>) -> PartitionExpr {
112 PartitionExpr::new(self, RestrictedOp::Lt, rhs.into())
113 }
114
115 pub fn gt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
116 PartitionExpr::new(self, RestrictedOp::GtEq, rhs.into())
117 }
118
119 pub fn eq(self, rhs: impl Into<Self>) -> PartitionExpr {
120 PartitionExpr::new(self, RestrictedOp::Eq, rhs.into())
121 }
122
123 pub fn not_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
124 PartitionExpr::new(self, RestrictedOp::NotEq, rhs.into())
125 }
126
127 pub fn gt(self, rhs: impl Into<Self>) -> PartitionExpr {
128 PartitionExpr::new(self, RestrictedOp::Gt, rhs.into())
129 }
130
131 pub fn lt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
132 PartitionExpr::new(self, RestrictedOp::LtEq, rhs.into())
133 }
134}
135
136impl Display for Operand {
137 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138 match self {
139 Self::Column(v) => write!(f, "{v}"),
140 Self::Value(v) => write!(f, "{v}"),
141 Self::Expr(v) => write!(f, "{v}"),
142 }
143 }
144}
145
/// The binary operators permitted inside a [`PartitionExpr`].
///
/// The variant names are part of the serialized (serde) representation, and the declaration
/// order determines the derived ordering.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub enum RestrictedOp {
    // Comparison operators.
    /// `=`
    Eq,
    /// `<>`
    NotEq,
    /// `<`
    Lt,
    /// `<=`
    LtEq,
    /// `>`
    Gt,
    /// `>=`
    GtEq,

    // Logical connectives.
    /// `AND`
    And,
    /// `OR`
    Or,
}
162
163impl RestrictedOp {
164 pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
165 match op {
166 ParserBinaryOperator::Eq => Some(Self::Eq),
167 ParserBinaryOperator::NotEq => Some(Self::NotEq),
168 ParserBinaryOperator::Lt => Some(Self::Lt),
169 ParserBinaryOperator::LtEq => Some(Self::LtEq),
170 ParserBinaryOperator::Gt => Some(Self::Gt),
171 ParserBinaryOperator::GtEq => Some(Self::GtEq),
172 ParserBinaryOperator::And => Some(Self::And),
173 ParserBinaryOperator::Or => Some(Self::Or),
174 _ => None,
175 }
176 }
177
178 pub fn to_parser_op(&self) -> ParserBinaryOperator {
179 match self {
180 Self::Eq => ParserBinaryOperator::Eq,
181 Self::NotEq => ParserBinaryOperator::NotEq,
182 Self::Lt => ParserBinaryOperator::Lt,
183 Self::LtEq => ParserBinaryOperator::LtEq,
184 Self::Gt => ParserBinaryOperator::Gt,
185 Self::GtEq => ParserBinaryOperator::GtEq,
186 Self::And => ParserBinaryOperator::And,
187 Self::Or => ParserBinaryOperator::Or,
188 }
189 }
190
191 fn invert_for_swap(&self) -> Self {
192 match self {
193 Self::Eq => Self::Eq,
194 Self::NotEq => Self::NotEq,
195 Self::Lt => Self::Gt,
196 Self::LtEq => Self::GtEq,
197 Self::Gt => Self::Lt,
198 Self::GtEq => Self::LtEq,
199 Self::And => Self::And,
200 Self::Or => Self::Or,
201 }
202 }
203}
204impl Display for RestrictedOp {
205 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
206 match self {
207 Self::Eq => write!(f, "="),
208 Self::NotEq => write!(f, "<>"),
209 Self::Lt => write!(f, "<"),
210 Self::LtEq => write!(f, "<="),
211 Self::Gt => write!(f, ">"),
212 Self::GtEq => write!(f, ">="),
213 Self::And => write!(f, "AND"),
214 Self::Or => write!(f, "OR"),
215 }
216 }
217}
218
219impl PartitionExpr {
220 pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
221 Self {
222 lhs: Box::new(lhs),
223 op,
224 rhs: Box::new(rhs),
225 }
226 .canonicalize()
227 }
228
229 pub fn canonicalize(self) -> Self {
231 let lhs = Self::canonicalize_operand(*self.lhs);
232 let rhs = Self::canonicalize_operand(*self.rhs);
233 let mut expr = Self {
234 lhs: Box::new(lhs),
235 op: self.op,
236 rhs: Box::new(rhs),
237 };
238
239 if matches!(&*expr.lhs, Operand::Value(_)) && matches!(&*expr.rhs, Operand::Column(_)) {
240 std::mem::swap(&mut expr.lhs, &mut expr.rhs);
241 expr.op = expr.op.invert_for_swap();
242 }
243
244 expr
245 }
246
247 fn canonicalize_operand(operand: Operand) -> Operand {
248 match operand {
249 Operand::Expr(expr) => Operand::Expr(expr.canonicalize()),
250 other => other,
251 }
252 }
253
254 pub fn to_parser_expr(&self) -> ParserExpr {
258 let lhs = match &*self.lhs {
261 Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
262 Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
263 Operand::Expr(e) => e.to_parser_expr(),
264 };
265
266 let rhs = match &*self.rhs {
267 Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
268 Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
269 Operand::Expr(e) => e.to_parser_expr(),
270 };
271
272 ParserExpr::BinaryOp {
273 left: Box::new(lhs),
274 op: self.op.to_parser_op(),
275 right: Box::new(rhs),
276 }
277 }
278
279 pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
280 let lhs_is_null = matches!(self.lhs.as_ref(), Operand::Value(Value::Null));
283 let rhs_is_null = matches!(self.rhs.as_ref(), Operand::Value(Value::Null));
284
285 match (self.op.clone(), lhs_is_null, rhs_is_null) {
286 (RestrictedOp::Eq, _, true) => {
287 return Ok(self.lhs.try_as_logical_expr()?.is_null());
288 }
289 (RestrictedOp::Eq, true, _) => {
290 return Ok(self.rhs.try_as_logical_expr()?.is_null());
291 }
292 (RestrictedOp::NotEq, _, true) => {
293 return Ok(self.lhs.try_as_logical_expr()?.is_not_null());
294 }
295 (RestrictedOp::NotEq, true, _) => {
296 return Ok(self.rhs.try_as_logical_expr()?.is_not_null());
297 }
298 _ => {}
299 }
300
301 if matches!(
302 self.op,
303 RestrictedOp::Lt | RestrictedOp::LtEq | RestrictedOp::Gt | RestrictedOp::GtEq
304 ) {
305 if matches!(self.lhs.as_ref(), Operand::Column(_)) {
309 let column_expr = self.lhs.try_as_logical_expr()?;
310 let other_expr = self.rhs.try_as_logical_expr()?;
311 let base = match self.op {
312 RestrictedOp::Lt => {
313 column_expr.clone().lt(other_expr).or(column_expr.is_null())
314 }
315 RestrictedOp::LtEq => column_expr
316 .clone()
317 .lt_eq(other_expr)
318 .or(column_expr.is_null()),
319 RestrictedOp::Gt => column_expr
320 .clone()
321 .gt(other_expr)
322 .and(column_expr.is_not_null()),
323 RestrictedOp::GtEq => column_expr
324 .clone()
325 .gt_eq(other_expr)
326 .and(column_expr.is_not_null()),
327 _ => unreachable!(),
328 };
329 return Ok(base);
330 } else if matches!(self.rhs.as_ref(), Operand::Column(_)) {
331 let other_expr = self.lhs.try_as_logical_expr()?;
332 let column_expr = self.rhs.try_as_logical_expr()?;
333 let base = match self.op {
334 RestrictedOp::Lt => other_expr
335 .lt(column_expr.clone())
336 .and(column_expr.is_not_null()),
337 RestrictedOp::LtEq => other_expr
338 .lt_eq(column_expr.clone())
339 .and(column_expr.is_not_null()),
340 RestrictedOp::Gt => {
341 other_expr.gt(column_expr.clone()).or(column_expr.is_null())
342 }
343 RestrictedOp::GtEq => other_expr
344 .gt_eq(column_expr.clone())
345 .or(column_expr.is_null()),
346 _ => unreachable!(),
347 };
348 return Ok(base);
349 }
350 }
351
352 let lhs = self.lhs.try_as_logical_expr()?;
354 let rhs = self.rhs.try_as_logical_expr()?;
355
356 let expr = match &self.op {
357 RestrictedOp::And => datafusion_expr::and(lhs, rhs),
358 RestrictedOp::Or => datafusion_expr::or(lhs, rhs),
359 RestrictedOp::Gt => lhs.gt(rhs),
360 RestrictedOp::GtEq => lhs.gt_eq(rhs),
361 RestrictedOp::Lt => lhs.lt(rhs),
362 RestrictedOp::LtEq => lhs.lt_eq(rhs),
363 RestrictedOp::Eq => lhs.eq(rhs),
364 RestrictedOp::NotEq => lhs.not_eq(rhs),
365 };
366 Ok(expr)
367 }
368
369 pub fn lhs(&self) -> &Operand {
371 &self.lhs
372 }
373
374 pub fn rhs(&self) -> &Operand {
376 &self.rhs
377 }
378
379 pub fn op(&self) -> &RestrictedOp {
381 &self.op
382 }
383
384 pub fn try_as_physical_expr(
385 &self,
386 schema: &arrow::datatypes::SchemaRef,
387 ) -> error::Result<Arc<dyn PhysicalExpr>> {
388 let df_schema = schema
389 .clone()
390 .to_dfschema_ref()
391 .context(error::ToDFSchemaSnafu)?;
392 let execution_props = &ExecutionProps::default();
393 let expr = self.try_as_logical_expr()?;
394 create_physical_expr(&expr, &df_schema, execution_props)
395 .context(error::CreatePhysicalExprSnafu)
396 }
397
398 pub fn and(self, rhs: PartitionExpr) -> PartitionExpr {
399 PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs))
400 }
401
402 pub fn as_json_str(&self) -> error::Result<String> {
406 serde_json::to_string(&PartitionBound::Expr(self.clone()))
407 .context(error::SerializeJsonSnafu)
408 }
409
410 pub fn from_json_str(s: &str) -> error::Result<Option<Self>> {
414 if s.is_empty() {
415 return Ok(None);
416 }
417
418 let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
419 match bound {
420 PartitionBound::Expr(expr) => Ok(Some(expr.canonicalize())),
421 _ => Ok(None),
422 }
423 }
424
425 pub fn as_pb_partition(&self) -> error::Result<Partition> {
427 Ok(Partition {
428 expression: self.as_json_str()?,
429 ..Default::default()
430 })
431 }
432
433 pub fn collect_column_names(&self, columns: &mut HashSet<String>) {
435 Self::collect_operand_columns(&self.lhs, columns);
436 Self::collect_operand_columns(&self.rhs, columns);
437 }
438
439 fn collect_operand_columns(operand: &Operand, columns: &mut HashSet<String>) {
440 match operand {
441 Operand::Column(c) => {
442 columns.insert(c.clone());
443 }
444 Operand::Expr(e) => {
445 e.collect_column_names(columns);
446 }
447 Operand::Value(_) => {}
448 }
449 }
450}
451
452impl Display for PartitionExpr {
453 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
454 write!(f, "{} {} {}", self.lhs, self.op, self.rhs)
455 }
456}
457
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for an `Int64` literal operand.
    fn int(value: i64) -> Operand {
        Operand::from(Value::Int64(value))
    }

    /// Renders the DataFusion logical expression built from `expr`.
    fn logical(expr: &PartitionExpr) -> String {
        expr.try_as_logical_expr().unwrap().to_string()
    }

    /// Asserts that `expr` references exactly the columns named in `expected`.
    fn assert_columns(expr: &PartitionExpr, expected: &[&str]) {
        let mut columns = HashSet::new();
        expr.collect_column_names(&mut columns);
        assert_eq!(columns.len(), expected.len());
        for name in expected {
            assert!(columns.contains(*name));
        }
    }

    #[test]
    fn test_partition_expr() {
        let ten = || Operand::Value(Value::UInt32(10));
        let cases = [
            (col("a"), RestrictedOp::Eq, ten(), "a = 10"),
            (col("a"), RestrictedOp::NotEq, ten(), "a <> 10"),
            (col("a"), RestrictedOp::Lt, ten(), "a < 10"),
            (col("a"), RestrictedOp::LtEq, ten(), "a <= 10"),
            (col("a"), RestrictedOp::Gt, ten(), "a > 10"),
            (col("a"), RestrictedOp::GtEq, ten(), "a >= 10"),
            (col("a"), RestrictedOp::And, col("b"), "a AND b"),
            (col("a"), RestrictedOp::Or, col("b"), "a OR b"),
            (
                col("a"),
                RestrictedOp::Or,
                Operand::Expr(PartitionExpr::new(
                    col("c"),
                    RestrictedOp::And,
                    col("d"),
                )),
                "a OR c AND d",
            ),
        ];

        for (lhs, op, rhs, expected) in cases {
            let expr = PartitionExpr::new(lhs, op, rhs);
            assert_eq!(expected, expr.to_string());
        }
    }

    #[test]
    fn test_try_as_logical_expr_null_equality() {
        let eq_expr = col("a").eq(Value::Null);
        assert_eq!(logical(&eq_expr), "a IS NULL");

        let neq_expr = col("a").not_eq(Value::Null);
        assert_eq!(logical(&neq_expr), "a IS NOT NULL");
    }

    #[test]
    fn test_try_as_logical_expr_null_range_comparison() {
        let cases = [
            // Single comparisons, column on either side.
            (col("a").lt(int(10)), "a < Int64(10) OR a IS NULL"),
            (int(10).lt(col("a")), "a > Int64(10) AND a IS NOT NULL"),
            (col("a").gt(int(10)), "a > Int64(10) AND a IS NOT NULL"),
            (int(10).gt(col("a")), "a < Int64(10) OR a IS NULL"),
            (col("a").gt_eq(int(10)), "a >= Int64(10) AND a IS NOT NULL"),
            (col("a").lt_eq(int(10)), "a <= Int64(10) OR a IS NULL"),
            (int(10).gt_eq(col("a")), "a <= Int64(10) OR a IS NULL"),
            (int(10).lt_eq(col("a")), "a >= Int64(10) AND a IS NOT NULL"),
            // Conjunctions of two range comparisons.
            (
                col("a").lt_eq(int(10)).and(col("b").gt(int(5))),
                "(a <= Int64(10) OR a IS NULL) AND b > Int64(5) AND b IS NOT NULL",
            ),
            (
                col("a").lt_eq(int(10)).and(col("a").gt(int(5))),
                "(a <= Int64(10) OR a IS NULL) AND a > Int64(5) AND a IS NOT NULL",
            ),
            (
                col("a").lt(int(10)).and(col("a").gt_eq(int(5))),
                "(a < Int64(10) OR a IS NULL) AND a >= Int64(5) AND a IS NOT NULL",
            ),
            (
                int(10).gt_eq(col("a")).and(int(5).lt(col("a"))),
                "(a <= Int64(10) OR a IS NULL) AND a > Int64(5) AND a IS NOT NULL",
            ),
            // Disjunction on the same column.
            (
                PartitionExpr::new(
                    Operand::Expr(col("a").lt_eq(int(10))),
                    RestrictedOp::Or,
                    Operand::Expr(col("a").gt(int(5))),
                ),
                "a <= Int64(10) OR a IS NULL OR a > Int64(5) AND a IS NOT NULL",
            ),
        ];

        for (expr, expected) in cases {
            assert_eq!(logical(&expr), expected);
        }
    }

    #[test]
    fn test_try_as_logical_expr_rhs_column_without_canonicalize() {
        // Built with struct literals on purpose, so `PartitionExpr::new` cannot canonicalize.
        let cases = [
            (RestrictedOp::Gt, "Int64(10) > a OR a IS NULL"),
            (RestrictedOp::GtEq, "Int64(10) >= a OR a IS NULL"),
        ];

        for (op, expected) in cases {
            let expr = PartitionExpr {
                lhs: Box::new(Operand::Value(Value::Int64(10))),
                op,
                rhs: Box::new(Operand::Column("a".to_string())),
            };
            assert_eq!(logical(&expr), expected);
        }
    }

    #[test]
    fn test_serde_partition_expr() {
        // Serialization.
        let expr = col("a").eq(Value::UInt32(10));
        assert_eq!(
            expr.as_json_str().unwrap(),
            r#"{"Expr":{"lhs":{"Column":"a"},"op":"Eq","rhs":{"Value":{"UInt32":10}}}}"#
        );

        // Deserialization of an expression bound.
        let json = r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
        let parsed = PartitionExpr::from_json_str(json).unwrap().unwrap();
        assert_eq!(parsed, col("a").gt_eq(Value::UInt32(10)));

        // Empty input and non-expression bounds yield no expression.
        for json in ["", r#""MaxValue""#, r#"{"Value":{"UInt32":10}}"#] {
            assert!(PartitionExpr::from_json_str(json).unwrap().is_none());
        }
    }

    #[test]
    fn test_collect_column_names() {
        // Single comparison.
        let expr = col("a").eq(Value::Int64(1));
        assert_columns(&expr, &["a"]);

        // The same column used twice is reported once.
        let expr = col("a").gt_eq(int(0)).and(col("a").lt(int(10)));
        assert_columns(&expr, &["a"]);

        // Two distinct columns.
        let expr = col("a").gt_eq(int(0)).and(col("b").lt(int(10)));
        assert_columns(&expr, &["a", "b"]);

        // Three distinct columns across nested conjunctions.
        let expr = col("a")
            .gt_eq(int(0))
            .and(col("b").lt(int(10)))
            .and(col("c").eq(int(5)));
        assert_columns(&expr, &["a", "b", "c"]);
    }
}