partition/
expr.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::fmt::{Debug, Display, Formatter};
17use std::sync::Arc;
18
19use api::v1::meta::Partition;
20use datafusion_common::{ScalarValue, ToDFSchema};
21use datafusion_expr::Expr;
22use datafusion_expr::execution_props::ExecutionProps;
23use datafusion_physical_expr::{PhysicalExpr, create_physical_expr};
24use datatypes::arrow;
25use datatypes::value::{
26    Value, duration_to_scalar_value, time_to_scalar_value, timestamp_to_scalar_value,
27};
28use serde::{Deserialize, Serialize};
29use snafu::ResultExt;
30use sql::statements::value_to_sql_value;
31use sqlparser::ast::{BinaryOperator as ParserBinaryOperator, Expr as ParserExpr, Ident};
32
33use crate::error;
34use crate::partition::PartitionBound;
35
36/// Struct for partition expression. This can be converted back to sqlparser's [Expr].
37/// by [`Self::to_parser_expr`].
38///
39/// [Expr]: sqlparser::ast::Expr
40#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
41pub struct PartitionExpr {
42    pub lhs: Box<Operand>,
43    pub op: RestrictedOp,
44    pub rhs: Box<Operand>,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
48pub enum Operand {
49    Column(String),
50    Value(Value),
51    Expr(PartitionExpr),
52}
53
54pub fn col(column_name: impl Into<String>) -> Operand {
55    Operand::Column(column_name.into())
56}
57
58impl From<Value> for Operand {
59    fn from(value: Value) -> Self {
60        Operand::Value(value)
61    }
62}
63
64impl Operand {
65    pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
66        match self {
67            Self::Column(c) => Ok(datafusion_expr::col(format!(r#""{}""#, c))),
68            Self::Value(v) => {
69                let scalar_value = match v {
70                    Value::Boolean(v) => ScalarValue::Boolean(Some(*v)),
71                    Value::UInt8(v) => ScalarValue::UInt8(Some(*v)),
72                    Value::UInt16(v) => ScalarValue::UInt16(Some(*v)),
73                    Value::UInt32(v) => ScalarValue::UInt32(Some(*v)),
74                    Value::UInt64(v) => ScalarValue::UInt64(Some(*v)),
75                    Value::Int8(v) => ScalarValue::Int8(Some(*v)),
76                    Value::Int16(v) => ScalarValue::Int16(Some(*v)),
77                    Value::Int32(v) => ScalarValue::Int32(Some(*v)),
78                    Value::Int64(v) => ScalarValue::Int64(Some(*v)),
79                    Value::Float32(v) => ScalarValue::Float32(Some(v.0)),
80                    Value::Float64(v) => ScalarValue::Float64(Some(v.0)),
81                    Value::String(v) => ScalarValue::Utf8(Some(v.as_utf8().to_string())),
82                    Value::Binary(v) => ScalarValue::Binary(Some(v.to_vec())),
83                    Value::Date(v) => ScalarValue::Date32(Some(v.val())),
84                    Value::Null => ScalarValue::Null,
85                    Value::Timestamp(t) => timestamp_to_scalar_value(t.unit(), Some(t.value())),
86                    Value::Time(t) => time_to_scalar_value(*t.unit(), Some(t.value())).unwrap(),
87                    Value::IntervalYearMonth(v) => ScalarValue::IntervalYearMonth(Some(v.to_i32())),
88                    Value::IntervalDayTime(v) => ScalarValue::IntervalDayTime(Some((*v).into())),
89                    Value::IntervalMonthDayNano(v) => {
90                        ScalarValue::IntervalMonthDayNano(Some((*v).into()))
91                    }
92                    Value::Duration(d) => duration_to_scalar_value(d.unit(), Some(d.value())),
93                    Value::Decimal128(d) => {
94                        let (v, p, s) = d.to_scalar_value();
95                        ScalarValue::Decimal128(v, p, s)
96                    }
97                    other => {
98                        return error::UnsupportedPartitionExprValueSnafu {
99                            value: other.clone(),
100                        }
101                        .fail();
102                    }
103                };
104                Ok(datafusion_expr::lit(scalar_value))
105            }
106            Self::Expr(e) => e.try_as_logical_expr(),
107        }
108    }
109
110    pub fn lt(self, rhs: impl Into<Self>) -> PartitionExpr {
111        PartitionExpr::new(self, RestrictedOp::Lt, rhs.into())
112    }
113
114    pub fn gt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
115        PartitionExpr::new(self, RestrictedOp::GtEq, rhs.into())
116    }
117
118    pub fn eq(self, rhs: impl Into<Self>) -> PartitionExpr {
119        PartitionExpr::new(self, RestrictedOp::Eq, rhs.into())
120    }
121
122    pub fn not_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
123        PartitionExpr::new(self, RestrictedOp::NotEq, rhs.into())
124    }
125
126    pub fn gt(self, rhs: impl Into<Self>) -> PartitionExpr {
127        PartitionExpr::new(self, RestrictedOp::Gt, rhs.into())
128    }
129
130    pub fn lt_eq(self, rhs: impl Into<Self>) -> PartitionExpr {
131        PartitionExpr::new(self, RestrictedOp::LtEq, rhs.into())
132    }
133}
134
135impl Display for Operand {
136    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137        match self {
138            Self::Column(v) => write!(f, "{v}"),
139            Self::Value(v) => write!(f, "{v}"),
140            Self::Expr(v) => write!(f, "{v}"),
141        }
142    }
143}
144
145/// A restricted set of [Operator](datafusion_expr::Operator) that can be used in
146/// partition expressions.
147#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
148pub enum RestrictedOp {
149    // Evaluate to binary
150    Eq,
151    NotEq,
152    Lt,
153    LtEq,
154    Gt,
155    GtEq,
156
157    // Conjunction
158    And,
159    Or,
160}
161
162impl RestrictedOp {
163    pub fn try_from_parser(op: &ParserBinaryOperator) -> Option<Self> {
164        match op {
165            ParserBinaryOperator::Eq => Some(Self::Eq),
166            ParserBinaryOperator::NotEq => Some(Self::NotEq),
167            ParserBinaryOperator::Lt => Some(Self::Lt),
168            ParserBinaryOperator::LtEq => Some(Self::LtEq),
169            ParserBinaryOperator::Gt => Some(Self::Gt),
170            ParserBinaryOperator::GtEq => Some(Self::GtEq),
171            ParserBinaryOperator::And => Some(Self::And),
172            ParserBinaryOperator::Or => Some(Self::Or),
173            _ => None,
174        }
175    }
176
177    pub fn to_parser_op(&self) -> ParserBinaryOperator {
178        match self {
179            Self::Eq => ParserBinaryOperator::Eq,
180            Self::NotEq => ParserBinaryOperator::NotEq,
181            Self::Lt => ParserBinaryOperator::Lt,
182            Self::LtEq => ParserBinaryOperator::LtEq,
183            Self::Gt => ParserBinaryOperator::Gt,
184            Self::GtEq => ParserBinaryOperator::GtEq,
185            Self::And => ParserBinaryOperator::And,
186            Self::Or => ParserBinaryOperator::Or,
187        }
188    }
189
190    fn invert_for_swap(&self) -> Self {
191        match self {
192            Self::Eq => Self::Eq,
193            Self::NotEq => Self::NotEq,
194            Self::Lt => Self::Gt,
195            Self::LtEq => Self::GtEq,
196            Self::Gt => Self::Lt,
197            Self::GtEq => Self::LtEq,
198            Self::And => Self::And,
199            Self::Or => Self::Or,
200        }
201    }
202}
203impl Display for RestrictedOp {
204    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
205        match self {
206            Self::Eq => write!(f, "="),
207            Self::NotEq => write!(f, "<>"),
208            Self::Lt => write!(f, "<"),
209            Self::LtEq => write!(f, "<="),
210            Self::Gt => write!(f, ">"),
211            Self::GtEq => write!(f, ">="),
212            Self::And => write!(f, "AND"),
213            Self::Or => write!(f, "OR"),
214        }
215    }
216}
217
218impl PartitionExpr {
219    pub fn new(lhs: Operand, op: RestrictedOp, rhs: Operand) -> Self {
220        Self {
221            lhs: Box::new(lhs),
222            op,
223            rhs: Box::new(rhs),
224        }
225        .canonicalize()
226    }
227
228    /// Canonicalize to `Column op Value` form when possible for consistent equality checks.
229    pub fn canonicalize(self) -> Self {
230        let lhs = Self::canonicalize_operand(*self.lhs);
231        let rhs = Self::canonicalize_operand(*self.rhs);
232        let mut expr = Self {
233            lhs: Box::new(lhs),
234            op: self.op,
235            rhs: Box::new(rhs),
236        };
237
238        if matches!(&*expr.lhs, Operand::Value(_)) && matches!(&*expr.rhs, Operand::Column(_)) {
239            std::mem::swap(&mut expr.lhs, &mut expr.rhs);
240            expr.op = expr.op.invert_for_swap();
241        }
242
243        expr
244    }
245
246    fn canonicalize_operand(operand: Operand) -> Operand {
247        match operand {
248            Operand::Expr(expr) => Operand::Expr(expr.canonicalize()),
249            other => other,
250        }
251    }
252
253    /// Convert [Self] back to sqlparser's [Expr]
254    ///
255    /// [Expr]: ParserExpr
256    pub fn to_parser_expr(&self) -> ParserExpr {
257        // Safety: Partition rule won't contains unsupported value type.
258        // Otherwise it will be rejected by the parser.
259        let lhs = match &*self.lhs {
260            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
261            Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
262            Operand::Expr(e) => e.to_parser_expr(),
263        };
264
265        let rhs = match &*self.rhs {
266            Operand::Column(c) => ParserExpr::Identifier(Ident::new(c.clone())),
267            Operand::Value(v) => ParserExpr::Value(value_to_sql_value(v).unwrap().into()),
268            Operand::Expr(e) => e.to_parser_expr(),
269        };
270
271        ParserExpr::BinaryOp {
272            left: Box::new(lhs),
273            op: self.op.to_parser_op(),
274            right: Box::new(rhs),
275        }
276    }
277
278    pub fn try_as_logical_expr(&self) -> error::Result<Expr> {
279        // Special handling for null equality.
280        // `col = NULL` -> `col IS NULL` to match SQL (DataFusion) semantics.
281        let lhs_is_null = matches!(self.lhs.as_ref(), Operand::Value(Value::Null));
282        let rhs_is_null = matches!(self.rhs.as_ref(), Operand::Value(Value::Null));
283
284        match (self.op.clone(), lhs_is_null, rhs_is_null) {
285            (RestrictedOp::Eq, _, true) => {
286                return Ok(self.lhs.try_as_logical_expr()?.is_null());
287            }
288            (RestrictedOp::Eq, true, _) => {
289                return Ok(self.rhs.try_as_logical_expr()?.is_null());
290            }
291            (RestrictedOp::NotEq, _, true) => {
292                return Ok(self.lhs.try_as_logical_expr()?.is_not_null());
293            }
294            (RestrictedOp::NotEq, true, _) => {
295                return Ok(self.rhs.try_as_logical_expr()?.is_not_null());
296            }
297            _ => {}
298        }
299
300        if matches!(
301            self.op,
302            RestrictedOp::Lt | RestrictedOp::LtEq | RestrictedOp::Gt | RestrictedOp::GtEq
303        ) {
304            if matches!(self.lhs.as_ref(), Operand::Column(_)) {
305                let column_expr = self.lhs.try_as_logical_expr()?;
306                let other_expr = self.rhs.try_as_logical_expr()?;
307                let base = match self.op {
308                    RestrictedOp::Lt => column_expr.clone().lt(other_expr),
309                    RestrictedOp::LtEq => column_expr.clone().lt_eq(other_expr),
310                    RestrictedOp::Gt => column_expr.clone().gt(other_expr),
311                    RestrictedOp::GtEq => column_expr.clone().gt_eq(other_expr),
312                    _ => unreachable!(),
313                };
314                return Ok(datafusion_expr::or(base, column_expr.is_null()));
315            } else if matches!(self.rhs.as_ref(), Operand::Column(_)) {
316                let other_expr = self.lhs.try_as_logical_expr()?;
317                let column_expr = self.rhs.try_as_logical_expr()?;
318                let base = match self.op {
319                    RestrictedOp::Lt => other_expr.lt(column_expr.clone()),
320                    RestrictedOp::LtEq => other_expr.lt_eq(column_expr.clone()),
321                    RestrictedOp::Gt => other_expr.gt(column_expr.clone()),
322                    RestrictedOp::GtEq => other_expr.gt_eq(column_expr.clone()),
323                    _ => unreachable!(),
324                };
325                return Ok(datafusion_expr::or(base, column_expr.is_null()));
326            }
327        }
328
329        // Normal cases handling, without NULL
330        let lhs = self.lhs.try_as_logical_expr()?;
331        let rhs = self.rhs.try_as_logical_expr()?;
332
333        let expr = match &self.op {
334            RestrictedOp::And => datafusion_expr::and(lhs, rhs),
335            RestrictedOp::Or => datafusion_expr::or(lhs, rhs),
336            RestrictedOp::Gt => lhs.gt(rhs),
337            RestrictedOp::GtEq => lhs.gt_eq(rhs),
338            RestrictedOp::Lt => lhs.lt(rhs),
339            RestrictedOp::LtEq => lhs.lt_eq(rhs),
340            RestrictedOp::Eq => lhs.eq(rhs),
341            RestrictedOp::NotEq => lhs.not_eq(rhs),
342        };
343        Ok(expr)
344    }
345
346    /// Get the left-hand side operand
347    pub fn lhs(&self) -> &Operand {
348        &self.lhs
349    }
350
351    /// Get the right-hand side operand
352    pub fn rhs(&self) -> &Operand {
353        &self.rhs
354    }
355
356    /// Get the operation
357    pub fn op(&self) -> &RestrictedOp {
358        &self.op
359    }
360
361    pub fn try_as_physical_expr(
362        &self,
363        schema: &arrow::datatypes::SchemaRef,
364    ) -> error::Result<Arc<dyn PhysicalExpr>> {
365        let df_schema = schema
366            .clone()
367            .to_dfschema_ref()
368            .context(error::ToDFSchemaSnafu)?;
369        let execution_props = &ExecutionProps::default();
370        let expr = self.try_as_logical_expr()?;
371        create_physical_expr(&expr, &df_schema, execution_props)
372            .context(error::CreatePhysicalExprSnafu)
373    }
374
375    pub fn and(self, rhs: PartitionExpr) -> PartitionExpr {
376        PartitionExpr::new(Operand::Expr(self), RestrictedOp::And, Operand::Expr(rhs))
377    }
378
379    /// Serializes `PartitionExpr` to json string.
380    ///
381    /// Wraps `PartitionBound::Expr` for compatibility.
382    pub fn as_json_str(&self) -> error::Result<String> {
383        serde_json::to_string(&PartitionBound::Expr(self.clone()))
384            .context(error::SerializeJsonSnafu)
385    }
386
387    /// Deserializes `PartitionExpr` from json string.
388    ///
389    /// Deserializes to `PartitionBound` for compatibility.
390    pub fn from_json_str(s: &str) -> error::Result<Option<Self>> {
391        if s.is_empty() {
392            return Ok(None);
393        }
394
395        let bound: PartitionBound = serde_json::from_str(s).context(error::DeserializeJsonSnafu)?;
396        match bound {
397            PartitionBound::Expr(expr) => Ok(Some(expr.canonicalize())),
398            _ => Ok(None),
399        }
400    }
401
402    /// Converts [Self] to [Partition].
403    pub fn as_pb_partition(&self) -> error::Result<Partition> {
404        Ok(Partition {
405            expression: self.as_json_str()?,
406            ..Default::default()
407        })
408    }
409
410    /// Collects all column names referenced by this expression.
411    pub fn collect_column_names(&self, columns: &mut HashSet<String>) {
412        Self::collect_operand_columns(&self.lhs, columns);
413        Self::collect_operand_columns(&self.rhs, columns);
414    }
415
416    fn collect_operand_columns(operand: &Operand, columns: &mut HashSet<String>) {
417        match operand {
418            Operand::Column(c) => {
419                columns.insert(c.clone());
420            }
421            Operand::Expr(e) => {
422                e.collect_column_names(columns);
423            }
424            Operand::Value(_) => {}
425        }
426    }
427}
428
429impl Display for PartitionExpr {
430    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
431        write!(f, "{} {} {}", self.lhs, self.op, self.rhs)
432    }
433}
434
435#[cfg(test)]
436mod tests {
437    use super::*;
438
439    #[test]
440    fn test_partition_expr() {
441        let cases = [
442            (
443                Operand::Column("a".to_string()),
444                RestrictedOp::Eq,
445                Operand::Value(Value::UInt32(10)),
446                "a = 10",
447            ),
448            (
449                Operand::Column("a".to_string()),
450                RestrictedOp::NotEq,
451                Operand::Value(Value::UInt32(10)),
452                "a <> 10",
453            ),
454            (
455                Operand::Column("a".to_string()),
456                RestrictedOp::Lt,
457                Operand::Value(Value::UInt32(10)),
458                "a < 10",
459            ),
460            (
461                Operand::Column("a".to_string()),
462                RestrictedOp::LtEq,
463                Operand::Value(Value::UInt32(10)),
464                "a <= 10",
465            ),
466            (
467                Operand::Column("a".to_string()),
468                RestrictedOp::Gt,
469                Operand::Value(Value::UInt32(10)),
470                "a > 10",
471            ),
472            (
473                Operand::Column("a".to_string()),
474                RestrictedOp::GtEq,
475                Operand::Value(Value::UInt32(10)),
476                "a >= 10",
477            ),
478            (
479                Operand::Column("a".to_string()),
480                RestrictedOp::And,
481                Operand::Column("b".to_string()),
482                "a AND b",
483            ),
484            (
485                Operand::Column("a".to_string()),
486                RestrictedOp::Or,
487                Operand::Column("b".to_string()),
488                "a OR b",
489            ),
490            (
491                Operand::Column("a".to_string()),
492                RestrictedOp::Or,
493                Operand::Expr(PartitionExpr::new(
494                    Operand::Column("c".to_string()),
495                    RestrictedOp::And,
496                    Operand::Column("d".to_string()),
497                )),
498                "a OR c AND d",
499            ),
500        ];
501
502        for case in cases {
503            let expr = PartitionExpr::new(case.0, case.1.clone(), case.2);
504            assert_eq!(case.3, expr.to_string());
505        }
506    }
507
508    #[test]
509    fn test_try_as_logical_expr_null_equality() {
510        let eq_expr = PartitionExpr::new(
511            Operand::Column("a".to_string()),
512            RestrictedOp::Eq,
513            Operand::Value(Value::Null),
514        );
515        assert_eq!(
516            eq_expr.try_as_logical_expr().unwrap().to_string(),
517            "a IS NULL"
518        );
519
520        let neq_expr = PartitionExpr::new(
521            Operand::Column("a".to_string()),
522            RestrictedOp::NotEq,
523            Operand::Value(Value::Null),
524        );
525        assert_eq!(
526            neq_expr.try_as_logical_expr().unwrap().to_string(),
527            "a IS NOT NULL"
528        );
529    }
530
531    #[test]
532    fn test_try_as_logical_expr_null_range_comparison() {
533        // Test Lt with column on LHS
534        let lt_expr = PartitionExpr::new(
535            Operand::Column("a".to_string()),
536            RestrictedOp::Lt,
537            Operand::Value(Value::Int64(10)),
538        );
539        assert_eq!(
540            lt_expr.try_as_logical_expr().unwrap().to_string(),
541            "a < Int64(10) OR a IS NULL"
542        );
543
544        // Test Lt with column on RHS
545        let lt_expr_rhs_column = PartitionExpr::new(
546            Operand::Value(Value::Int64(10)),
547            RestrictedOp::Lt,
548            Operand::Column("a".to_string()),
549        );
550        assert_eq!(
551            lt_expr_rhs_column
552                .try_as_logical_expr()
553                .unwrap()
554                .to_string(),
555            "a > Int64(10) OR a IS NULL"
556        );
557
558        // Test Gt with column on LHS
559        let gt_expr = PartitionExpr::new(
560            Operand::Column("a".to_string()),
561            RestrictedOp::Gt,
562            Operand::Value(Value::Int64(10)),
563        );
564        assert_eq!(
565            gt_expr.try_as_logical_expr().unwrap().to_string(),
566            "a > Int64(10) OR a IS NULL"
567        );
568
569        // Test Gt with column on RHS
570        let gt_expr_rhs_column = PartitionExpr::new(
571            Operand::Value(Value::Int64(10)),
572            RestrictedOp::Gt,
573            Operand::Column("a".to_string()),
574        );
575        assert_eq!(
576            gt_expr_rhs_column
577                .try_as_logical_expr()
578                .unwrap()
579                .to_string(),
580            "a < Int64(10) OR a IS NULL"
581        );
582
583        // Test GtEq with column on LHS
584        let gteq_expr = PartitionExpr::new(
585            Operand::Column("a".to_string()),
586            RestrictedOp::GtEq,
587            Operand::Value(Value::Int64(10)),
588        );
589        assert_eq!(
590            gteq_expr.try_as_logical_expr().unwrap().to_string(),
591            "a >= Int64(10) OR a IS NULL"
592        );
593
594        // Test LtEq with column on LHS
595        let lteq_expr = PartitionExpr::new(
596            Operand::Column("a".to_string()),
597            RestrictedOp::LtEq,
598            Operand::Value(Value::Int64(10)),
599        );
600        assert_eq!(
601            lteq_expr.try_as_logical_expr().unwrap().to_string(),
602            "a <= Int64(10) OR a IS NULL"
603        );
604    }
605
606    #[test]
607    fn test_serde_partition_expr() {
608        let expr = PartitionExpr::new(
609            Operand::Column("a".to_string()),
610            RestrictedOp::Eq,
611            Operand::Value(Value::UInt32(10)),
612        );
613        let json = expr.as_json_str().unwrap();
614        assert_eq!(
615            json,
616            "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"Eq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
617        );
618
619        let json = r#"{"Expr":{"lhs":{"Column":"a"},"op":"GtEq","rhs":{"Value":{"UInt32":10}}}}"#;
620        let expr2 = PartitionExpr::from_json_str(json).unwrap().unwrap();
621        let expected = PartitionExpr::new(
622            Operand::Column("a".to_string()),
623            RestrictedOp::GtEq,
624            Operand::Value(Value::UInt32(10)),
625        );
626        assert_eq!(expr2, expected);
627
628        // empty string
629        let json = "";
630        let expr3 = PartitionExpr::from_json_str(json).unwrap();
631        assert!(expr3.is_none());
632
633        // variants other than Expr
634        let json = r#""MaxValue""#;
635        let expr4 = PartitionExpr::from_json_str(json).unwrap();
636        assert!(expr4.is_none());
637
638        let json = r#"{"Value":{"UInt32":10}}"#;
639        let expr5 = PartitionExpr::from_json_str(json).unwrap();
640        assert!(expr5.is_none());
641    }
642
643    #[test]
644    fn test_collect_column_names() {
645        // Simple expression: col_a = 1 should give {col_a}
646        let expr = col("a").eq(Value::Int64(1));
647        let mut columns = HashSet::new();
648        expr.collect_column_names(&mut columns);
649        assert_eq!(columns.len(), 1);
650        assert!(columns.contains("a"));
651
652        // Compound AND with same column: col_a >= 0 AND col_a < 10 should give {col_a}
653        let expr = col("a")
654            .gt_eq(Value::Int64(0))
655            .and(col("a").lt(Value::Int64(10)));
656        let mut columns = HashSet::new();
657        expr.collect_column_names(&mut columns);
658        assert_eq!(columns.len(), 1);
659        assert!(columns.contains("a"));
660
661        // Multiple columns: col_a >= 0 AND col_b < 10 should give {col_a, col_b}
662        let expr = col("a")
663            .gt_eq(Value::Int64(0))
664            .and(col("b").lt(Value::Int64(10)));
665        let mut columns = HashSet::new();
666        expr.collect_column_names(&mut columns);
667        assert_eq!(columns.len(), 2);
668        assert!(columns.contains("a"));
669        assert!(columns.contains("b"));
670
671        // Nested expression: (col_a >= 0 AND col_b < 10) AND col_c = 5
672        let expr = col("a")
673            .gt_eq(Value::Int64(0))
674            .and(col("b").lt(Value::Int64(10)))
675            .and(col("c").eq(Value::Int64(5)));
676        let mut columns = HashSet::new();
677        expr.collect_column_names(&mut columns);
678        assert_eq!(columns.len(), 3);
679        assert!(columns.contains("a"));
680        assert!(columns.contains("b"));
681        assert!(columns.contains("c"));
682    }
683}