query/dist_plan/
predicate_extractor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Predicate extraction for partition pruning.
16//!
//! [`PredicateExtractor`] extracts a list of [`PartitionExpr`] from a given [`LogicalPlan`].
18
19use std::collections::HashSet;
20
21use arrow::datatypes::DataType;
22use common_telemetry::debug;
23use datafusion_common::Result as DfResult;
24use datafusion_expr::{Expr, LogicalPlan, Operator};
25use datatypes::value::Value;
26use partition::expr::{Operand, PartitionExpr, RestrictedOp};
27
28/// Extracts a list of [`PartitionExpr`] from given [`LogicalPlan`]
29pub struct PredicateExtractor;
30
31impl PredicateExtractor {
32    /// Extract partition expressions for partition columns from logical plan  
33    /// This method returns PartitionExpr objects suitable for ConstraintPruner
34    pub fn extract_partition_expressions(
35        plan: &LogicalPlan,
36        partition_columns: &[String],
37    ) -> DfResult<Vec<PartitionExpr>> {
38        // Collect all filter expressions from the logical plan
39        let mut filter_exprs = Vec::new();
40        Self::collect_filter_expressions(plan, &mut filter_exprs)?;
41
42        if filter_exprs.is_empty() {
43            return Ok(Vec::new());
44        }
45
46        // Convert each DataFusion filter expression to PartitionExpr
47        let mut partition_exprs = Vec::with_capacity(filter_exprs.len());
48        let partition_set: HashSet<String> = partition_columns.iter().cloned().collect();
49
50        for filter_expr in filter_exprs {
51            match DataFusionExprConverter::convert(&filter_expr) {
52                Ok(partition_expr) => {
53                    // Check expression for safe partition pruning
54                    match ExpressionChecker::check_expression_for_pruning(
55                        &partition_expr,
56                        &partition_set,
57                    ) {
58                        ExpressionCheckResult::UseAsIs(expr) => {
59                            partition_exprs.push(expr);
60                        }
61                        ExpressionCheckResult::UsePartial(exprs) => {
62                            partition_exprs.extend(exprs);
63                        }
64                        ExpressionCheckResult::Drop => {
65                            debug!(
66                                "Dropping mixed expression for correctness: {}",
67                                partition_expr
68                            );
69                        }
70                    }
71                }
72                Err(err) => {
73                    debug!(
74                        "Failed to convert filter expression to PartitionExpr: {}, skipping",
75                        err
76                    );
77                    continue;
78                }
79            }
80        }
81
82        debug!(
83            "Extracted {} partition expressions from logical plan for partition columns: {:?}",
84            partition_exprs.len(),
85            partition_columns
86        );
87
88        Ok(partition_exprs)
89    }
90
91    /// Collect all filter expressions from a logical plan
92    fn collect_filter_expressions(plan: &LogicalPlan, expressions: &mut Vec<Expr>) -> DfResult<()> {
93        if let LogicalPlan::Filter(filter) = plan {
94            expressions.push(filter.predicate.clone());
95        }
96
97        // Recursively visit children
98        for child in plan.inputs() {
99            Self::collect_filter_expressions(child, expressions)?;
100        }
101
102        // TODO(ruihang): support plans involve multiple relations.
103        if plan.inputs().len() > 1 {
104            expressions.clear();
105        }
106
107        Ok(())
108    }
109}
110
111/// Result of analyzing an expression for partition pruning safety
112#[derive(Debug, Clone)]
113enum ExpressionCheckResult {
114    /// Expression is safe to use as-is for pruning (only involves partition columns)
115    UseAsIs(PartitionExpr),
116    /// Extract only these parts for AND expressions (mixed with non-partition columns)
117    UsePartial(Vec<PartitionExpr>),
118    /// Drop the entire expression (unsafe for pruning, e.g., OR with non-partition columns)
119    Drop,
120}
121
122/// Checks expressions to determine safe pruning strategies for mixed partition/non-partition expressions
123struct ExpressionChecker;
124
125impl ExpressionChecker {
126    /// Check a partition expression to determine how it should be handled for safe pruning
127    fn check_expression_for_pruning(
128        expr: &PartitionExpr,
129        partition_columns: &HashSet<String>,
130    ) -> ExpressionCheckResult {
131        match expr.op() {
132            RestrictedOp::And => {
133                // For AND expressions, we can extract only the partition-related parts
134                // because if any part fails, the entire AND fails
135                let mut partition_constraints = Vec::new();
136                Self::extract_and_constraints(expr, partition_columns, &mut partition_constraints);
137
138                if partition_constraints.is_empty() {
139                    ExpressionCheckResult::Drop
140                } else if Self::expr_only_involves_partition_columns(expr, partition_columns) {
141                    // If the entire expression only involves partition columns, use as-is
142                    ExpressionCheckResult::UseAsIs(expr.clone())
143                } else {
144                    // Mixed expression: return only the partition parts
145                    ExpressionCheckResult::UsePartial(partition_constraints)
146                }
147            }
148            RestrictedOp::Or => {
149                // For OR expressions, we can only use them if ALL branches involve only partition columns
150                // because if any branch involves non-partition columns, we cannot safely prune
151                if Self::expr_only_involves_partition_columns(expr, partition_columns) {
152                    ExpressionCheckResult::UseAsIs(expr.clone())
153                } else {
154                    // Mixed OR expression: must drop entirely for safety
155                    ExpressionCheckResult::Drop
156                }
157            }
158            _ => {
159                // For comparison operations (=, <, >, etc.), check if they only involve partition columns
160                if Self::expr_only_involves_partition_columns(expr, partition_columns) {
161                    ExpressionCheckResult::UseAsIs(expr.clone())
162                } else {
163                    ExpressionCheckResult::Drop
164                }
165            }
166        }
167    }
168
169    /// Extract partition-related constraints from AND expressions recursively
170    fn extract_and_constraints(
171        expr: &PartitionExpr,
172        partition_columns: &HashSet<String>,
173        result: &mut Vec<PartitionExpr>,
174    ) {
175        if let RestrictedOp::And = expr.op() {
176            // Recursively process both sides of AND
177            Self::extract_constraints_from_operand(expr.lhs(), partition_columns, result);
178            Self::extract_constraints_from_operand(expr.rhs(), partition_columns, result);
179        } else {
180            // Non-AND expression: check if it involves partition columns
181            if Self::expr_only_involves_partition_columns(expr, partition_columns) {
182                result.push(expr.clone());
183            }
184        }
185    }
186
187    /// Extract constraints from an operand (which might be a column, value, or nested expression)
188    fn extract_constraints_from_operand(
189        operand: &Operand,
190        partition_columns: &HashSet<String>,
191        result: &mut Vec<PartitionExpr>,
192    ) {
193        match operand {
194            Operand::Column(_) | Operand::Value(_) => {
195                // This shouldn't happen in well-formed expressions
196            }
197            Operand::Expr(expr) => {
198                Self::extract_and_constraints(expr, partition_columns, result);
199            }
200        }
201    }
202
203    /// Check if an expression involves ONLY partition columns
204    fn expr_only_involves_partition_columns(
205        expr: &PartitionExpr,
206        partition_columns: &HashSet<String>,
207    ) -> bool {
208        Self::operand_only_involves_partition_columns(expr.lhs(), partition_columns)
209            && Self::operand_only_involves_partition_columns(expr.rhs(), partition_columns)
210    }
211
212    /// Check if an operand involves ONLY partition columns or values
213    fn operand_only_involves_partition_columns(
214        operand: &Operand,
215        partition_columns: &HashSet<String>,
216    ) -> bool {
217        match operand {
218            Operand::Column(col) => partition_columns.contains(col),
219            Operand::Value(_) => true, // Values are always safe
220            Operand::Expr(expr) => {
221                Self::expr_only_involves_partition_columns(expr, partition_columns)
222            }
223        }
224    }
225}
226
227/// Converts DataFusion expressions to PartitionExpr
228struct DataFusionExprConverter;
229
230impl DataFusionExprConverter {
231    /// Convert DataFusion Expr to PartitionExpr
232    pub fn convert(expr: &Expr) -> DfResult<PartitionExpr> {
233        match expr {
234            Expr::BinaryExpr(binary_expr) => {
235                let lhs = Self::convert_to_operand(&binary_expr.left)?;
236                let rhs = Self::convert_to_operand(&binary_expr.right)?;
237                let op = Self::convert_operator(&binary_expr.op)?;
238
239                Ok(PartitionExpr::new(lhs, op, rhs))
240            }
241            Expr::InList(inlist_expr) => {
242                // Convert col IN (val1, val2, val3) to col = val1 OR col = val2 OR col = val3
243                // Handle negation: col NOT IN (val1, val2) to col != val1 AND col != val2
244                let column_operand = Self::convert_to_operand(&inlist_expr.expr)?;
245
246                if inlist_expr.list.is_empty() {
247                    return Err(datafusion_common::DataFusionError::Plan(
248                        "InList with empty list is not supported".to_string(),
249                    ));
250                }
251
252                let op = if inlist_expr.negated {
253                    RestrictedOp::NotEq
254                } else {
255                    RestrictedOp::Eq
256                };
257
258                let connector_op = if inlist_expr.negated {
259                    RestrictedOp::And // NOT IN becomes col != val1 AND col != val2
260                } else {
261                    RestrictedOp::Or // IN becomes col = val1 OR col = val2
262                };
263
264                // Convert each value in the list to an equality/inequality expression
265                let mut expressions = Vec::new();
266                for value_expr in &inlist_expr.list {
267                    let value_operand = Self::convert_to_operand(value_expr)?;
268                    expressions.push(PartitionExpr::new(
269                        column_operand.clone(),
270                        op.clone(),
271                        value_operand,
272                    ));
273                }
274
275                // Chain expressions with OR/AND
276                let mut expr_iter = expressions.into_iter();
277                let mut result = expr_iter.next().unwrap();
278                for expr in expr_iter {
279                    result = PartitionExpr::new(
280                        Operand::Expr(result),
281                        connector_op.clone(),
282                        Operand::Expr(expr),
283                    );
284                }
285
286                Ok(result)
287            }
288            Expr::Between(between_expr) => {
289                // Convert col BETWEEN low AND high to col >= low AND col <= high
290                // Handle negation: col NOT BETWEEN low AND high to col < low OR col > high
291                let column_operand = Self::convert_to_operand(&between_expr.expr)?;
292                let low_operand = Self::convert_to_operand(&between_expr.low)?;
293                let high_operand = Self::convert_to_operand(&between_expr.high)?;
294
295                if between_expr.negated {
296                    // NOT BETWEEN: col < low OR col > high
297                    let left_expr =
298                        PartitionExpr::new(column_operand.clone(), RestrictedOp::Lt, low_operand);
299                    let right_expr =
300                        PartitionExpr::new(column_operand, RestrictedOp::Gt, high_operand);
301                    Ok(PartitionExpr::new(
302                        Operand::Expr(left_expr),
303                        RestrictedOp::Or,
304                        Operand::Expr(right_expr),
305                    ))
306                } else {
307                    // BETWEEN: col >= low AND col <= high
308                    let left_expr =
309                        PartitionExpr::new(column_operand.clone(), RestrictedOp::GtEq, low_operand);
310                    let right_expr =
311                        PartitionExpr::new(column_operand, RestrictedOp::LtEq, high_operand);
312                    Ok(PartitionExpr::new(
313                        Operand::Expr(left_expr),
314                        RestrictedOp::And,
315                        Operand::Expr(right_expr),
316                    ))
317                }
318            }
319            Expr::IsNull(expr) => {
320                // Convert col IS NULL to a PartitionExpr
321                let column_operand = Self::convert_to_operand(expr)?;
322                Ok(PartitionExpr::new(
323                    column_operand,
324                    RestrictedOp::Eq,
325                    Operand::Value(Value::Null),
326                ))
327            }
328            Expr::IsNotNull(expr) => {
329                // Convert col IS NOT NULL to a PartitionExpr
330                let column_operand = Self::convert_to_operand(expr)?;
331                Ok(PartitionExpr::new(
332                    column_operand,
333                    RestrictedOp::NotEq,
334                    Operand::Value(Value::Null),
335                ))
336            }
337            Expr::Not(expr) => {
338                // Handle NOT expressions by inverting the inner expression
339                match expr.as_ref() {
340                    Expr::BinaryExpr(binary_expr) => {
341                        let lhs = Self::convert_to_operand(&binary_expr.left)?;
342                        let rhs = Self::convert_to_operand(&binary_expr.right)?;
343                        let inverted_op = Self::invert_operator(&binary_expr.op)?;
344
345                        Ok(PartitionExpr::new(lhs, inverted_op, rhs))
346                    }
347                    Expr::IsNull(inner_expr) => {
348                        // NOT (col IS NULL) becomes col IS NOT NULL
349                        let column_operand = Self::convert_to_operand(inner_expr)?;
350                        Ok(PartitionExpr::new(
351                            column_operand,
352                            RestrictedOp::NotEq,
353                            Operand::Value(Value::Null),
354                        ))
355                    }
356                    Expr::IsNotNull(inner_expr) => {
357                        // NOT (col IS NOT NULL) becomes col IS NULL
358                        let column_operand = Self::convert_to_operand(inner_expr)?;
359                        Ok(PartitionExpr::new(
360                            column_operand,
361                            RestrictedOp::Eq,
362                            Operand::Value(Value::Null),
363                        ))
364                    }
365                    _ => {
366                        debug!(
367                            "Unsupported NOT expression for partition pruning: {:?}",
368                            expr
369                        );
370                        Err(datafusion_common::DataFusionError::Plan(format!(
371                            "NOT expression with inner type {:?} not supported for partition pruning",
372                            expr
373                        )))
374                    }
375                }
376            }
377            _ => Err(datafusion_common::DataFusionError::Plan(format!(
378                "Unsupported expression type for conversion: {:?}",
379                expr
380            ))),
381        }
382    }
383
384    /// Convert DataFusion Expr to Operand
385    fn convert_to_operand(expr: &Expr) -> DfResult<Operand> {
386        match expr {
387            Expr::Column(col) => {
388                // Handle qualified column names (table.column) by extracting just the column name
389                // For partition pruning, we typically only care about the column name itself
390                let column_name = if let Some(relation) = &col.relation {
391                    debug!(
392                        "Using qualified column reference: {}.{}",
393                        relation, col.name
394                    );
395                    col.name.clone()
396                } else {
397                    col.name.clone()
398                };
399                Ok(Operand::Column(column_name))
400            }
401            Expr::Literal(scalar_value, _) => {
402                let value = Value::try_from(scalar_value.clone()).unwrap();
403                Ok(Operand::Value(value))
404            }
405            Expr::Alias(alias_expr) => {
406                // Unwrap alias to get the actual expression
407                Self::convert_to_operand(&alias_expr.expr)
408            }
409            Expr::Cast(cast_expr) => {
410                // For safe casts, unwrap to the inner expression
411                // For unsafe casts, skip with debug logging
412                if Self::is_safe_cast_for_partition_pruning(&cast_expr.data_type) {
413                    Self::convert_to_operand(&cast_expr.expr)
414                } else {
415                    debug!(
416                        "Skipping unsafe cast for partition pruning: {:?}",
417                        cast_expr.data_type
418                    );
419                    Err(datafusion_common::DataFusionError::Plan(format!(
420                        "Cast to {:?} not supported for partition pruning",
421                        cast_expr.data_type
422                    )))
423                }
424            }
425            other => {
426                let partition_expr = Self::convert(other)?;
427                Ok(Operand::Expr(partition_expr))
428            }
429        }
430    }
431
432    /// Convert DataFusion Operator to RestrictedOp
433    fn convert_operator(op: &Operator) -> DfResult<RestrictedOp> {
434        match op {
435            Operator::Eq => Ok(RestrictedOp::Eq),
436            Operator::NotEq => Ok(RestrictedOp::NotEq),
437            Operator::Lt => Ok(RestrictedOp::Lt),
438            Operator::LtEq => Ok(RestrictedOp::LtEq),
439            Operator::Gt => Ok(RestrictedOp::Gt),
440            Operator::GtEq => Ok(RestrictedOp::GtEq),
441            Operator::And => Ok(RestrictedOp::And),
442            Operator::Or => Ok(RestrictedOp::Or),
443            _ => Err(datafusion_common::DataFusionError::Plan(format!(
444                "Unsupported operator: {:?}",
445                op
446            ))),
447        }
448    }
449
450    /// Invert a DataFusion Operator for NOT expressions
451    fn invert_operator(op: &Operator) -> DfResult<RestrictedOp> {
452        let Some(negated) = op.negate() else {
453            return Err(datafusion_common::DataFusionError::Plan(format!(
454                "Cannot invert operator: {:?}",
455                op
456            )));
457        };
458        Self::convert_operator(&negated)
459    }
460
461    /// Determine if a cast is safe for partition pruning
462    /// Safe casts don't change the logical meaning of constraints
463    fn is_safe_cast_for_partition_pruning(data_type: &DataType) -> bool {
464        match data_type {
465            // Integer widening casts are generally safe
466            DataType::Int8 => true,
467            DataType::Int16 => true,
468            DataType::Int32 => true,
469            DataType::Int64 => true,
470            DataType::UInt8 => true,
471            DataType::UInt16 => true,
472            DataType::UInt32 => true,
473            DataType::UInt64 => true,
474
475            // Float casts are generally safe for equality/inequality comparisons
476            DataType::Float32 => true,
477            DataType::Float64 => true,
478
479            // String casts might be safe in some cases
480            DataType::Utf8 => true,
481            DataType::LargeUtf8 => true,
482
483            // Date/time casts might be safe if they don't change precision significantly
484            DataType::Date32 => true,
485            DataType::Date64 => true,
486            DataType::Timestamp(_, _) => true,
487
488            // Boolean casts are straightforward
489            DataType::Boolean => true,
490
491            // For other types, be conservative and skip
492            _ => false,
493        }
494    }
495}
496
497#[cfg(test)]
498mod tests {
499    use std::sync::Arc;
500
501    use datafusion::arrow::datatypes::{DataType, Field, Schema};
502    use datafusion::common::Column;
503    use datafusion::datasource::DefaultTableSource;
504    use datafusion_expr::{col, lit, LogicalPlanBuilder};
505    use datatypes::value::Value;
506    use partition::expr::{Operand, PartitionExpr, RestrictedOp};
507
508    use super::*;
509
510    fn create_test_table_scan() -> LogicalPlan {
511        let schema = Arc::new(Schema::new(vec![
512            Field::new(
513                "timestamp",
514                DataType::Timestamp(datafusion::arrow::datatypes::TimeUnit::Millisecond, None),
515                false,
516            ),
517            Field::new("user_id", DataType::Int64, false),
518            Field::new("value", DataType::Int64, false),
519        ]));
520
521        let empty_table = datafusion::datasource::empty::EmptyTable::new(schema);
522        let table_source = Arc::new(DefaultTableSource::new(Arc::new(empty_table)));
523
524        LogicalPlanBuilder::scan("test", table_source, None)
525            .unwrap()
526            .build()
527            .unwrap()
528    }
529
530    struct FilterTestCase {
531        name: &'static str,
532        filter_expr: Expr,
533        expected_partition_exprs: Vec<PartitionExpr>,
534        partition_columns: Vec<&'static str>,
535    }
536
537    impl FilterTestCase {
538        fn new(
539            name: &'static str,
540            filter_expr: Expr,
541            expected_partition_exprs: Vec<PartitionExpr>,
542            partition_columns: Vec<&'static str>,
543        ) -> Self {
544            Self {
545                name,
546                filter_expr,
547                expected_partition_exprs,
548                partition_columns,
549            }
550        }
551    }
552
553    /// Helper to check partition expressions for a set of test cases.
554    fn check_partition_expressions(cases: Vec<FilterTestCase>) {
555        for case in cases {
556            let table_scan = create_test_table_scan();
557            let filter = case.filter_expr.clone();
558
559            let plan = LogicalPlanBuilder::from(table_scan)
560                .filter(filter)
561                .unwrap()
562                .build()
563                .unwrap();
564
565            let partition_columns: Vec<String> = case
566                .partition_columns
567                .iter()
568                .map(|s| s.to_string())
569                .collect();
570            let partition_exprs =
571                PredicateExtractor::extract_partition_expressions(&plan, &partition_columns)
572                    .unwrap();
573            let expected = case.expected_partition_exprs.clone();
574            assert_eq!(
575                partition_exprs, expected,
576                "Test case '{}': expected partition expressions {:?}, got {:?}",
577                case.name, expected, partition_exprs
578            );
579        }
580    }
581
582    #[test]
583    fn test_basic_constraints_extraction() {
584        let cases = vec![
585            FilterTestCase::new(
586                "non_partition_column_ignored",
587                col("value").gt_eq(lit(100i64)),
588                vec![],
589                vec!["user_id"],
590            ),
591            FilterTestCase::new(
592                "simple_constraint",
593                col("user_id").gt_eq(lit(100i64)),
594                vec![PartitionExpr::new(
595                    Operand::Column("user_id".to_string()),
596                    RestrictedOp::GtEq,
597                    Operand::Value(Value::Int64(100)),
598                )],
599                vec!["user_id"],
600            ),
601            FilterTestCase::new(
602                "or_expression",
603                col("user_id")
604                    .eq(lit(100i64))
605                    .or(col("user_id").eq(lit(200i64))),
606                vec![PartitionExpr::new(
607                    Operand::Expr(PartitionExpr::new(
608                        Operand::Column("user_id".to_string()),
609                        RestrictedOp::Eq,
610                        Operand::Value(Value::Int64(100)),
611                    )),
612                    RestrictedOp::Or,
613                    Operand::Expr(PartitionExpr::new(
614                        Operand::Column("user_id".to_string()),
615                        RestrictedOp::Eq,
616                        Operand::Value(Value::Int64(200)),
617                    )),
618                )],
619                vec!["user_id"],
620            ),
621            FilterTestCase::new(
622                "complex_and_or",
623                col("user_id")
624                    .gt_eq(lit(100i64))
625                    .and(col("user_id").lt(lit(200i64)))
626                    .or(col("user_id")
627                        .gt_eq(lit(300i64))
628                        .and(col("user_id").lt(lit(400i64)))),
629                vec![PartitionExpr::new(
630                    Operand::Expr(PartitionExpr::new(
631                        Operand::Expr(PartitionExpr::new(
632                            Operand::Column("user_id".to_string()),
633                            RestrictedOp::GtEq,
634                            Operand::Value(Value::Int64(100)),
635                        )),
636                        RestrictedOp::And,
637                        Operand::Expr(PartitionExpr::new(
638                            Operand::Column("user_id".to_string()),
639                            RestrictedOp::Lt,
640                            Operand::Value(Value::Int64(200)),
641                        )),
642                    )),
643                    RestrictedOp::Or,
644                    Operand::Expr(PartitionExpr::new(
645                        Operand::Expr(PartitionExpr::new(
646                            Operand::Column("user_id".to_string()),
647                            RestrictedOp::GtEq,
648                            Operand::Value(Value::Int64(300)),
649                        )),
650                        RestrictedOp::And,
651                        Operand::Expr(PartitionExpr::new(
652                            Operand::Column("user_id".to_string()),
653                            RestrictedOp::Lt,
654                            Operand::Value(Value::Int64(400)),
655                        )),
656                    )),
657                )],
658                vec!["user_id"],
659            ),
660        ];
661        check_partition_expressions(cases);
662    }
663
664    #[test]
665    fn test_alias_expressions() {
666        let cases = vec![
667            FilterTestCase::new(
668                "simple_alias",
669                col("user_id").alias("uid").eq(lit(100i64)),
670                vec![PartitionExpr::new(
671                    Operand::Column("user_id".to_string()),
672                    RestrictedOp::Eq,
673                    Operand::Value(Value::Int64(100)),
674                )],
675                vec!["user_id"],
676            ),
677            FilterTestCase::new(
678                "nested_alias",
679                col("user_id").alias("uid").alias("u").gt_eq(lit(50i64)),
680                vec![PartitionExpr::new(
681                    Operand::Column("user_id".to_string()),
682                    RestrictedOp::GtEq,
683                    Operand::Value(Value::Int64(50)),
684                )],
685                vec!["user_id"],
686            ),
687            FilterTestCase::new(
688                "complex_alias_with_and_or",
689                col("user_id")
690                    .alias("uid")
691                    .gt_eq(lit(100i64))
692                    .and(col("user_id").alias("u").lt(lit(200i64)))
693                    .or(col("user_id").alias("id").eq(lit(300i64))),
694                vec![PartitionExpr::new(
695                    Operand::Expr(PartitionExpr::new(
696                        Operand::Expr(PartitionExpr::new(
697                            Operand::Column("user_id".to_string()),
698                            RestrictedOp::GtEq,
699                            Operand::Value(Value::Int64(100)),
700                        )),
701                        RestrictedOp::And,
702                        Operand::Expr(PartitionExpr::new(
703                            Operand::Column("user_id".to_string()),
704                            RestrictedOp::Lt,
705                            Operand::Value(Value::Int64(200)),
706                        )),
707                    )),
708                    RestrictedOp::Or,
709                    Operand::Expr(PartitionExpr::new(
710                        Operand::Column("user_id".to_string()),
711                        RestrictedOp::Eq,
712                        Operand::Value(Value::Int64(300)),
713                    )),
714                )],
715                vec!["user_id"],
716            ),
717        ];
718        check_partition_expressions(cases);
719    }
720
721    #[test]
722    fn test_inlist_expressions() {
723        let cases = vec![
724            FilterTestCase::new(
725                "simple_inlist",
726                col("user_id").in_list(vec![lit(100i64), lit(200i64), lit(300i64)], false),
727                vec![PartitionExpr::new(
728                    Operand::Expr(PartitionExpr::new(
729                        Operand::Expr(PartitionExpr::new(
730                            Operand::Column("user_id".to_string()),
731                            RestrictedOp::Eq,
732                            Operand::Value(Value::Int64(100)),
733                        )),
734                        RestrictedOp::Or,
735                        Operand::Expr(PartitionExpr::new(
736                            Operand::Column("user_id".to_string()),
737                            RestrictedOp::Eq,
738                            Operand::Value(Value::Int64(200)),
739                        )),
740                    )),
741                    RestrictedOp::Or,
742                    Operand::Expr(PartitionExpr::new(
743                        Operand::Column("user_id".to_string()),
744                        RestrictedOp::Eq,
745                        Operand::Value(Value::Int64(300)),
746                    )),
747                )],
748                vec!["user_id"],
749            ),
750            FilterTestCase::new(
751                "negated_inlist",
752                col("user_id").in_list(vec![lit(100i64), lit(200i64)], true),
753                vec![PartitionExpr::new(
754                    Operand::Expr(PartitionExpr::new(
755                        Operand::Column("user_id".to_string()),
756                        RestrictedOp::NotEq,
757                        Operand::Value(Value::Int64(100)),
758                    )),
759                    RestrictedOp::And,
760                    Operand::Expr(PartitionExpr::new(
761                        Operand::Column("user_id".to_string()),
762                        RestrictedOp::NotEq,
763                        Operand::Value(Value::Int64(200)),
764                    )),
765                )],
766                vec!["user_id"],
767            ),
768            FilterTestCase::new(
769                "inlist_with_alias",
770                col("user_id")
771                    .alias("uid")
772                    .in_list(vec![lit(100i64), lit(200i64)], false),
773                vec![PartitionExpr::new(
774                    Operand::Expr(PartitionExpr::new(
775                        Operand::Column("user_id".to_string()),
776                        RestrictedOp::Eq,
777                        Operand::Value(Value::Int64(100)),
778                    )),
779                    RestrictedOp::Or,
780                    Operand::Expr(PartitionExpr::new(
781                        Operand::Column("user_id".to_string()),
782                        RestrictedOp::Eq,
783                        Operand::Value(Value::Int64(200)),
784                    )),
785                )],
786                vec!["user_id"],
787            ),
788        ];
789        check_partition_expressions(cases);
790    }
791
792    #[test]
793    fn test_between_expressions() {
794        let cases = vec![
795            FilterTestCase::new(
796                "simple_between",
797                col("user_id").between(lit(100i64), lit(200i64)),
798                vec![PartitionExpr::new(
799                    Operand::Expr(PartitionExpr::new(
800                        Operand::Column("user_id".to_string()),
801                        RestrictedOp::GtEq,
802                        Operand::Value(Value::Int64(100)),
803                    )),
804                    RestrictedOp::And,
805                    Operand::Expr(PartitionExpr::new(
806                        Operand::Column("user_id".to_string()),
807                        RestrictedOp::LtEq,
808                        Operand::Value(Value::Int64(200)),
809                    )),
810                )],
811                vec!["user_id"],
812            ),
813            FilterTestCase::new(
814                "negated_between",
815                Expr::Between(datafusion_expr::Between {
816                    expr: Box::new(col("user_id")),
817                    negated: true,
818                    low: Box::new(lit(100i64)),
819                    high: Box::new(lit(200i64)),
820                }),
821                vec![PartitionExpr::new(
822                    Operand::Expr(PartitionExpr::new(
823                        Operand::Column("user_id".to_string()),
824                        RestrictedOp::Lt,
825                        Operand::Value(Value::Int64(100)),
826                    )),
827                    RestrictedOp::Or,
828                    Operand::Expr(PartitionExpr::new(
829                        Operand::Column("user_id".to_string()),
830                        RestrictedOp::Gt,
831                        Operand::Value(Value::Int64(200)),
832                    )),
833                )],
834                vec!["user_id"],
835            ),
836            FilterTestCase::new(
837                "between_with_alias",
838                col("user_id")
839                    .alias("uid")
840                    .between(lit(100i64), lit(200i64)),
841                vec![PartitionExpr::new(
842                    Operand::Expr(PartitionExpr::new(
843                        Operand::Column("user_id".to_string()),
844                        RestrictedOp::GtEq,
845                        Operand::Value(Value::Int64(100)),
846                    )),
847                    RestrictedOp::And,
848                    Operand::Expr(PartitionExpr::new(
849                        Operand::Column("user_id".to_string()),
850                        RestrictedOp::LtEq,
851                        Operand::Value(Value::Int64(200)),
852                    )),
853                )],
854                vec!["user_id"],
855            ),
856        ];
857        check_partition_expressions(cases);
858    }
859
860    #[test]
861    fn test_null_expressions() {
862        let cases = vec![
863            FilterTestCase::new(
864                "is_null",
865                col("user_id").is_null(),
866                vec![PartitionExpr::new(
867                    Operand::Column("user_id".to_string()),
868                    RestrictedOp::Eq,
869                    Operand::Value(Value::Null),
870                )],
871                vec!["user_id"],
872            ),
873            FilterTestCase::new(
874                "is_not_null",
875                col("user_id").is_not_null(),
876                vec![PartitionExpr::new(
877                    Operand::Column("user_id".to_string()),
878                    RestrictedOp::NotEq,
879                    Operand::Value(Value::Null),
880                )],
881                vec!["user_id"],
882            ),
883            FilterTestCase::new(
884                "null_with_alias",
885                col("user_id").alias("uid").is_null(),
886                vec![PartitionExpr::new(
887                    Operand::Column("user_id".to_string()),
888                    RestrictedOp::Eq,
889                    Operand::Value(Value::Null),
890                )],
891                vec!["user_id"],
892            ),
893        ];
894        check_partition_expressions(cases);
895    }
896
897    #[test]
898    fn test_cast_expressions() {
899        let cases = vec![
900            FilterTestCase::new(
901                "safe_cast",
902                Expr::Cast(datafusion_expr::Cast {
903                    expr: Box::new(col("user_id")),
904                    data_type: DataType::Int64,
905                })
906                .eq(lit(100i64)),
907                vec![PartitionExpr::new(
908                    Operand::Column("user_id".to_string()),
909                    RestrictedOp::Eq,
910                    Operand::Value(Value::Int64(100)),
911                )],
912                vec!["user_id"],
913            ),
914            FilterTestCase::new(
915                "cast_with_alias",
916                Expr::Cast(datafusion_expr::Cast {
917                    expr: Box::new(col("user_id").alias("uid")),
918                    data_type: DataType::Int64,
919                })
920                .eq(lit(100i64)),
921                vec![PartitionExpr::new(
922                    Operand::Column("user_id".to_string()),
923                    RestrictedOp::Eq,
924                    Operand::Value(Value::Int64(100)),
925                )],
926                vec!["user_id"],
927            ),
928            FilterTestCase::new(
929                "unsafe_cast",
930                Expr::Cast(datafusion_expr::Cast {
931                    expr: Box::new(col("user_id")),
932                    data_type: DataType::List(std::sync::Arc::new(
933                        datafusion::arrow::datatypes::Field::new("item", DataType::Int32, true),
934                    )),
935                })
936                .eq(lit(100i64)),
937                vec![],
938                vec!["user_id"],
939            ),
940        ];
941        check_partition_expressions(cases);
942    }
943
944    #[test]
945    fn test_not_expressions() {
946        let cases = vec![
947            FilterTestCase::new(
948                "not_equality",
949                Expr::Not(Box::new(col("user_id").eq(lit(100i64)))),
950                vec![PartitionExpr::new(
951                    Operand::Column("user_id".to_string()),
952                    RestrictedOp::NotEq,
953                    Operand::Value(Value::Int64(100)),
954                )],
955                vec!["user_id"],
956            ),
957            FilterTestCase::new(
958                "not_comparison",
959                Expr::Not(Box::new(col("user_id").lt(lit(100i64)))),
960                vec![PartitionExpr::new(
961                    Operand::Column("user_id".to_string()),
962                    RestrictedOp::GtEq,
963                    Operand::Value(Value::Int64(100)),
964                )],
965                vec!["user_id"],
966            ),
967            FilterTestCase::new(
968                "not_is_null",
969                Expr::Not(Box::new(col("user_id").is_null())),
970                vec![PartitionExpr::new(
971                    Operand::Column("user_id".to_string()),
972                    RestrictedOp::NotEq,
973                    Operand::Value(Value::Null),
974                )],
975                vec!["user_id"],
976            ),
977            FilterTestCase::new(
978                "not_with_alias",
979                Expr::Not(Box::new(col("user_id").alias("uid").eq(lit(100i64)))),
980                vec![PartitionExpr::new(
981                    Operand::Column("user_id".to_string()),
982                    RestrictedOp::NotEq,
983                    Operand::Value(Value::Int64(100)),
984                )],
985                vec!["user_id"],
986            ),
987        ];
988        check_partition_expressions(cases);
989    }
990
991    #[test]
992    fn test_edge_cases() {
993        let cases = vec![
994            FilterTestCase::new(
995                "qualified_column_name",
996                {
997                    let qualified_col = Expr::Column(Column::new(Some("test"), "user_id"));
998                    qualified_col.eq(lit(100i64))
999                },
1000                vec![PartitionExpr::new(
1001                    Operand::Column("user_id".to_string()),
1002                    RestrictedOp::Eq,
1003                    Operand::Value(Value::Int64(100)),
1004                )],
1005                vec!["user_id"],
1006            ),
1007            FilterTestCase::new(
1008                "comprehensive_combinations",
1009                {
1010                    let in_expr = col("user_id")
1011                        .alias("uid")
1012                        .in_list(vec![lit(100i64), lit(200i64)], false);
1013                    let cast_expr = Expr::Cast(datafusion_expr::Cast {
1014                        expr: Box::new(col("user_id")),
1015                        data_type: DataType::Int64,
1016                    });
1017                    let between_expr = cast_expr.between(lit(300i64), lit(400i64));
1018                    in_expr.or(between_expr)
1019                },
1020                vec![PartitionExpr::new(
1021                    Operand::Expr(PartitionExpr::new(
1022                        Operand::Expr(PartitionExpr::new(
1023                            Operand::Column("user_id".to_string()),
1024                            RestrictedOp::Eq,
1025                            Operand::Value(Value::Int64(100)),
1026                        )),
1027                        RestrictedOp::Or,
1028                        Operand::Expr(PartitionExpr::new(
1029                            Operand::Column("user_id".to_string()),
1030                            RestrictedOp::Eq,
1031                            Operand::Value(Value::Int64(200)),
1032                        )),
1033                    )),
1034                    RestrictedOp::Or,
1035                    Operand::Expr(PartitionExpr::new(
1036                        Operand::Expr(PartitionExpr::new(
1037                            Operand::Column("user_id".to_string()),
1038                            RestrictedOp::GtEq,
1039                            Operand::Value(Value::Int64(300)),
1040                        )),
1041                        RestrictedOp::And,
1042                        Operand::Expr(PartitionExpr::new(
1043                            Operand::Column("user_id".to_string()),
1044                            RestrictedOp::LtEq,
1045                            Operand::Value(Value::Int64(400)),
1046                        )),
1047                    )),
1048                )],
1049                vec!["user_id"],
1050            ),
1051        ];
1052        check_partition_expressions(cases);
1053    }
1054
1055    #[test]
1056    fn test_mixed_partition_non_partition_expressions() {
1057        let cases = vec![
1058            // Mixed AND expression - should extract only partition part
1059            FilterTestCase::new(
1060                "mixed_and_expression",
1061                col("user_id")
1062                    .eq(lit(100i64))
1063                    .and(col("value").gt(lit(50i64))),
1064                vec![PartitionExpr::new(
1065                    Operand::Column("user_id".to_string()),
1066                    RestrictedOp::Eq,
1067                    Operand::Value(Value::Int64(100)),
1068                )],
1069                vec!["user_id"],
1070            ),
1071            // Mixed OR expression - should be dropped entirely
1072            FilterTestCase::new(
1073                "mixed_or_expression",
1074                col("user_id")
1075                    .between(lit(1i64), lit(10i64))
1076                    .or(col("value").gt(lit(50i64))),
1077                vec![], // Empty result - expression should be dropped
1078                vec!["user_id"],
1079            ),
1080            // Complex mixed AND expression with multiple parts
1081            FilterTestCase::new(
1082                "complex_mixed_and",
1083                col("user_id")
1084                    .gt_eq(lit(100i64))
1085                    .and(col("value").eq(lit(200i64)))
1086                    .and(col("timestamp").lt(lit(1000i64))),
1087                vec![
1088                    PartitionExpr::new(
1089                        Operand::Column("user_id".to_string()),
1090                        RestrictedOp::GtEq,
1091                        Operand::Value(Value::Int64(100)),
1092                    ),
1093                    PartitionExpr::new(
1094                        Operand::Column("timestamp".to_string()),
1095                        RestrictedOp::Lt,
1096                        Operand::Value(Value::Int64(1000)),
1097                    ),
1098                ],
1099                vec!["user_id", "timestamp"], // Both partition columns
1100            ),
1101            // Pure partition expression - should be kept as-is
1102            FilterTestCase::new(
1103                "pure_partition_and",
1104                col("user_id")
1105                    .gt_eq(lit(100i64))
1106                    .and(col("timestamp").lt(lit(1000i64))),
1107                vec![PartitionExpr::new(
1108                    Operand::Expr(PartitionExpr::new(
1109                        Operand::Column("user_id".to_string()),
1110                        RestrictedOp::GtEq,
1111                        Operand::Value(Value::Int64(100)),
1112                    )),
1113                    RestrictedOp::And,
1114                    Operand::Expr(PartitionExpr::new(
1115                        Operand::Column("timestamp".to_string()),
1116                        RestrictedOp::Lt,
1117                        Operand::Value(Value::Int64(1000)),
1118                    )),
1119                )],
1120                vec!["user_id", "timestamp"],
1121            ),
1122            // Pure partition OR expression - should be kept as-is
1123            FilterTestCase::new(
1124                "pure_partition_or",
1125                col("user_id")
1126                    .eq(lit(100i64))
1127                    .or(col("user_id").eq(lit(200i64))),
1128                vec![PartitionExpr::new(
1129                    Operand::Expr(PartitionExpr::new(
1130                        Operand::Column("user_id".to_string()),
1131                        RestrictedOp::Eq,
1132                        Operand::Value(Value::Int64(100)),
1133                    )),
1134                    RestrictedOp::Or,
1135                    Operand::Expr(PartitionExpr::new(
1136                        Operand::Column("user_id".to_string()),
1137                        RestrictedOp::Eq,
1138                        Operand::Value(Value::Int64(200)),
1139                    )),
1140                )],
1141                vec!["user_id"],
1142            ),
1143            // Pure non-partition expression - should be dropped
1144            FilterTestCase::new(
1145                "pure_non_partition",
1146                col("value").gt_eq(lit(100i64)),
1147                vec![], // Empty result - no partition columns involved
1148                vec!["user_id"],
1149            ),
1150            // Complex nested mixed expression
1151            FilterTestCase::new(
1152                "nested_mixed_expression",
1153                (col("user_id")
1154                    .eq(lit(100i64))
1155                    .and(col("value").gt(lit(50i64))))
1156                .or(col("user_id").eq(lit(200i64))),
1157                vec![], // Empty result - OR with mixed sub-expression should be dropped
1158                vec!["user_id"],
1159            ),
1160            // AND with nested OR (mixed) - should extract partition parts only
1161            FilterTestCase::new(
1162                "and_with_nested_mixed_or",
1163                col("user_id")
1164                    .gt_eq(lit(100i64))
1165                    .and(col("value").eq(lit(1i64)).or(col("value").eq(lit(2i64)))),
1166                vec![PartitionExpr::new(
1167                    Operand::Column("user_id".to_string()),
1168                    RestrictedOp::GtEq,
1169                    Operand::Value(Value::Int64(100)),
1170                )],
1171                vec!["user_id"],
1172            ),
1173        ];
1174        check_partition_expressions(cases);
1175    }
1176}