partition/
expr.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::{Debug, Display, Formatter};
17use std::sync::Arc;
18
19use api::v1::meta::Partition;
20use datafusion_common::{ScalarValue, ToDFSchema};
21use datafusion_expr::Expr;
22use datafusion_expr::execution_props::ExecutionProps;
23use datafusion_physical_expr::{PhysicalExpr, create_physical_expr};
24use datatypes::arrow;
25use datatypes::value::{
26    Value, duration_to_scalar_value, time_to_scalar_value, timestamp_to_scalar_value,
27};
28use serde::{Deserialize, Serialize};
29use snafu::ResultExt;
30use sql::statements::value_to_sql_value;
31use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
32
33use crate::error;
34use crate::partition::PartitionBound;
35
36/// Struct for partition expression. This can be converted back to sqlparser's [Expr].
37/// by [`Self::to_parser_expr`].
38///
39/// [Expr]: sqlparser::ast::Expr
40#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
41pub struct PartitionExpr {
42    pub lhs: Box<Operand>,
43    pub op: RestrictedOp,
44    pub rhs: Box<Operand>,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
48pub enum Operand {
49    Column(String),
50    Value(Value),
51    Expr(PartitionExpr),
52}
53
54pub fn col(column_name: impl Into<String>) -> Operand {
55    Operand::Column(column_name.into())
56}
57
58impl From<Value> for Operand {
59    fn from(value: Value) -> Self {
60        Operand::Value(value)
61    }
62}
63
64impl Operand {
65    pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
66        match self {
67            Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))),
68            Self::Value(v) => {
69                let scalar_value = match v {
70                    Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
71                    Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
72                    Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
73                    Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
74                    Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
75                    Value::Int8(v) => ScalarValue::Int8(Some(*v)),
76                    Value::Int16(v) => ScalarValue::Int16(Some(*v)),
77                    Value::Int32(v) => ScalarValue::Int32(Some(*v)),
78                    Value::Int64(v) => ScalarValue::Int64(Some(*v)),
79                    Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
80                    Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
81                    Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
82                    Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
83                    Value::Date(v) => ScalarValue::Date32(Some(v.val())),
84                    Value::Null => ScalarValue::Null,
85                    Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
86                    Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value()))
87                        .context(error::ConvertPartitionExprValueSnafu { value: v.clone() })?,
88                    Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
89                    Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
90                    Value::IntervalMonthDayNano(v) => {
91                        ScalarValue::IntervalMonthDayNano(Some((*v).into()))
92                    }
93                    Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
94                    Value::Decimal128(d) => {
95                        let (v, p, s) = d.to_scalar_value();
96                        ScalarValue::Decimal128(v, p, s)
97                    }
98                    other => {
99                        return error::UnsupportedPartitionExprValueSnafu {
100                            value: other.clone(),
101                        }
102                        .fail();
103                    }
104                };
105                Ok(datafusion_expr::lit(scalar_value))
106            }
107            Self::Expr(e) => e.try_as_logical_expr(),
108        }
109    }
110
111    pub fn lt(self, rhs: impl Into<Self>) -> PartitionExpr {
112        PartitionExpr::new(self, RestrictedOp::Lt, rhs.into())
113    }
114
115    pub fn gt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
116        PartitionExpr::new(self, RestrictedOp::GtEq, rhs.into())
117    }
118
119    pub fn eq(self, rhs: impl Into<Self>) -> PartitionExpr {
120        PartitionExpr::new(self, RestrictedOp::Eq, rhs.into())
121    }
122
123    pub fn not_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
124        PartitionExpr::new(self, RestrictedOp::NotEq, rhs.into())
125    }
126
127    pub fn gt(self, rhs: impl Into<Self>) -> PartitionExpr {
128        PartitionExpr::new(self, RestrictedOp::Gt, rhs.into())
129    }
130
131    pub fn lt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
132        PartitionExpr::new(self, RestrictedOp::LtEq, rhs.into())
133    }
134}
135
136impl Display for Operand {
137    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138        match self {
139            Self::Column(v) => write!(f, "{v}"),
140            Self::Value(v) => write!(f, "{v}"),
141            Self::Expr(v) => write!(f, "{v}"),
142        }
143    }
144}
145
146/// A restricted set of [Operator](datafusion_expr::Operator) that can be used in
147/// partition expressions.
148#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
149pub enum RestrictedOp {
150    // Evaluate to binary
151    Eq,
152    NotEq,
153    Lt,
154    LtEq,
155    Gt,
156    GtEq,
157
158    // Conjunction
159    And,
160    Or,
161}
162
163impl RestrictedOp {
164    pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
165        match op {
166            ParserBinaryOperator::Eq => Some(Self::Eq),
167            ParserBinaryOperator::NotEq => Some(Self::NotEq),
168            ParserBinaryOperator::Lt => Some(Self::Lt),
169            ParserBinaryOperator::LtEq => Some(Self::LtEq),
170            ParserBinaryOperator::Gt => Some(Self::Gt),
171            ParserBinaryOperator::GtEq => Some(Self::GtEq),
172            ParserBinaryOperator::And => Some(Self::And),
173            ParserBinaryOperator::Or => Some(Self::Or),
174            _ => None,
175        }
176    }
177
178    pub fn to_parser_op(&self) -> ParserBinaryOperator {
179        match self {
180            Self::Eq => ParserBinaryOperator::Eq,
181            Self::NotEq => ParserBinaryOperator::NotEq,
182            Self::Lt => ParserBinaryOperator::Lt,
183            Self::LtEq => ParserBinaryOperator::LtEq,
184            Self::Gt => ParserBinaryOperator::Gt,
185            Self::GtEq => ParserBinaryOperator::GtEq,
186            Self::And => ParserBinaryOperator::And,
187            Self::Or => ParserBinaryOperator::Or,
188        }
189    }
190
191    fn invert_for_swap(&self) -> Self {
192        match self {
193            Self::Eq => Self::Eq,
194            Self::NotEq => Self::NotEq,
195            Self::Lt => Self::Gt,
196            Self::LtEq => Self::GtEq,
197            Self::Gt => Self::Lt,
198            Self::GtEq => Self::LtEq,
199            Self::And => Self::And,
200            Self::Or => Self::Or,
201        }
202    }
203}
204impl Display for RestrictedOp {
205    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
206        match self {
207            Self::Eq => write!(f, "="),
208            Self::NotEq => write!(f, "<>"),
209            Self::Lt => write!(f, "<"),
210            Self::LtEq => write!(f, "<="),
211            Self::Gt => write!(f, ">"),
212            Self::GtEq => write!(f, ">="),
213            Self::And => write!(f, "AND"),
214            Self::Or => write!(f, "OR"),
215        }
216    }
217}
218
219impl PartitionExpr {
220    pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
221        Self {
222            lhs: Box::new(lhs),
223            op,
224            rhs: Box::new(rhs),
225        }
226        .canonicalize()
227    }
228
229    /// Canonicalize to `Column op Value` form when possible for consistent equality checks.
230    pub fn canonicalize(self) -> Self {
231        let lhs = Self::canonicalize_operand(*self.lhs);
232        let rhs = Self::canonicalize_operand(*self.rhs);
233        let mut expr = Self {
234            lhs: Box::new(lhs),
235            op: self.op,
236            rhs: Box::new(rhs),
237        };
238
239        if matches!(&*expr.lhs, Operand::Value(_)) && matches!(&*expr.rhs, Operand::Column(_)) {
240            std::mem::swap(&mut expr.lhs, &mut expr.rhs);
241            expr.op = expr.op.invert_for_swap();
242        }
243
244        expr
245    }
246
247    fn canonicalize_operand(operand: Operand) -> Operand {
248        match operand {
249            Operand::Expr(expr) => Operand::Expr(expr.canonicalize()),
250            other => other,
251        }
252    }
253
254    /// Convert [Self] back to sqlparser's [Expr]
255    ///
256    /// [Expr]: ParserExpr
257    pub fn to_parser_expr(&self) -> ParserExpr {
258        // Safety: Partition rule won't contains unsupported value type.
259        // Otherwise it will be rejected by the parser.
260        let lhs = match &*self.lhs {
261            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
262            Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
263            Operand::Expr(e) => e.to_parser_expr(),
264        };
265
266        let rhs = match &*self.rhs {
267            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
268            Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
269            Operand::Expr(e) => e.to_parser_expr(),
270        };
271
272        ParserExpr::BinaryOp {
273            left: Box::new(lhs),
274            op: self.op.to_parser_op(),
275            right: Box::new(rhs),
276        }
277    }
278
279    pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
280        // Special handling for null equality.
281        // `col = NULL` -> `col IS NULL` to match SQL (DataFusion) semantics.
282        let lhs_is_null = matches!(self.lhs.as_ref(), Operand::Value(Value::Null));
283        let rhs_is_null = matches!(self.rhs.as_ref(), Operand::Value(Value::Null));
284
285        match (self.op.clone(), lhs_is_null, rhs_is_null) {
286            (RestrictedOp::Eq, _, true) => {
287                return Ok(self.lhs.try_as_logical_expr()?.is_null());
288            }
289            (RestrictedOp::Eq, true, _) => {
290                return Ok(self.rhs.try_as_logical_expr()?.is_null());
291            }
292            (RestrictedOp::NotEq, _, true) => {
293                return Ok(self.lhs.try_as_logical_expr()?.is_not_null());
294            }
295            (RestrictedOp::NotEq, true, _) => {
296                return Ok(self.rhs.try_as_logical_expr()?.is_not_null());
297            }
298            _ => {}
299        }
300
301        if matches!(
302            self.op,
303            RestrictedOp::Lt | RestrictedOp::LtEq | RestrictedOp::Gt | RestrictedOp::GtEq
304        ) {
305            // Keep filtering semantics aligned with direct PartitionExpr evaluation (null-first ordering).
306            // In DataFusion SQL semantics, range comparisons with NULL yield NULL, so we inject
307            // `OR col IS NULL` on the null-first side of the comparison.
308            if matches!(self.lhs.as_ref(), Operand::Column(_)) {
309                let column_expr = self.lhs.try_as_logical_expr()?;
310                let other_expr = self.rhs.try_as_logical_expr()?;
311                let base = match self.op {
312                    RestrictedOp::Lt => {
313                        column_expr.clone().lt(other_expr).or(column_expr.is_null())
314                    }
315                    RestrictedOp::LtEq => column_expr
316                        .clone()
317                        .lt_eq(other_expr)
318                        .or(column_expr.is_null()),
319                    RestrictedOp::Gt => column_expr
320                        .clone()
321                        .gt(other_expr)
322                        .and(column_expr.is_not_null()),
323                    RestrictedOp::GtEq => column_expr
324                        .clone()
325                        .gt_eq(other_expr)
326                        .and(column_expr.is_not_null()),
327                    _ => unreachable!(),
328                };
329                return Ok(base);
330            } else if matches!(self.rhs.as_ref(), Operand::Column(_)) {
331                let other_expr = self.lhs.try_as_logical_expr()?;
332                let column_expr = self.rhs.try_as_logical_expr()?;
333                let base = match self.op {
334                    RestrictedOp::Lt => other_expr
335                        .lt(column_expr.clone())
336                        .and(column_expr.is_not_null()),
337                    RestrictedOp::LtEq => other_expr
338                        .lt_eq(column_expr.clone())
339                        .and(column_expr.is_not_null()),
340                    RestrictedOp::Gt => {
341                        other_expr.gt(column_expr.clone()).or(column_expr.is_null())
342                    }
343                    RestrictedOp::GtEq => other_expr
344                        .gt_eq(column_expr.clone())
345                        .or(column_expr.is_null()),
346                    _ => unreachable!(),
347                };
348                return Ok(base);
349            }
350        }
351
352        // Normal cases handling, without NULL
353        let lhs = self.lhs.try_as_logical_expr()?;
354        let rhs = self.rhs.try_as_logical_expr()?;
355
356        let expr = match &self.op {
357            RestrictedOp::And => datafusion_expr::and(lhs, rhs),
358            RestrictedOp::Or => datafusion_expr::or(lhs, rhs),
359            RestrictedOp::Gt => lhs.gt(rhs),
360            RestrictedOp::GtEq => lhs.gt_eq(rhs),
361            RestrictedOp::Lt => lhs.lt(rhs),
362            RestrictedOp::LtEq => lhs.lt_eq(rhs),
363            RestrictedOp::Eq => lhs.eq(rhs),
364            RestrictedOp::NotEq => lhs.not_eq(rhs),
365        };
366        Ok(expr)
367    }
368
369    /// Get the left-hand side operand
370    pub fn lhs(&self) -> &Operand {
371        &self.lhs
372    }
373
374    /// Get the right-hand side operand
375    pub fn rhs(&self) -> &Operand {
376        &self.rhs
377    }
378
379    /// Get the operation
380    pub fn op(&self) -> &RestrictedOp {
381        &self.op
382    }
383
384    pub fn try_as_physical_expr(
385        &self,
386        schema: &arrow::datatypes::SchemaRef,
387    ) -> error::Result<Arc<dyn PhysicalExpr>> {
388        let df_schema = schema
389            .clone()
390            .to_dfschema_ref()
391            .context(error::ToDFSchemaSnafu)?;
392        let execution_props = &ExecutionProps::default();
393        let expr = self.try_as_logical_expr()?;
394        create_physical_expr(&expr, &df_schema, execution_props)
395            .context(error::CreatePhysicalExprSnafu)
396    }
397
398    pub fn and(self, rhs: PartitionExpr) -> PartitionExpr {
399        PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs))
400    }
401
402    /// Serializes `PartitionExpr` to json string.
403    ///
404    /// Wraps `PartitionBound::Expr` for compatibility.
405    pub fn as_json_str(&self) -> error::Result<String> {
406        serde_json::to_string(&PartitionBound::Expr(self.clone()))
407            .context(error::SerializeJsonSnafu)
408    }
409
410    /// Deserializes `PartitionExpr` from json string.
411    ///
412    /// Deserializes to `PartitionBound` for compatibility.
413    pub fn from_json_str(s: &str) -> error::Result<Option<Self>> {
414        if s.is_empty() {
415            return Ok(None);
416        }
417
418        let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
419        match bound {
420            PartitionBound::Expr(expr) => Ok(Some(expr.canonicalize())),
421            _ => Ok(None),
422        }
423    }
424
425    /// Converts [Self] to [Partition].
426    pub fn as_pb_partition(&self) -> error::Result<Partition> {
427        Ok(Partition {
428            expression: self.as_json_str()?,
429            ..Default::default()
430        })
431    }
432
433    /// Collects all column names referenced by this expression.
434    pub fn collect_column_names(&self, columns: &mut HashSet<String>) {
435        Self::collect_operand_columns(&self.lhs, columns);
436        Self::collect_operand_columns(&self.rhs, columns);
437    }
438
439    fn collect_operand_columns(operand: &Operand, columns: &mut HashSet<String>) {
440        match operand {
441            Operand::Column(c) => {
442                columns.insert(c.clone());
443            }
444            Operand::Expr(e) => {
445                e.collect_column_names(columns);
446            }
447            Operand::Value(_) => {}
448        }
449    }
450}
451
452impl Display for PartitionExpr {
453    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
454        write!(f, "{} {} {}", self.lhs, self.op, self.rhs)
455    }
456}
457
458#[cfg(test)]
459mod tests {
460    use super::*;
461
462    #[test]
463    fn test_partition_expr() {
464        let cases = [
465            (
466                Operand::Column("a".to_string()),
467                RestrictedOp::Eq,
468                Operand::Value(Value::UInt32(10)),
469                "a = 10",
470            ),
471            (
472                Operand::Column("a".to_string()),
473                RestrictedOp::NotEq,
474                Operand::Value(Value::UInt32(10)),
475                "a <> 10",
476            ),
477            (
478                Operand::Column("a".to_string()),
479                RestrictedOp::Lt,
480                Operand::Value(Value::UInt32(10)),
481                "a < 10",
482            ),
483            (
484                Operand::Column("a".to_string()),
485                RestrictedOp::LtEq,
486                Operand::Value(Value::UInt32(10)),
487                "a <= 10",
488            ),
489            (
490                Operand::Column("a".to_string()),
491                RestrictedOp::Gt,
492                Operand::Value(Value::UInt32(10)),
493                "a > 10",
494            ),
495            (
496                Operand::Column("a".to_string()),
497                RestrictedOp::GtEq,
498                Operand::Value(Value::UInt32(10)),
499                "a >= 10",
500            ),
501            (
502                Operand::Column("a".to_string()),
503                RestrictedOp::And,
504                Operand::Column("b".to_string()),
505                "a AND b",
506            ),
507            (
508                Operand::Column("a".to_string()),
509                RestrictedOp::Or,
510                Operand::Column("b".to_string()),
511                "a OR b",
512            ),
513            (
514                Operand::Column("a".to_string()),
515                RestrictedOp::Or,
516                Operand::Expr(PartitionExpr::new(
517                    Operand::Column("c".to_string()),
518                    RestrictedOp::And,
519                    Operand::Column("d".to_string()),
520                )),
521                "a OR c AND d",
522            ),
523        ];
524
525        for case in cases {
526            let expr = PartitionExpr::new(case.0, case.1.clone(), case.2);
527            assert_eq!(case.3, expr.to_string());
528        }
529    }
530
531    #[test]
532    fn test_try_as_logical_expr_null_equality() {
533        let eq_expr = PartitionExpr::new(
534            Operand::Column("a".to_string()),
535            RestrictedOp::Eq,
536            Operand::Value(Value::Null),
537        );
538        assert_eq!(
539            eq_expr.try_as_logical_expr().unwrap().to_string(),
540            "a IS NULL"
541        );
542
543        let neq_expr = PartitionExpr::new(
544            Operand::Column("a".to_string()),
545            RestrictedOp::NotEq,
546            Operand::Value(Value::Null),
547        );
548        assert_eq!(
549            neq_expr.try_as_logical_expr().unwrap().to_string(),
550            "a IS NOT NULL"
551        );
552    }
553
554    #[test]
555    fn test_try_as_logical_expr_null_range_comparison() {
556        // Test Lt with column on LHS
557        let lt_expr = PartitionExpr::new(
558            Operand::Column("a".to_string()),
559            RestrictedOp::Lt,
560            Operand::Value(Value::Int64(10)),
561        );
562        assert_eq!(
563            lt_expr.try_as_logical_expr().unwrap().to_string(),
564            "a < Int64(10) OR a IS NULL"
565        );
566
567        // Test Lt with column on RHS
568        let lt_expr_rhs_column = PartitionExpr::new(
569            Operand::Value(Value::Int64(10)),
570            RestrictedOp::Lt,
571            Operand::Column("a".to_string()),
572        );
573        assert_eq!(
574            lt_expr_rhs_column
575                .try_as_logical_expr()
576                .unwrap()
577                .to_string(),
578            "a > Int64(10) AND a IS NOT NULL"
579        );
580
581        // Test Gt with column on LHS
582        let gt_expr = PartitionExpr::new(
583            Operand::Column("a".to_string()),
584            RestrictedOp::Gt,
585            Operand::Value(Value::Int64(10)),
586        );
587        assert_eq!(
588            gt_expr.try_as_logical_expr().unwrap().to_string(),
589            "a > Int64(10) AND a IS NOT NULL"
590        );
591
592        // Test Gt with column on RHS
593        let gt_expr_rhs_column = PartitionExpr::new(
594            Operand::Value(Value::Int64(10)),
595            RestrictedOp::Gt,
596            Operand::Column("a".to_string()),
597        );
598        assert_eq!(
599            gt_expr_rhs_column
600                .try_as_logical_expr()
601                .unwrap()
602                .to_string(),
603            "a < Int64(10) OR a IS NULL"
604        );
605
606        // Test GtEq with column on LHS
607        let gteq_expr = PartitionExpr::new(
608            Operand::Column("a".to_string()),
609            RestrictedOp::GtEq,
610            Operand::Value(Value::Int64(10)),
611        );
612        assert_eq!(
613            gteq_expr.try_as_logical_expr().unwrap().to_string(),
614            "a >= Int64(10) AND a IS NOT NULL"
615        );
616
617        // Test LtEq with column on LHS
618        let lteq_expr = PartitionExpr::new(
619            Operand::Column("a".to_string()),
620            RestrictedOp::LtEq,
621            Operand::Value(Value::Int64(10)),
622        );
623        assert_eq!(
624            lteq_expr.try_as_logical_expr().unwrap().to_string(),
625            "a <= Int64(10) OR a IS NULL"
626        );
627
628        let gteq_expr_rhs_column = PartitionExpr::new(
629            Operand::Value(Value::Int64(10)),
630            RestrictedOp::GtEq,
631            Operand::Column("a".to_string()),
632        );
633        assert_eq!(
634            gteq_expr_rhs_column
635                .try_as_logical_expr()
636                .unwrap()
637                .to_string(),
638            "a <= Int64(10) OR a IS NULL"
639        );
640
641        let lteq_expr_rhs_column = PartitionExpr::new(
642            Operand::Value(Value::Int64(10)),
643            RestrictedOp::LtEq,
644            Operand::Column("a".to_string()),
645        );
646        assert_eq!(
647            lteq_expr_rhs_column
648                .try_as_logical_expr()
649                .unwrap()
650                .to_string(),
651            "a >= Int64(10) AND a IS NOT NULL"
652        );
653
654        let and_expr = PartitionExpr::new(
655            Operand::Expr(PartitionExpr::new(
656                Operand::Column("a".to_string()),
657                RestrictedOp::LtEq,
658                Operand::Value(Value::Int64(10)),
659            )),
660            RestrictedOp::And,
661            Operand::Expr(PartitionExpr::new(
662                Operand::Column("b".to_string()),
663                RestrictedOp::Gt,
664                Operand::Value(Value::Int64(5)),
665            )),
666        );
667        assert_eq!(
668            and_expr.try_as_logical_expr().unwrap().to_string(),
669            "(a <= Int64(10) OR a IS NULL) AND b > Int64(5) AND b IS NOT NULL"
670        );
671
672        let and_expr = PartitionExpr::new(
673            Operand::Expr(PartitionExpr::new(
674                Operand::Column("a".to_string()),
675                RestrictedOp::LtEq,
676                Operand::Value(Value::Int64(10)),
677            )),
678            RestrictedOp::And,
679            Operand::Expr(PartitionExpr::new(
680                Operand::Column("a".to_string()),
681                RestrictedOp::Gt,
682                Operand::Value(Value::Int64(5)),
683            )),
684        );
685        assert_eq!(
686            and_expr.try_as_logical_expr().unwrap().to_string(),
687            "(a <= Int64(10) OR a IS NULL) AND a > Int64(5) AND a IS NOT NULL"
688        );
689
690        let and_expr_strict_lower = PartitionExpr::new(
691            Operand::Expr(PartitionExpr::new(
692                Operand::Column("a".to_string()),
693                RestrictedOp::Lt,
694                Operand::Value(Value::Int64(10)),
695            )),
696            RestrictedOp::And,
697            Operand::Expr(PartitionExpr::new(
698                Operand::Column("a".to_string()),
699                RestrictedOp::GtEq,
700                Operand::Value(Value::Int64(5)),
701            )),
702        );
703        assert_eq!(
704            and_expr_strict_lower
705                .try_as_logical_expr()
706                .unwrap()
707                .to_string(),
708            "(a < Int64(10) OR a IS NULL) AND a >= Int64(5) AND a IS NOT NULL"
709        );
710
711        let and_expr_rhs_column = PartitionExpr::new(
712            Operand::Expr(PartitionExpr::new(
713                Operand::Value(Value::Int64(10)),
714                RestrictedOp::GtEq,
715                Operand::Column("a".to_string()),
716            )),
717            RestrictedOp::And,
718            Operand::Expr(PartitionExpr::new(
719                Operand::Value(Value::Int64(5)),
720                RestrictedOp::Lt,
721                Operand::Column("a".to_string()),
722            )),
723        );
724        assert_eq!(
725            and_expr_rhs_column
726                .try_as_logical_expr()
727                .unwrap()
728                .to_string(),
729            "(a <= Int64(10) OR a IS NULL) AND a > Int64(5) AND a IS NOT NULL"
730        );
731
732        let or_expr_same_column = PartitionExpr::new(
733            Operand::Expr(PartitionExpr::new(
734                Operand::Column("a".to_string()),
735                RestrictedOp::LtEq,
736                Operand::Value(Value::Int64(10)),
737            )),
738            RestrictedOp::Or,
739            Operand::Expr(PartitionExpr::new(
740                Operand::Column("a".to_string()),
741                RestrictedOp::Gt,
742                Operand::Value(Value::Int64(5)),
743            )),
744        );
745        assert_eq!(
746            or_expr_same_column
747                .try_as_logical_expr()
748                .unwrap()
749                .to_string(),
750            "a <= Int64(10) OR a IS NULL OR a > Int64(5) AND a IS NOT NULL"
751        );
752    }
753
754    #[test]
755    fn test_try_as_logical_expr_rhs_column_without_canonicalize() {
756        let gt_expr_rhs_column = PartitionExpr {
757            lhs: Box::new(Operand::Value(Value::Int64(10))),
758            op: RestrictedOp::Gt,
759            rhs: Box::new(Operand::Column("a".to_string())),
760        };
761        assert_eq!(
762            gt_expr_rhs_column
763                .try_as_logical_expr()
764                .unwrap()
765                .to_string(),
766            "Int64(10) > a OR a IS NULL"
767        );
768
769        let gteq_expr_rhs_column = PartitionExpr {
770            lhs: Box::new(Operand::Value(Value::Int64(10))),
771            op: RestrictedOp::GtEq,
772            rhs: Box::new(Operand::Column("a".to_string())),
773        };
774        assert_eq!(
775            gteq_expr_rhs_column
776                .try_as_logical_expr()
777                .unwrap()
778                .to_string(),
779            "Int64(10) >= a OR a IS NULL"
780        );
781    }
782
783    #[test]
784    fn test_serde_partition_expr() {
785        let expr = PartitionExpr::new(
786            Operand::Column("a".to_string()),
787            RestrictedOp::Eq,
788            Operand::Value(Value::UInt32(10)),
789        );
790        let json = expr.as_json_str().unwrap();
791        assert_eq!(
792            json,
793            "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"Eq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
794        );
795
796        let json = r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
797        let expr2 = PartitionExpr::from_json_str(json).unwrap().unwrap();
798        let expected = PartitionExpr::new(
799            Operand::Column("a".to_string()),
800            RestrictedOp::GtEq,
801            Operand::Value(Value::UInt32(10)),
802        );
803        assert_eq!(expr2, expected);
804
805        // empty string
806        let json = "";
807        let expr3 = PartitionExpr::from_json_str(json).unwrap();
808        assert!(expr3.is_none());
809
810        // variants other than Expr
811        let json = r#""MaxValue""#;
812        let expr4 = PartitionExpr::from_json_str(json).unwrap();
813        assert!(expr4.is_none());
814
815        let json = r#"{"Value":{"UInt32":10}}"#;
816        let expr5 = PartitionExpr::from_json_str(json).unwrap();
817        assert!(expr5.is_none());
818    }
819
820    #[test]
821    fn test_collect_column_names() {
822        // Simple expression: col_a = 1 should give {col_a}
823        let expr = col("a").eq(Value::Int64(1));
824        let mut columns = HashSet::new();
825        expr.collect_column_names(&mut columns);
826        assert_eq!(columns.len(), 1);
827        assert!(columns.contains("a"));
828
829        // Compound AND with same column: col_a >= 0 AND col_a < 10 should give {col_a}
830        let expr = col("a")
831            .gt_eq(Value::Int64(0))
832            .and(col("a").lt(Value::Int64(10)));
833        let mut columns = HashSet::new();
834        expr.collect_column_names(&mut columns);
835        assert_eq!(columns.len(), 1);
836        assert!(columns.contains("a"));
837
838        // Multiple columns: col_a >= 0 AND col_b < 10 should give {col_a, col_b}
839        let expr = col("a")
840            .gt_eq(Value::Int64(0))
841            .and(col("b").lt(Value::Int64(10)));
842        let mut columns = HashSet::new();
843        expr.collect_column_names(&mut columns);
844        assert_eq!(columns.len(), 2);
845        assert!(columns.contains("a"));
846        assert!(columns.contains("b"));
847
848        // Nested expression: (col_a >= 0 AND col_b < 10) AND col_c = 5
849        let expr = col("a")
850            .gt_eq(Value::Int64(0))
851            .and(col("b").lt(Value::Int64(10)))
852            .and(col("c").eq(Value::Int64(5)));
853        let mut columns = HashSet::new();
854        expr.collect_column_names(&mut columns);
855        assert_eq!(columns.len(), 3);
856        assert!(columns.contains("a"));
857        assert!(columns.contains("b"));
858        assert!(columns.contains("c"));
859    }
860}