1use std::collections::HashSet;
20
21use arrow::datatypes::DataType;
22use common_telemetry::debug;
23use datafusion_common::Result as DfResult;
24use datafusion_expr::{Expr, LogicalPlan, Operator};
25use datatypes::value::Value;
26use partition::expr::{Operand, PartitionExpr, RestrictedOp};
27
/// Stateless entry point for extracting partition-pruning predicates from a
/// logical plan.
///
/// The type has no fields; all functionality is exposed through associated
/// functions.
pub struct PredicateExtractor;
30
31impl PredicateExtractor {
32 pub fn extract_partition_expressions(
35 plan: &LogicalPlan,
36 partition_columns: &[String],
37 ) -> DfResult<Vec<PartitionExpr>> {
38 let mut filter_exprs = Vec::new();
40 Self::collect_filter_expressions(plan, &mut filter_exprs)?;
41
42 if filter_exprs.is_empty() {
43 return Ok(Vec::new());
44 }
45
46 let mut partition_exprs = Vec::with_capacity(filter_exprs.len());
48 let partition_set: HashSet<String> = partition_columns.iter().cloned().collect();
49
50 for filter_expr in filter_exprs {
51 match DataFusionExprConverter::convert(&filter_expr) {
52 Ok(partition_expr) => {
53 match ExpressionChecker::check_expression_for_pruning(
55 &partition_expr,
56 &partition_set,
57 ) {
58 ExpressionCheckResult::UseAsIs(expr) => {
59 partition_exprs.push(expr);
60 }
61 ExpressionCheckResult::UsePartial(exprs) => {
62 partition_exprs.extend(exprs);
63 }
64 ExpressionCheckResult::Drop => {
65 debug!(
66 "Dropping mixed expression for correctness: {}",
67 partition_expr
68 );
69 }
70 }
71 }
72 Err(err) => {
73 debug!(
74 "Failed to convert filter expression to PartitionExpr: {}, skipping",
75 err
76 );
77 continue;
78 }
79 }
80 }
81
82 debug!(
83 "Extracted {} partition expressions from logical plan for partition columns: {:?}",
84 partition_exprs.len(),
85 partition_columns
86 );
87
88 Ok(partition_exprs)
89 }
90
91 fn collect_filter_expressions(plan: &LogicalPlan, expressions: &mut Vec<Expr>) -> DfResult<()> {
93 if let LogicalPlan::Filter(filter) = plan {
94 expressions.push(filter.predicate.clone());
95 }
96
97 for child in plan.inputs() {
99 Self::collect_filter_expressions(child, expressions)?;
100 }
101
102 if plan.inputs().len() > 1 {
104 expressions.clear();
105 }
106
107 Ok(())
108 }
109}
110
/// Outcome of checking a converted predicate against the partition columns.
#[derive(Debug, Clone)]
enum ExpressionCheckResult {
    /// The whole expression references only partition columns and literal
    /// values, so it is used unchanged.
    UseAsIs(PartitionExpr),
    /// Only the contained sub-expressions (taken from an `AND` expression)
    /// are used; the rest of the expression is ignored.
    UsePartial(Vec<PartitionExpr>),
    /// Nothing from the expression is used.
    Drop,
}
121
/// Stateless helper that decides which parts of a [`PartitionExpr`] may be
/// used for pruning, given the set of partition columns.
struct ExpressionChecker;
124
125impl ExpressionChecker {
126 fn check_expression_for_pruning(
128 expr: &PartitionExpr,
129 partition_columns: &HashSet<String>,
130 ) -> ExpressionCheckResult {
131 match expr.op() {
132 RestrictedOp::And => {
133 let mut partition_constraints = Vec::new();
136 Self::extract_and_constraints(expr, partition_columns, &mut partition_constraints);
137
138 if partition_constraints.is_empty() {
139 ExpressionCheckResult::Drop
140 } else if Self::expr_only_involves_partition_columns(expr, partition_columns) {
141 ExpressionCheckResult::UseAsIs(expr.clone())
143 } else {
144 ExpressionCheckResult::UsePartial(partition_constraints)
146 }
147 }
148 RestrictedOp::Or => {
149 if Self::expr_only_involves_partition_columns(expr, partition_columns) {
152 ExpressionCheckResult::UseAsIs(expr.clone())
153 } else {
154 ExpressionCheckResult::Drop
156 }
157 }
158 _ => {
159 if Self::expr_only_involves_partition_columns(expr, partition_columns) {
161 ExpressionCheckResult::UseAsIs(expr.clone())
162 } else {
163 ExpressionCheckResult::Drop
164 }
165 }
166 }
167 }
168
169 fn extract_and_constraints(
171 expr: &PartitionExpr,
172 partition_columns: &HashSet<String>,
173 result: &mut Vec<PartitionExpr>,
174 ) {
175 if let RestrictedOp::And = expr.op() {
176 Self::extract_constraints_from_operand(expr.lhs(), partition_columns, result);
178 Self::extract_constraints_from_operand(expr.rhs(), partition_columns, result);
179 } else {
180 if Self::expr_only_involves_partition_columns(expr, partition_columns) {
182 result.push(expr.clone());
183 }
184 }
185 }
186
187 fn extract_constraints_from_operand(
189 operand: &Operand,
190 partition_columns: &HashSet<String>,
191 result: &mut Vec<PartitionExpr>,
192 ) {
193 match operand {
194 Operand::Column(_) | Operand::Value(_) => {
195 }
197 Operand::Expr(expr) => {
198 Self::extract_and_constraints(expr, partition_columns, result);
199 }
200 }
201 }
202
203 fn expr_only_involves_partition_columns(
205 expr: &PartitionExpr,
206 partition_columns: &HashSet<String>,
207 ) -> bool {
208 Self::operand_only_involves_partition_columns(expr.lhs(), partition_columns)
209 && Self::operand_only_involves_partition_columns(expr.rhs(), partition_columns)
210 }
211
212 fn operand_only_involves_partition_columns(
214 operand: &Operand,
215 partition_columns: &HashSet<String>,
216 ) -> bool {
217 match operand {
218 Operand::Column(col) => partition_columns.contains(col),
219 Operand::Value(_) => true, Operand::Expr(expr) => {
221 Self::expr_only_involves_partition_columns(expr, partition_columns)
222 }
223 }
224 }
225}
226
/// Stateless converter from DataFusion [`Expr`] trees to [`PartitionExpr`].
struct DataFusionExprConverter;
229
230impl DataFusionExprConverter {
231 pub fn convert(expr: &Expr) -> DfResult<PartitionExpr> {
233 match expr {
234 Expr::BinaryExpr(binary_expr) => {
235 let lhs = Self::convert_to_operand(&binary_expr.left)?;
236 let rhs = Self::convert_to_operand(&binary_expr.right)?;
237 let op = Self::convert_operator(&binary_expr.op)?;
238
239 Ok(PartitionExpr::new(lhs, op, rhs))
240 }
241 Expr::InList(inlist_expr) => {
242 let column_operand = Self::convert_to_operand(&inlist_expr.expr)?;
245
246 if inlist_expr.list.is_empty() {
247 return Err(datafusion_common::DataFusionError::Plan(
248 "InList with empty list is not supported".to_string(),
249 ));
250 }
251
252 let op = if inlist_expr.negated {
253 RestrictedOp::NotEq
254 } else {
255 RestrictedOp::Eq
256 };
257
258 let connector_op = if inlist_expr.negated {
259 RestrictedOp::And } else {
261 RestrictedOp::Or };
263
264 let mut expressions = Vec::new();
266 for value_expr in &inlist_expr.list {
267 let value_operand = Self::convert_to_operand(value_expr)?;
268 expressions.push(PartitionExpr::new(
269 column_operand.clone(),
270 op.clone(),
271 value_operand,
272 ));
273 }
274
275 let mut expr_iter = expressions.into_iter();
277 let mut result = expr_iter.next().unwrap();
278 for expr in expr_iter {
279 result = PartitionExpr::new(
280 Operand::Expr(result),
281 connector_op.clone(),
282 Operand::Expr(expr),
283 );
284 }
285
286 Ok(result)
287 }
288 Expr::Between(between_expr) => {
289 let column_operand = Self::convert_to_operand(&between_expr.expr)?;
292 let low_operand = Self::convert_to_operand(&between_expr.low)?;
293 let high_operand = Self::convert_to_operand(&between_expr.high)?;
294
295 if between_expr.negated {
296 let left_expr =
298 PartitionExpr::new(column_operand.clone(), RestrictedOp::Lt, low_operand);
299 let right_expr =
300 PartitionExpr::new(column_operand, RestrictedOp::Gt, high_operand);
301 Ok(PartitionExpr::new(
302 Operand::Expr(left_expr),
303 RestrictedOp::Or,
304 Operand::Expr(right_expr),
305 ))
306 } else {
307 let left_expr =
309 PartitionExpr::new(column_operand.clone(), RestrictedOp::GtEq, low_operand);
310 let right_expr =
311 PartitionExpr::new(column_operand, RestrictedOp::LtEq, high_operand);
312 Ok(PartitionExpr::new(
313 Operand::Expr(left_expr),
314 RestrictedOp::And,
315 Operand::Expr(right_expr),
316 ))
317 }
318 }
319 Expr::IsNull(expr) => {
320 let column_operand = Self::convert_to_operand(expr)?;
322 Ok(PartitionExpr::new(
323 column_operand,
324 RestrictedOp::Eq,
325 Operand::Value(Value::Null),
326 ))
327 }
328 Expr::IsNotNull(expr) => {
329 let column_operand = Self::convert_to_operand(expr)?;
331 Ok(PartitionExpr::new(
332 column_operand,
333 RestrictedOp::NotEq,
334 Operand::Value(Value::Null),
335 ))
336 }
337 Expr::Not(expr) => {
338 match expr.as_ref() {
340 Expr::BinaryExpr(binary_expr) => {
341 let lhs = Self::convert_to_operand(&binary_expr.left)?;
342 let rhs = Self::convert_to_operand(&binary_expr.right)?;
343 let inverted_op = Self::invert_operator(&binary_expr.op)?;
344
345 Ok(PartitionExpr::new(lhs, inverted_op, rhs))
346 }
347 Expr::IsNull(inner_expr) => {
348 let column_operand = Self::convert_to_operand(inner_expr)?;
350 Ok(PartitionExpr::new(
351 column_operand,
352 RestrictedOp::NotEq,
353 Operand::Value(Value::Null),
354 ))
355 }
356 Expr::IsNotNull(inner_expr) => {
357 let column_operand = Self::convert_to_operand(inner_expr)?;
359 Ok(PartitionExpr::new(
360 column_operand,
361 RestrictedOp::Eq,
362 Operand::Value(Value::Null),
363 ))
364 }
365 _ => {
366 debug!(
367 "Unsupported NOT expression for partition pruning: {:?}",
368 expr
369 );
370 Err(datafusion_common::DataFusionError::Plan(format!(
371 "NOT expression with inner type {:?} not supported for partition pruning",
372 expr
373 )))
374 }
375 }
376 }
377 _ => Err(datafusion_common::DataFusionError::Plan(format!(
378 "Unsupported expression type for conversion: {:?}",
379 expr
380 ))),
381 }
382 }
383
384 fn convert_to_operand(expr: &Expr) -> DfResult<Operand> {
386 match expr {
387 Expr::Column(col) => {
388 let column_name = if let Some(relation) = &col.relation {
391 debug!(
392 "Using qualified column reference: {}.{}",
393 relation, col.name
394 );
395 col.name.clone()
396 } else {
397 col.name.clone()
398 };
399 Ok(Operand::Column(column_name))
400 }
401 Expr::Literal(scalar_value, _) => {
402 let value = Value::try_from(scalar_value.clone()).unwrap();
403 Ok(Operand::Value(value))
404 }
405 Expr::Alias(alias_expr) => {
406 Self::convert_to_operand(&alias_expr.expr)
408 }
409 Expr::Cast(cast_expr) => {
410 if Self::is_safe_cast_for_partition_pruning(&cast_expr.data_type) {
413 Self::convert_to_operand(&cast_expr.expr)
414 } else {
415 debug!(
416 "Skipping unsafe cast for partition pruning: {:?}",
417 cast_expr.data_type
418 );
419 Err(datafusion_common::DataFusionError::Plan(format!(
420 "Cast to {:?} not supported for partition pruning",
421 cast_expr.data_type
422 )))
423 }
424 }
425 other => {
426 let partition_expr = Self::convert(other)?;
427 Ok(Operand::Expr(partition_expr))
428 }
429 }
430 }
431
432 fn convert_operator(op: &Operator) -> DfResult<RestrictedOp> {
434 match op {
435 Operator::Eq => Ok(RestrictedOp::Eq),
436 Operator::NotEq => Ok(RestrictedOp::NotEq),
437 Operator::Lt => Ok(RestrictedOp::Lt),
438 Operator::LtEq => Ok(RestrictedOp::LtEq),
439 Operator::Gt => Ok(RestrictedOp::Gt),
440 Operator::GtEq => Ok(RestrictedOp::GtEq),
441 Operator::And => Ok(RestrictedOp::And),
442 Operator::Or => Ok(RestrictedOp::Or),
443 _ => Err(datafusion_common::DataFusionError::Plan(format!(
444 "Unsupported operator: {:?}",
445 op
446 ))),
447 }
448 }
449
450 fn invert_operator(op: &Operator) -> DfResult<RestrictedOp> {
452 let Some(negated) = op.negate() else {
453 return Err(datafusion_common::DataFusionError::Plan(format!(
454 "Cannot invert operator: {:?}",
455 op
456 )));
457 };
458 Self::convert_operator(&negated)
459 }
460
461 fn is_safe_cast_for_partition_pruning(data_type: &DataType) -> bool {
464 match data_type {
465 DataType::Int8 => true,
467 DataType::Int16 => true,
468 DataType::Int32 => true,
469 DataType::Int64 => true,
470 DataType::UInt8 => true,
471 DataType::UInt16 => true,
472 DataType::UInt32 => true,
473 DataType::UInt64 => true,
474
475 DataType::Float32 => true,
477 DataType::Float64 => true,
478
479 DataType::Utf8 => true,
481 DataType::LargeUtf8 => true,
482
483 DataType::Date32 => true,
485 DataType::Date64 => true,
486 DataType::Timestamp(_, _) => true,
487
488 DataType::Boolean => true,
490
491 _ => false,
493 }
494 }
495}
496
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use datafusion::common::Column;
    use datafusion::datasource::DefaultTableSource;
    use datafusion_expr::{col, lit, LogicalPlanBuilder};
    use datatypes::value::Value;
    use partition::expr::{Operand, PartitionExpr, RestrictedOp};

    use super::*;

    /// Builds a scan over an empty table named `test` with the columns
    /// `timestamp` (millisecond timestamp), `user_id` and `value` (both Int64).
    fn create_test_table_scan() -> LogicalPlan {
        let fields = vec![
            Field::new(
                "timestamp",
                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new("user_id", DataType::Int64, false),
            Field::new("value", DataType::Int64, false),
        ];
        let schema = Arc::new(Schema::new(fields));

        let empty_table = datafusion::datasource::empty::EmptyTable::new(schema);
        let table_source = Arc::new(DefaultTableSource::new(Arc::new(empty_table)));

        LogicalPlanBuilder::scan("test", table_source, None)
            .unwrap()
            .build()
            .unwrap()
    }

    /// One table-driven case: a filter, the partition columns, and the
    /// partition expressions expected to be extracted.
    struct FilterTestCase {
        name: &'static str,
        filter_expr: Expr,
        expected_partition_exprs: Vec<PartitionExpr>,
        partition_columns: Vec<&'static str>,
    }

    impl FilterTestCase {
        fn new(
            name: &'static str,
            filter_expr: Expr,
            expected_partition_exprs: Vec<PartitionExpr>,
            partition_columns: Vec<&'static str>,
        ) -> Self {
            Self {
                name,
                filter_expr,
                expected_partition_exprs,
                partition_columns,
            }
        }
    }

    /// Runs every case: wraps the filter around the test scan, extracts the
    /// partition expressions and compares them with the expectation.
    fn check_partition_expressions(cases: Vec<FilterTestCase>) {
        for case in cases {
            let FilterTestCase {
                name,
                filter_expr,
                expected_partition_exprs: expected,
                partition_columns,
            } = case;

            let plan = LogicalPlanBuilder::from(create_test_table_scan())
                .filter(filter_expr)
                .unwrap()
                .build()
                .unwrap();

            let partition_columns: Vec<String> =
                partition_columns.into_iter().map(String::from).collect();
            let partition_exprs =
                PredicateExtractor::extract_partition_expressions(&plan, &partition_columns)
                    .unwrap();

            assert_eq!(
                partition_exprs, expected,
                "Test case '{}': expected partition expressions {:?}, got {:?}",
                name, expected, partition_exprs
            );
        }
    }

    /// Expected expression `column <op> value`.
    fn leaf(column: &str, op: RestrictedOp, value: Value) -> PartitionExpr {
        PartitionExpr::new(
            Operand::Column(column.to_string()),
            op,
            Operand::Value(value),
        )
    }

    /// Expected expression `user_id <op> <Int64 value>`.
    fn user_id_cmp(op: RestrictedOp, value: i64) -> PartitionExpr {
        leaf("user_id", op, Value::Int64(value))
    }

    /// Expected expression `lhs AND rhs`.
    fn and_expr(lhs: PartitionExpr, rhs: PartitionExpr) -> PartitionExpr {
        PartitionExpr::new(Operand::Expr(lhs), RestrictedOp::And, Operand::Expr(rhs))
    }

    /// Expected expression `lhs OR rhs`.
    fn or_expr(lhs: PartitionExpr, rhs: PartitionExpr) -> PartitionExpr {
        PartitionExpr::new(Operand::Expr(lhs), RestrictedOp::Or, Operand::Expr(rhs))
    }

    /// Input expression `CAST(inner AS data_type)`.
    fn cast_expr(inner: Expr, data_type: DataType) -> Expr {
        Expr::Cast(datafusion_expr::Cast {
            expr: Box::new(inner),
            data_type,
        })
    }

    #[test]
    fn test_basic_constraints_extraction() {
        let cases = vec![
            FilterTestCase::new(
                "non_partition_column_ignored",
                col("value").gt_eq(lit(100i64)),
                vec![],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "simple_constraint",
                col("user_id").gt_eq(lit(100i64)),
                vec![user_id_cmp(RestrictedOp::GtEq, 100)],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "or_expression",
                col("user_id")
                    .eq(lit(100i64))
                    .or(col("user_id").eq(lit(200i64))),
                vec![or_expr(
                    user_id_cmp(RestrictedOp::Eq, 100),
                    user_id_cmp(RestrictedOp::Eq, 200),
                )],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "complex_and_or",
                col("user_id")
                    .gt_eq(lit(100i64))
                    .and(col("user_id").lt(lit(200i64)))
                    .or(col("user_id")
                        .gt_eq(lit(300i64))
                        .and(col("user_id").lt(lit(400i64)))),
                vec![or_expr(
                    and_expr(
                        user_id_cmp(RestrictedOp::GtEq, 100),
                        user_id_cmp(RestrictedOp::Lt, 200),
                    ),
                    and_expr(
                        user_id_cmp(RestrictedOp::GtEq, 300),
                        user_id_cmp(RestrictedOp::Lt, 400),
                    ),
                )],
                vec!["user_id"],
            ),
        ];
        check_partition_expressions(cases);
    }

    #[test]
    fn test_alias_expressions() {
        let cases = vec![
            FilterTestCase::new(
                "simple_alias",
                col("user_id").alias("uid").eq(lit(100i64)),
                vec![user_id_cmp(RestrictedOp::Eq, 100)],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "nested_alias",
                col("user_id").alias("uid").alias("u").gt_eq(lit(50i64)),
                vec![user_id_cmp(RestrictedOp::GtEq, 50)],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "complex_alias_with_and_or",
                col("user_id")
                    .alias("uid")
                    .gt_eq(lit(100i64))
                    .and(col("user_id").alias("u").lt(lit(200i64)))
                    .or(col("user_id").alias("id").eq(lit(300i64))),
                vec![or_expr(
                    and_expr(
                        user_id_cmp(RestrictedOp::GtEq, 100),
                        user_id_cmp(RestrictedOp::Lt, 200),
                    ),
                    user_id_cmp(RestrictedOp::Eq, 300),
                )],
                vec!["user_id"],
            ),
        ];
        check_partition_expressions(cases);
    }

    #[test]
    fn test_inlist_expressions() {
        let cases = vec![
            FilterTestCase::new(
                "simple_inlist",
                col("user_id").in_list(vec![lit(100i64), lit(200i64), lit(300i64)], false),
                vec![or_expr(
                    or_expr(
                        user_id_cmp(RestrictedOp::Eq, 100),
                        user_id_cmp(RestrictedOp::Eq, 200),
                    ),
                    user_id_cmp(RestrictedOp::Eq, 300),
                )],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "negated_inlist",
                col("user_id").in_list(vec![lit(100i64), lit(200i64)], true),
                vec![and_expr(
                    user_id_cmp(RestrictedOp::NotEq, 100),
                    user_id_cmp(RestrictedOp::NotEq, 200),
                )],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "inlist_with_alias",
                col("user_id")
                    .alias("uid")
                    .in_list(vec![lit(100i64), lit(200i64)], false),
                vec![or_expr(
                    user_id_cmp(RestrictedOp::Eq, 100),
                    user_id_cmp(RestrictedOp::Eq, 200),
                )],
                vec!["user_id"],
            ),
        ];
        check_partition_expressions(cases);
    }

    #[test]
    fn test_between_expressions() {
        let cases = vec![
            FilterTestCase::new(
                "simple_between",
                col("user_id").between(lit(100i64), lit(200i64)),
                vec![and_expr(
                    user_id_cmp(RestrictedOp::GtEq, 100),
                    user_id_cmp(RestrictedOp::LtEq, 200),
                )],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "negated_between",
                Expr::Between(datafusion_expr::Between {
                    expr: Box::new(col("user_id")),
                    negated: true,
                    low: Box::new(lit(100i64)),
                    high: Box::new(lit(200i64)),
                }),
                vec![or_expr(
                    user_id_cmp(RestrictedOp::Lt, 100),
                    user_id_cmp(RestrictedOp::Gt, 200),
                )],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "between_with_alias",
                col("user_id")
                    .alias("uid")
                    .between(lit(100i64), lit(200i64)),
                vec![and_expr(
                    user_id_cmp(RestrictedOp::GtEq, 100),
                    user_id_cmp(RestrictedOp::LtEq, 200),
                )],
                vec!["user_id"],
            ),
        ];
        check_partition_expressions(cases);
    }

    #[test]
    fn test_null_expressions() {
        let cases = vec![
            FilterTestCase::new(
                "is_null",
                col("user_id").is_null(),
                vec![leaf("user_id", RestrictedOp::Eq, Value::Null)],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "is_not_null",
                col("user_id").is_not_null(),
                vec![leaf("user_id", RestrictedOp::NotEq, Value::Null)],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "null_with_alias",
                col("user_id").alias("uid").is_null(),
                vec![leaf("user_id", RestrictedOp::Eq, Value::Null)],
                vec!["user_id"],
            ),
        ];
        check_partition_expressions(cases);
    }

    #[test]
    fn test_cast_expressions() {
        let cases = vec![
            FilterTestCase::new(
                "safe_cast",
                cast_expr(col("user_id"), DataType::Int64).eq(lit(100i64)),
                vec![user_id_cmp(RestrictedOp::Eq, 100)],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "cast_with_alias",
                cast_expr(col("user_id").alias("uid"), DataType::Int64).eq(lit(100i64)),
                vec![user_id_cmp(RestrictedOp::Eq, 100)],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "unsafe_cast",
                cast_expr(
                    col("user_id"),
                    DataType::List(Arc::new(Field::new("item", DataType::Int32, true))),
                )
                .eq(lit(100i64)),
                vec![],
                vec!["user_id"],
            ),
        ];
        check_partition_expressions(cases);
    }

    #[test]
    fn test_not_expressions() {
        let cases = vec![
            FilterTestCase::new(
                "not_equality",
                Expr::Not(Box::new(col("user_id").eq(lit(100i64)))),
                vec![user_id_cmp(RestrictedOp::NotEq, 100)],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "not_comparison",
                Expr::Not(Box::new(col("user_id").lt(lit(100i64)))),
                vec![user_id_cmp(RestrictedOp::GtEq, 100)],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "not_is_null",
                Expr::Not(Box::new(col("user_id").is_null())),
                vec![leaf("user_id", RestrictedOp::NotEq, Value::Null)],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "not_with_alias",
                Expr::Not(Box::new(col("user_id").alias("uid").eq(lit(100i64)))),
                vec![user_id_cmp(RestrictedOp::NotEq, 100)],
                vec!["user_id"],
            ),
        ];
        check_partition_expressions(cases);
    }

    #[test]
    fn test_edge_cases() {
        let qualified_filter =
            Expr::Column(Column::new(Some("test"), "user_id")).eq(lit(100i64));

        let combined_filter = {
            let in_expr = col("user_id")
                .alias("uid")
                .in_list(vec![lit(100i64), lit(200i64)], false);
            let between_expr =
                cast_expr(col("user_id"), DataType::Int64).between(lit(300i64), lit(400i64));
            in_expr.or(between_expr)
        };

        let cases = vec![
            FilterTestCase::new(
                "qualified_column_name",
                qualified_filter,
                vec![user_id_cmp(RestrictedOp::Eq, 100)],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "comprehensive_combinations",
                combined_filter,
                vec![or_expr(
                    or_expr(
                        user_id_cmp(RestrictedOp::Eq, 100),
                        user_id_cmp(RestrictedOp::Eq, 200),
                    ),
                    and_expr(
                        user_id_cmp(RestrictedOp::GtEq, 300),
                        user_id_cmp(RestrictedOp::LtEq, 400),
                    ),
                )],
                vec!["user_id"],
            ),
        ];
        check_partition_expressions(cases);
    }

    #[test]
    fn test_mixed_partition_non_partition_expressions() {
        let cases = vec![
            // AND with one non-partition conjunct: only the partition side survives.
            FilterTestCase::new(
                "mixed_and_expression",
                col("user_id")
                    .eq(lit(100i64))
                    .and(col("value").gt(lit(50i64))),
                vec![user_id_cmp(RestrictedOp::Eq, 100)],
                vec!["user_id"],
            ),
            // OR touching a non-partition column: nothing can be used.
            FilterTestCase::new(
                "mixed_or_expression",
                col("user_id")
                    .between(lit(1i64), lit(10i64))
                    .or(col("value").gt(lit(50i64))),
                vec![],
                vec!["user_id"],
            ),
            // Three-way AND: the two partition conjuncts are returned separately.
            FilterTestCase::new(
                "complex_mixed_and",
                col("user_id")
                    .gt_eq(lit(100i64))
                    .and(col("value").eq(lit(200i64)))
                    .and(col("timestamp").lt(lit(1000i64))),
                vec![
                    user_id_cmp(RestrictedOp::GtEq, 100),
                    leaf("timestamp", RestrictedOp::Lt, Value::Int64(1000)),
                ],
                vec!["user_id", "timestamp"],
            ),
            // AND over partition columns only: kept as a single expression.
            FilterTestCase::new(
                "pure_partition_and",
                col("user_id")
                    .gt_eq(lit(100i64))
                    .and(col("timestamp").lt(lit(1000i64))),
                vec![and_expr(
                    user_id_cmp(RestrictedOp::GtEq, 100),
                    leaf("timestamp", RestrictedOp::Lt, Value::Int64(1000)),
                )],
                vec!["user_id", "timestamp"],
            ),
            // OR over partition columns only: kept as a single expression.
            FilterTestCase::new(
                "pure_partition_or",
                col("user_id")
                    .eq(lit(100i64))
                    .or(col("user_id").eq(lit(200i64))),
                vec![or_expr(
                    user_id_cmp(RestrictedOp::Eq, 100),
                    user_id_cmp(RestrictedOp::Eq, 200),
                )],
                vec!["user_id"],
            ),
            FilterTestCase::new(
                "pure_non_partition",
                col("value").gt_eq(lit(100i64)),
                vec![],
                vec!["user_id"],
            ),
            // OR whose left branch is a mixed AND: the whole filter is dropped.
            FilterTestCase::new(
                "nested_mixed_expression",
                (col("user_id")
                    .eq(lit(100i64))
                    .and(col("value").gt(lit(50i64))))
                .or(col("user_id").eq(lit(200i64))),
                vec![],
                vec!["user_id"],
            ),
            // AND whose right branch is a non-partition OR: only the left side survives.
            FilterTestCase::new(
                "and_with_nested_mixed_or",
                col("user_id")
                    .gt_eq(lit(100i64))
                    .and(col("value").eq(lit(1i64)).or(col("value").eq(lit(2i64)))),
                vec![user_id_cmp(RestrictedOp::GtEq, 100)],
                vec!["user_id"],
            ),
        ];
        check_partition_expressions(cases);
    }
}