Skip to main content

partition/utils/
split.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Expression split utilities for partition rules.
16//!
17//! This module provides a conservative way to split one partition expression `R`
18//! by a split expression `S` into:
19//! - `left = R AND S`
20//! - `right = R AND NOT(S)`
21//!
22//! The implementation intentionally reuses existing partition components
23//! (`Collider`, `simplify`, `PartitionChecker`) and degrades to no-split when an
24//! unsupported shape/type is encountered.
25
26use std::collections::{BTreeMap, HashSet};
27
28use datatypes::value::Value;
29use snafu::ensure;
30
31use crate::collider::Collider;
32use crate::error::{self, Result};
33use crate::expr::{Operand, PartitionExpr, RestrictedOp};
34
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum ExprSplitDegradeReason {
37    UnsupportedType,
38    UnsupportedNotExpansion,
39    ColliderRejected,
40    EmptyBranch,
41}
42
43/// Splits one partition expression with a split predicate.
44///
45/// Returns `(left, right)` on success, where:
46/// - `left = R AND S`
47/// - `right = R AND NOT(S)`
48///
49/// Supported shape:
50/// - `split_expr` must be a single atomic range predicate (`<`, `<=`, `>`, `>=`).
51/// - `base_expr` must be a pure `AND` tree of atomic range predicates, possibly
52///   across unrelated columns.
53///
54/// Returns [`ExprSplitDegradeReason`] when this cannot safely process the shape/type.
55pub fn split_partition_expr(
56    base_expr: PartitionExpr,
57    split_expr: PartitionExpr,
58) -> std::result::Result<(PartitionExpr, PartitionExpr), ExprSplitDegradeReason> {
59    let base = base_expr.canonicalize();
60    let split = split_expr.canonicalize();
61
62    if validate_supported_expr(&base).is_err() || validate_supported_expr(&split).is_err() {
63        return Err(ExprSplitDegradeReason::UnsupportedType);
64    }
65
66    if !validate_base_expr_shape(&base) || !validate_split_expr_shape(&split) {
67        return Err(ExprSplitDegradeReason::UnsupportedType);
68    }
69
70    let not_split = match negate_split_expr(&split) {
71        Ok(expr) => expr,
72        Err(_) => {
73            return Err(ExprSplitDegradeReason::UnsupportedNotExpansion);
74        }
75    };
76
77    let left_raw = base.clone().and(split);
78    let right_raw = base.clone().and(not_split);
79
80    if Collider::new(std::slice::from_ref(&left_raw)).is_err()
81        || Collider::new(std::slice::from_ref(&right_raw)).is_err()
82    {
83        return Err(ExprSplitDegradeReason::ColliderRejected);
84    }
85
86    let left_expr = simplify_and_bounds(left_raw);
87    let right_expr = simplify_and_bounds(right_raw);
88
89    if is_empty_and_conjunction(&left_expr) || is_empty_and_conjunction(&right_expr) {
90        return Err(ExprSplitDegradeReason::EmptyBranch);
91    }
92
93    Ok((left_expr, right_expr))
94}
95
96/// Detects whether a pure conjunction expression is definitely unsatisfiable.
97///
98/// Scope and intent:
99/// - This checker is intentionally conservative.
100/// - It only analyzes expressions that can be flattened into:
101///   `atom1 AND atom2 AND ...`
102/// - If any `OR` is present, it returns `false` (unknown / not handled here).
103///
104/// Strategy:
105/// - For each column, keep only the tightest lower bound (`>` / `>=`) and
106///   tightest upper bound (`<` / `<=`).
107/// - `=` is treated as both lower and upper bound at the same value.
108/// - `!=` is tracked per column to catch direct conflicts with `=`.
109/// - After bounds are collected, the conjunction is empty iff for any column:
110///   - lower value is greater than upper value, or
111///   - lower value equals upper value but at least one bound is exclusive.
112/// - For discrete domains (`Int*`, `UInt*`), adjacent open bounds with no
113///   representable value in between are also treated as empty.
114///
115/// Notes:
116/// - This is still a conservative fast path focused on conjunction emptiness
117///   detection for split degradation.
118/// - `split_partition_expr` currently restricts its main path to range-only
119///   conjunctions, but this helper remains slightly more general so shared
120///   bound collection and direct conflict checks stay reusable.
121fn is_empty_and_conjunction(expr: &PartitionExpr) -> bool {
122    let Some(collected) = collect_conjunction_bounds(expr) else {
123        return false;
124    };
125
126    if collected.has_conflict {
127        return true;
128    }
129
130    let CollectedConjunction {
131        lowers,
132        uppers,
133        not_equals,
134        passthrough: _,
135        has_conflict: _,
136    } = collected;
137
138    if lowers
139        .iter()
140        .any(|(col, lower)| !uppers.contains_key(col) && is_strictly_greater_than_domain_max(lower))
141    {
142        return true;
143    }
144
145    // Check for contradiction between collected lower/upper bounds per column.
146    lowers.into_iter().any(|(col, lower)| {
147        let Some(upper) = uppers.get(&col) else {
148            return false;
149        };
150
151        match lower.value.partial_cmp(&upper.value) {
152            Some(std::cmp::Ordering::Greater) => true,
153            Some(std::cmp::Ordering::Equal) => {
154                if !lower.inclusive || !upper.inclusive {
155                    true
156                } else {
157                    not_equals
158                        .get(&col)
159                        .is_some_and(|excluded| excluded.contains(&lower.value))
160                }
161            }
162            Some(std::cmp::Ordering::Less) => {
163                match (
164                    discrete_value_index(&lower.value),
165                    discrete_value_index(&upper.value),
166                ) {
167                    (Some(lower_idx), Some(upper_idx)) => {
168                        let min_candidate = if lower.inclusive {
169                            Some(lower_idx)
170                        } else {
171                            lower_idx.checked_add(1)
172                        };
173                        let max_candidate = if upper.inclusive {
174                            Some(upper_idx)
175                        } else {
176                            upper_idx.checked_sub(1)
177                        };
178                        match (min_candidate, max_candidate) {
179                            (Some(min_val), Some(max_val)) => min_val > max_val,
180                            _ => true,
181                        }
182                    }
183                    _ => false,
184                }
185            }
186            _ => false,
187        }
188    })
189}
190
191fn discrete_value_index(v: &Value) -> Option<i128> {
192    match v {
193        Value::Int8(x) => Some(*x as i128),
194        Value::Int16(x) => Some(*x as i128),
195        Value::Int32(x) => Some(*x as i128),
196        Value::Int64(x) => Some(*x as i128),
197        Value::UInt8(x) => Some(*x as i128),
198        Value::UInt16(x) => Some(*x as i128),
199        Value::UInt32(x) => Some(*x as i128),
200        Value::UInt64(x) => Some(*x as i128),
201        _ => None,
202    }
203}
204
205fn is_strictly_greater_than_domain_max(bound: &LowerBound) -> bool {
206    if bound.inclusive {
207        return false;
208    }
209
210    is_domain_max_value(&bound.value)
211}
212
213fn is_domain_max_value(v: &Value) -> bool {
214    match v {
215        Value::Float32(v) => v.0 == f32::MAX,
216        Value::Float64(v) => v.0 == f64::MAX,
217        Value::UInt8(v) => *v == u8::MAX,
218        Value::UInt16(v) => *v == u16::MAX,
219        Value::UInt32(v) => *v == u32::MAX,
220        Value::UInt64(v) => *v == u64::MAX,
221        Value::Int8(v) => *v == i8::MAX,
222        Value::Int16(v) => *v == i16::MAX,
223        Value::Int32(v) => *v == i32::MAX,
224        Value::Int64(v) => *v == i64::MAX,
225        _ => false,
226    }
227}
228
229/// Rewrites `NOT(expr)` into an equivalent `PartitionExpr` without introducing a unary NOT node.
230///
231/// Why this function exists:
232/// - `PartitionExpr` only models binary operators.
233/// - Cut logic needs `R AND NOT(S)`.
234/// - We therefore rewrite `NOT(S)` into an equivalent binary-expression tree.
235///
236/// Rewrite rules:
237/// - Atomic comparisons:
238///   - `=`  <-> `!=`
239///   - `<`  <-> `>=`
240///   - `<=` <-> `>`
241///   - `>`  <-> `<=`
242///   - `>=` <-> `<`
243/// - Boolean composition:
244///   - `NOT(A AND B)` => `NOT(A) OR NOT(B)`
245///   - `NOT(A OR B)`  => `NOT(A) AND NOT(B)`
246///
247/// Failure behavior:
248/// - For `AND/OR`, both sides must be `Operand::Expr`; otherwise returns `NoExprOperand`.
249/// - Any unsupported shape bubbles up as an error and the caller degrades to no-split.
250pub fn negate_split_expr(expr: &PartitionExpr) -> Result<PartitionExpr> {
251    match expr.op() {
252        RestrictedOp::Eq
253        | RestrictedOp::NotEq
254        | RestrictedOp::Lt
255        | RestrictedOp::LtEq
256        | RestrictedOp::Gt
257        | RestrictedOp::GtEq => {
258            // Atomic negate by operator inversion.
259            let op = match expr.op() {
260                RestrictedOp::Eq => RestrictedOp::NotEq,
261                RestrictedOp::NotEq => RestrictedOp::Eq,
262                RestrictedOp::Lt => RestrictedOp::GtEq,
263                RestrictedOp::LtEq => RestrictedOp::Gt,
264                RestrictedOp::Gt => RestrictedOp::LtEq,
265                RestrictedOp::GtEq => RestrictedOp::Lt,
266                RestrictedOp::And | RestrictedOp::Or => unreachable!(),
267            };
268            Ok(PartitionExpr::new(
269                expr.lhs().clone(),
270                op,
271                expr.rhs().clone(),
272            ))
273        }
274        RestrictedOp::And | RestrictedOp::Or => {
275            // De Morgan transform on recursive sub-expressions.
276            let lhs = match expr.lhs() {
277                Operand::Expr(lhs) => lhs,
278                other => {
279                    return error::NoExprOperandSnafu {
280                        operand: other.clone(),
281                    }
282                    .fail();
283                }
284            };
285            let rhs = match expr.rhs() {
286                Operand::Expr(rhs) => rhs,
287                other => {
288                    return error::NoExprOperandSnafu {
289                        operand: other.clone(),
290                    }
291                    .fail();
292                }
293            };
294            let not_lhs = negate_split_expr(lhs)?;
295            let not_rhs = negate_split_expr(rhs)?;
296            let op = match expr.op() {
297                // NOT(A AND B) => NOT(A) OR NOT(B)
298                RestrictedOp::And => RestrictedOp::Or,
299                // NOT(A OR B) => NOT(A) AND NOT(B)
300                RestrictedOp::Or => RestrictedOp::And,
301                _ => unreachable!(),
302            };
303            Ok(PartitionExpr::new(
304                Operand::Expr(not_lhs),
305                op,
306                Operand::Expr(not_rhs),
307            ))
308        }
309    }
310}
311
312pub fn validate_supported_expr(expr: &PartitionExpr) -> Result<()> {
313    match expr.op() {
314        RestrictedOp::And | RestrictedOp::Or => {
315            let lhs = match expr.lhs() {
316                Operand::Expr(lhs) => lhs,
317                other => {
318                    return error::NoExprOperandSnafu {
319                        operand: other.clone(),
320                    }
321                    .fail();
322                }
323            };
324            let rhs = match expr.rhs() {
325                Operand::Expr(rhs) => rhs,
326                other => {
327                    return error::NoExprOperandSnafu {
328                        operand: other.clone(),
329                    }
330                    .fail();
331                }
332            };
333            validate_supported_expr(lhs)?;
334            validate_supported_expr(rhs)?;
335            Ok(())
336        }
337        _ => validate_atomic(expr),
338    }
339}
340
341fn validate_atomic(expr: &PartitionExpr) -> Result<()> {
342    let (lhs, rhs) = (expr.lhs(), expr.rhs());
343    match (lhs, rhs) {
344        (Operand::Column(_), Operand::Value(v)) | (Operand::Value(v), Operand::Column(_)) => {
345            ensure!(
346                is_supported_value(v),
347                error::InvalidExprSnafu { expr: expr.clone() }
348            );
349            if is_nan_value(v) || is_infinite_value(v) {
350                return error::InvalidExprSnafu { expr: expr.clone() }.fail();
351            }
352            Ok(())
353        }
354        _ => error::InvalidExprSnafu { expr: expr.clone() }.fail(),
355    }
356}
357
358/// Validates that `base_expr` stays within the range-only split contract.
359///
360/// Scope and intent:
361/// - The split utility only handles interval-style partition predicates.
362/// - `base_expr` may mention multiple columns, but it must remain a pure `AND`
363///   tree of atomic range predicates.
364fn validate_base_expr_shape(expr: &PartitionExpr) -> bool {
365    let mut atoms = Vec::new();
366    if !collect_and_atoms(expr, &mut atoms) {
367        return false;
368    }
369
370    atoms
371        .into_iter()
372        .all(|atom| is_atomic_range_expr(&atom.canonicalize()))
373}
374
375/// Validates that `split_expr` is a single atomic range predicate.
376///
377/// This restriction keeps `NOT(split_expr)` in the same range-only subset so the
378/// resulting left/right branches stay within the supported contract.
379fn validate_split_expr_shape(expr: &PartitionExpr) -> bool {
380    is_atomic_range_expr(expr)
381}
382
383/// Returns whether `expr` is an atomic `column op value` range predicate.
384///
385/// Supported operators are limited to `<`, `<=`, `>`, and `>=`.
386fn is_atomic_range_expr(expr: &PartitionExpr) -> bool {
387    atom_col_op_val(expr).is_some_and(|(_, op, _)| {
388        matches!(
389            op,
390            RestrictedOp::Lt | RestrictedOp::LtEq | RestrictedOp::Gt | RestrictedOp::GtEq
391        )
392    })
393}
394
395fn is_supported_value(v: &Value) -> bool {
396    matches!(
397        v,
398        Value::Int8(_)
399            | Value::Int16(_)
400            | Value::Int32(_)
401            | Value::Int64(_)
402            | Value::UInt8(_)
403            | Value::UInt16(_)
404            | Value::UInt32(_)
405            | Value::UInt64(_)
406            | Value::Float32(_)
407            | Value::Float64(_)
408            | Value::String(_)
409    )
410}
411
412fn is_nan_value(v: &Value) -> bool {
413    match v {
414        Value::Float32(x) => x.0.is_nan(),
415        Value::Float64(x) => x.0.is_nan(),
416        _ => false,
417    }
418}
419
420fn is_infinite_value(v: &Value) -> bool {
421    match v {
422        Value::Float32(x) => x.0.is_infinite(),
423        Value::Float64(x) => x.0.is_infinite(),
424        _ => false,
425    }
426}
427
428#[derive(Debug, Clone)]
429struct LowerBound {
430    value: Value,
431    inclusive: bool,
432}
433
434#[derive(Debug, Clone)]
435struct UpperBound {
436    value: Value,
437    inclusive: bool,
438}
439
440struct CollectedConjunction {
441    lowers: BTreeMap<String, LowerBound>,
442    uppers: BTreeMap<String, UpperBound>,
443    not_equals: BTreeMap<String, HashSet<Value>>,
444    passthrough: Vec<PartitionExpr>,
445    has_conflict: bool,
446}
447
448/// Simplifies conjunction-only range predicates by keeping the tightest bounds per column.
449///
450/// This pass is intentionally conservative and only runs when the whole expression
451/// can be flattened into `atom1 AND atom2 AND ...` without any `OR` node.
452///
453/// Behavior:
454/// - For each column, collect all lower-bound predicates (`>` / `>=`) and keep the
455///   tightest one.
456/// - For each column, collect all upper-bound predicates (`<` / `<=`) and keep the
457///   tightest one.
458/// - Non-range predicates (for example `=` / `!=`) are preserved as-is.
459/// - If the expression contains `OR`, this function returns the original expression.
460///
461/// Tightness rules:
462/// - Upper bound: smaller value is tighter; if equal value, exclusive (`<`) is tighter.
463/// - Lower bound: larger value is tighter; if equal value, exclusive (`>`) is tighter.
464///
465/// Examples:
466/// - `a <= 10 AND a < 10` => `a < 10`
467/// - `a >= 10 AND a > 10` => `a > 10`
468/// - `a < 10 AND a < 5`   => `a < 5`
469fn simplify_and_bounds(expr: PartitionExpr) -> PartitionExpr {
470    let Some(collected) = collect_conjunction_bounds(&expr) else {
471        return expr;
472    };
473
474    let CollectedConjunction {
475        lowers,
476        uppers,
477        not_equals: _,
478        passthrough,
479        has_conflict: _,
480    } = collected;
481
482    let mut out = passthrough;
483    out.extend(lowers.into_iter().map(|(col, lower)| {
484        PartitionExpr::new(
485            Operand::Column(col),
486            if lower.inclusive {
487                RestrictedOp::GtEq
488            } else {
489                RestrictedOp::Gt
490            },
491            Operand::Value(lower.value),
492        )
493    }));
494    out.extend(uppers.into_iter().map(|(col, upper)| {
495        PartitionExpr::new(
496            Operand::Column(col),
497            if upper.inclusive {
498                RestrictedOp::LtEq
499            } else {
500                RestrictedOp::Lt
501            },
502            Operand::Value(upper.value),
503        )
504    }));
505
506    fold_and_exprs(out).unwrap_or(expr)
507}
508
509/// Flattens an expression into atomic terms when it is a pure conjunction tree.
510///
511/// Returns `false` if any `OR` is encountered, signaling caller to skip this
512/// simplification path.
513fn collect_and_atoms(expr: &PartitionExpr, out: &mut Vec<PartitionExpr>) -> bool {
514    match expr.op() {
515        RestrictedOp::And => {
516            let lhs = match expr.lhs() {
517                Operand::Expr(lhs) => lhs,
518                _ => return false,
519            };
520            let rhs = match expr.rhs() {
521                Operand::Expr(rhs) => rhs,
522                _ => return false,
523            };
524            collect_and_atoms(lhs, out) && collect_and_atoms(rhs, out)
525        }
526        RestrictedOp::Or => false,
527        _ => {
528            out.push(expr.clone());
529            true
530        }
531    }
532}
533
534/// Extracts `(column, op, value)` from a canonicalized atomic expression.
535fn atom_col_op_val(expr: &PartitionExpr) -> Option<(String, RestrictedOp, Value)> {
536    let lhs = expr.lhs();
537    let rhs = expr.rhs();
538    match (lhs, rhs) {
539        (Operand::Column(col), Operand::Value(v)) => {
540            Some((col.clone(), expr.op().clone(), v.clone()))
541        }
542        _ => None,
543    }
544}
545
546/// Collects per-column bounds and passthrough atoms from a pure `AND` tree.
547///
548/// Scope and intent:
549/// - This helper is shared by [`is_empty_and_conjunction`] and
550///   [`simplify_and_bounds`] so both paths interpret conjunction atoms the same
551///   way.
552/// - It only handles conjunction-only expressions. If any `OR` is present, it
553///   returns `None` and lets callers keep their conservative fallback behavior.
554///
555/// Behavior:
556/// - Tightest lower/upper bounds are recorded per column.
557/// - `=` contributes both a lower and an upper bound at the same value.
558/// - `!=` and non-range atoms are preserved in `passthrough` for callers that
559///   need to rebuild the conjunction.
560/// - `has_conflict` is set when atomic constraints already contradict each
561///   other (for example `a = 1 AND a <> 1`).
562///
563/// Notes:
564/// - This helper is intentionally a bit more general than the current
565///   `split_partition_expr` contract, which now only feeds range-only
566///   conjunctions into the main split path.
567fn collect_conjunction_bounds(expr: &PartitionExpr) -> Option<CollectedConjunction> {
568    let mut atoms = Vec::new();
569    if !collect_and_atoms(expr, &mut atoms) {
570        return None;
571    }
572
573    let mut lowers = BTreeMap::new();
574    let mut uppers = BTreeMap::new();
575    let mut equals = BTreeMap::new();
576    let mut not_equals: BTreeMap<String, HashSet<Value>> = BTreeMap::new();
577    let mut passthrough = Vec::new();
578    let mut seen = HashSet::new();
579    let mut has_conflict = false;
580
581    for atom in atoms {
582        let atom = atom.canonicalize();
583        let Some((col, op, val)) = atom_col_op_val(&atom) else {
584            push_unique_expr(&mut passthrough, &mut seen, atom);
585            continue;
586        };
587
588        match op {
589            RestrictedOp::Lt | RestrictedOp::LtEq => update_upper_bound(
590                &mut uppers,
591                col,
592                UpperBound {
593                    value: val,
594                    inclusive: matches!(op, RestrictedOp::LtEq),
595                },
596            ),
597            RestrictedOp::Gt | RestrictedOp::GtEq => update_lower_bound(
598                &mut lowers,
599                col,
600                LowerBound {
601                    value: val,
602                    inclusive: matches!(op, RestrictedOp::GtEq),
603                },
604            ),
605            RestrictedOp::Eq => {
606                if let Some(existing) = equals.get(&col)
607                    && existing != &val
608                {
609                    has_conflict = true;
610                }
611                if not_equals
612                    .get(&col)
613                    .is_some_and(|excluded| excluded.contains(&val))
614                {
615                    has_conflict = true;
616                }
617                equals.insert(col.clone(), val.clone());
618                update_lower_bound(
619                    &mut lowers,
620                    col.clone(),
621                    LowerBound {
622                        value: val.clone(),
623                        inclusive: true,
624                    },
625                );
626                update_upper_bound(
627                    &mut uppers,
628                    col,
629                    UpperBound {
630                        value: val,
631                        inclusive: true,
632                    },
633                );
634                push_unique_expr(&mut passthrough, &mut seen, atom);
635            }
636            RestrictedOp::NotEq => {
637                if equals.get(&col).is_some_and(|eq| eq == &val) {
638                    has_conflict = true;
639                }
640                not_equals.entry(col).or_default().insert(val);
641                push_unique_expr(&mut passthrough, &mut seen, atom);
642            }
643            RestrictedOp::And | RestrictedOp::Or => {
644                push_unique_expr(&mut passthrough, &mut seen, atom);
645            }
646        }
647    }
648
649    Some(CollectedConjunction {
650        lowers,
651        uppers,
652        not_equals,
653        passthrough,
654        has_conflict,
655    })
656}
657
658fn push_unique_expr(out: &mut Vec<PartitionExpr>, seen: &mut HashSet<String>, expr: PartitionExpr) {
659    let key = expr.to_string();
660    if seen.insert(key) {
661        out.push(expr);
662    }
663}
664
665fn update_upper_bound(
666    uppers: &mut BTreeMap<String, UpperBound>,
667    col: String,
668    candidate: UpperBound,
669) {
670    match uppers.get_mut(&col) {
671        Some(current) => {
672            if prefer_upper(&candidate, current) {
673                *current = candidate;
674            }
675        }
676        None => {
677            uppers.insert(col, candidate);
678        }
679    }
680}
681
682fn update_lower_bound(
683    lowers: &mut BTreeMap<String, LowerBound>,
684    col: String,
685    candidate: LowerBound,
686) {
687    match lowers.get_mut(&col) {
688        Some(current) => {
689            if prefer_lower(&candidate, current) {
690                *current = candidate;
691            }
692        }
693        None => {
694            lowers.insert(col, candidate);
695        }
696    }
697}
698
699fn prefer_upper(candidate: &UpperBound, current: &UpperBound) -> bool {
700    // "Smaller" upper bound is tighter. For equal value, exclusive is tighter.
701    match candidate.value.partial_cmp(&current.value) {
702        Some(std::cmp::Ordering::Less) => true,
703        Some(std::cmp::Ordering::Equal) => !candidate.inclusive && current.inclusive,
704        _ => false,
705    }
706}
707
708fn prefer_lower(candidate: &LowerBound, current: &LowerBound) -> bool {
709    // "Larger" lower bound is tighter. For equal value, exclusive is tighter.
710    match candidate.value.partial_cmp(&current.value) {
711        Some(std::cmp::Ordering::Greater) => true,
712        Some(std::cmp::Ordering::Equal) => !candidate.inclusive && current.inclusive,
713        _ => false,
714    }
715}
716
717/// Folds a list of expressions into a left-associated AND tree.
718/// Returns `None` if the input list is empty.
719fn fold_and_exprs(mut exprs: Vec<PartitionExpr>) -> Option<PartitionExpr> {
720    exprs.drain(..).reduce(|acc, next| acc.and(next))
721}
722
723#[cfg(test)]
724mod tests {
725    use datatypes::value::{OrderedFloat, Value};
726    use store_api::storage::RegionNumber;
727
728    use super::*;
729    use crate::checker::PartitionChecker;
730    use crate::expr::col;
731    use crate::multi_dim::MultiDimPartitionRule;
732
733    fn validate_cut_result_with_checker(
734        original_rule_exprs: &[PartitionExpr],
735        replaced_index: usize,
736        left: &Option<PartitionExpr>,
737        right: &Option<PartitionExpr>,
738        partition_columns: Vec<String>,
739        regions: Vec<RegionNumber>,
740    ) -> Result<()> {
741        ensure!(
742            replaced_index < original_rule_exprs.len(),
743            error::UnexpectedSnafu {
744                err_msg: format!(
745                    "replaced index out of bounds: {replaced_index} >= {}",
746                    original_rule_exprs.len()
747                )
748            }
749        );
750
751        let mut exprs = original_rule_exprs.to_vec();
752        exprs.remove(replaced_index);
753        exprs.extend(left.iter().cloned());
754        exprs.extend(right.iter().cloned());
755
756        ensure!(
757            !exprs.is_empty(),
758            error::UnexpectedSnafu {
759                err_msg: "empty rule exprs after split".to_string()
760            }
761        );
762
763        let final_regions = if regions.len() == exprs.len() {
764            regions
765        } else {
766            (0..exprs.len() as RegionNumber).collect()
767        };
768
769        let rule = MultiDimPartitionRule::try_new(partition_columns, final_regions, exprs, false)?;
770        let checker = PartitionChecker::try_new(&rule)?;
771        checker.check()?;
772        Ok(())
773    }
774
775    #[test]
776    fn test_split_simple_range() {
777        // R: a < 10
778        let base = col("a").lt(Value::Int64(10));
779        // S: a < 5
780        let split = col("a").lt(Value::Int64(5));
781        let (left, right) = split_partition_expr(base, split).unwrap();
782        // left = R AND S = a < 5
783        assert_eq!(left.to_string(), "a < 5");
784        // right = R AND NOT(S) = a >= 5 AND a < 10
785        assert_eq!(right.to_string(), "a >= 5 AND a < 10");
786    }
787
788    #[test]
789    fn test_split_string_interval() {
790        // R: v > 'm' AND v < 'n'
791        let base = col("v")
792            .gt(Value::String("m".into()))
793            .and(col("v").lt(Value::String("n".into())));
794        // S: v < 'm~'
795        let split = col("v").lt(Value::String("m~".into()));
796        let (left, right) = split_partition_expr(base, split).unwrap();
797        // left = (v > m AND v < n) AND (v < m~) -> v > m AND v < m~
798        assert_eq!(left.to_string(), "v > m AND v < m~");
799        // right = (v > m AND v < n) AND (v >= m~) -> v >= m~ AND v < n
800        assert_eq!(right.to_string(), "v >= m~ AND v < n");
801    }
802
803    #[test]
804    fn test_split_numeric_interval_mid_split() {
805        // R: a > 3 AND a < 10
806        let base = col("a")
807            .gt(Value::Int64(3))
808            .and(col("a").lt(Value::Int64(10)));
809        // S: a < 5
810        let split = col("a").lt(Value::Int64(5));
811
812        let (left, right) = split_partition_expr(base, split).unwrap();
813
814        // left = (a > 3 AND a < 10) AND (a < 5) -> a > 3 AND a < 5
815        assert_eq!(left.to_string(), "a > 3 AND a < 5");
816        // right = (a > 3 AND a < 10) AND (a >= 5) -> a >= 5 AND a < 10
817        assert_eq!(right.to_string(), "a >= 5 AND a < 10");
818    }
819
820    #[test]
821    fn test_split_base_expr_allows_unrelated_range_columns() {
822        // R: a > 20 AND b < 20
823        let base = col("a")
824            .gt(Value::Int64(20))
825            .and(col("b").lt(Value::Int64(20)));
826        // S: a < 30
827        let split = col("a").lt(Value::Int64(30));
828
829        let (left, right) = split_partition_expr(base, split).unwrap();
830
831        // left keeps the unrelated `b < 20` bound while splitting column `a`.
832        assert_eq!(left.to_string(), "a > 20 AND a < 30 AND b < 20");
833        // right also preserves the unrelated column bound.
834        assert_eq!(right.to_string(), "a >= 30 AND b < 20");
835    }
836
837    #[test]
838    fn test_split_degrade_on_unsupported_type() {
839        // intentionally excludes boolean from split-able value types.
840        let base = col("a").eq(Value::Boolean(true));
841        let split = col("a").eq(Value::Boolean(true));
842
843        let result = split_partition_expr(base, split);
844        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
845    }
846
847    #[test]
848    fn test_validate_cut_result_with_checker() {
849        // Original partition set: a < 10, a >= 10
850        let original = vec![
851            col("a").lt(Value::Int64(10)),
852            col("a").gt_eq(Value::Int64(10)),
853        ];
854        let left = Some(col("a").lt(Value::Int64(5)));
855        let right = Some(
856            col("a")
857                .gt_eq(Value::Int64(5))
858                .and(col("a").lt(Value::Int64(10))),
859        );
860
861        validate_cut_result_with_checker(
862            &original,
863            0,
864            &left,
865            &right,
866            vec!["a".to_string()],
867            vec![1, 2, 3],
868        )
869        .unwrap();
870    }
871
872    #[test]
873    fn test_split_degrade_on_empty_branch() {
874        // R: a < 10
875        let base = col("a").lt(Value::Int64(10));
876        // S: a < 20
877        let split = col("a").lt(Value::Int64(20));
878
879        // right = (a < 10) AND (a >= 20) is unsatisfiable, should degrade.
880        let result = split_partition_expr(base, split);
881        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
882    }
883
884    #[test]
885    fn test_split_rejects_eq_in_base_expr() {
886        // R: a = 5 falls outside the range-only base_expr contract.
887        let base = col("a").eq(Value::Int64(5));
888        // S: a < 6 remains a valid range split.
889        let split = col("a").lt(Value::Int64(6));
890
891        let result = split_partition_expr(base, split);
892        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
893    }
894
895    #[test]
896    fn test_split_degrade_on_discrete_gap_int() {
897        // R: a < 5
898        let base = col("a").lt(Value::Int64(5));
899        // S: a <= 4
900        let split = col("a").lt_eq(Value::Int64(4));
901
902        // right = (a < 5) AND (a > 4) has no integer solution, should degrade.
903        let result = split_partition_expr(base, split);
904        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
905    }
906
907    #[test]
908    fn test_split_degrade_on_unsupported_date_type() {
909        // Date is intentionally excluded from split-supported value types.
910        let base = col("d").lt(Value::Date(5.into()));
911        let split = col("d").lt_eq(Value::Date(4.into()));
912
913        let result = split_partition_expr(base, split);
914        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
915    }
916
917    #[test]
918    fn test_split_degrade_on_unsupported_timestamp_type() {
919        // Timestamp is intentionally excluded from split-supported value types.
920        let base = col("ts").lt(Value::Timestamp(0.into()));
921        let split = col("ts").lt_eq(Value::Timestamp(1.into()));
922
923        let result = split_partition_expr(base, split);
924        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
925    }
926
927    #[test]
928    fn test_split_rejects_not_eq_in_split_expr() {
929        // R: a >= 5 AND a <= 5
930        let base = col("a")
931            .gt_eq(Value::Int64(5))
932            .and(col("a").lt_eq(Value::Int64(5)));
933        // S: a <> 5 falls outside the range-only split_expr contract.
934        let split = col("a").not_eq(Value::Int64(5));
935
936        let result = split_partition_expr(base, split);
937        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
938    }
939
940    #[test]
941    fn test_split_rejects_eq_in_split_expr() {
942        // R: a >= 5 AND a <= 5
943        let base = col("a")
944            .gt_eq(Value::Int64(5))
945            .and(col("a").lt_eq(Value::Int64(5)));
946        // S: a = 5 falls outside the range-only split_expr contract.
947        let split = col("a").eq(Value::Int64(5));
948
949        let result = split_partition_expr(base, split);
950        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
951    }
952
953    #[test]
954    fn test_split_degrade_on_uint_one_sided_impossible_upper_bound() {
955        // R: a < 10 (UInt64 domain)
956        let base = col("a").lt(Value::UInt64(10));
957        // S: a < 0 is still satisfiable by NULL under null-first partition semantics.
958        // The split keeps a nullable left branch instead of degrading it as empty.
959        let split = col("a").lt(Value::UInt64(0));
960
961        let (left, right) = split_partition_expr(base, split).unwrap();
962        assert_eq!(left.to_string(), "a < 0");
963        assert_eq!(right.to_string(), "a >= 0 AND a < 10");
964    }
965
966    #[test]
967    fn test_split_degrade_on_uint_one_sided_impossible_lower_bound() {
968        // R: a < 10 (UInt64 domain)
969        let base = col("a").lt(Value::UInt64(10));
970        // S: a > u64::MAX (impossible on UInt64)
971        let split = col("a").gt(Value::UInt64(u64::MAX));
972
973        // left = (a < 10) AND (a > u64::MAX) is unsatisfiable on UInt64, should degrade.
974        let result = split_partition_expr(base, split);
975        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
976    }
977
978    #[test]
979    fn test_split_degrade_on_int_one_sided_impossible_upper_bound() {
980        // R: a < 10 (Int64 domain)
981        let base = col("a").lt(Value::Int64(10));
982        // S: a < i64::MIN is still satisfiable by NULL under null-first partition semantics.
983        // The split keeps a nullable left branch instead of degrading it as empty.
984        let split = col("a").lt(Value::Int64(i64::MIN));
985
986        let (left, right) = split_partition_expr(base, split).unwrap();
987        assert_eq!(left.to_string(), format!("a < {}", i64::MIN));
988        assert_eq!(right.to_string(), format!("a >= {} AND a < 10", i64::MIN));
989    }
990
991    #[test]
992    fn test_split_degrade_on_int_one_sided_impossible_lower_bound() {
993        // R: a < 10 (Int64 domain)
994        let base = col("a").lt(Value::Int64(10));
995        // S: a > i64::MAX (impossible on Int64)
996        let split = col("a").gt(Value::Int64(i64::MAX));
997
998        // left = (a < 10) AND (a > i64::MAX) is unsatisfiable on Int64, should degrade.
999        let result = split_partition_expr(base, split);
1000        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
1001    }
1002
1003    #[test]
1004    fn test_split_degrade_on_string_one_sided_impossible_upper_bound() {
1005        // R: s < "z" (String domain)
1006        let base = col("s").lt(Value::String("z".into()));
1007        // S: s < "" is still satisfiable by NULL under null-first partition semantics.
1008        // The split keeps a nullable left branch instead of degrading it as empty.
1009        let split = col("s").lt(Value::String("".into()));
1010
1011        let (left, right) = split_partition_expr(base, split).unwrap();
1012        assert_eq!(left.to_string(), "s < ");
1013        assert_eq!(right.to_string(), "s >=  AND s < z");
1014    }
1015
1016    #[test]
1017    fn test_split_degrade_on_float64_one_sided_impossible_upper_bound() {
1018        // R: a < 10.0 (Float64 domain)
1019        let base = col("a").lt(Value::Float64(OrderedFloat(10.0)));
1020        // S: a < f64::MIN is still satisfiable by NULL under null-first partition semantics.
1021        // The split keeps a nullable left branch instead of degrading it as empty.
1022        let split = col("a").lt(Value::Float64(OrderedFloat(f64::MIN)));
1023
1024        let (left, right) = split_partition_expr(base, split).unwrap();
1025        assert_eq!(left.to_string(), format!("a < {}", f64::MIN));
1026        assert_eq!(right.to_string(), format!("a >= {} AND a < 10", f64::MIN));
1027    }
1028
1029    #[test]
1030    fn test_split_degrade_on_float64_one_sided_impossible_lower_bound() {
1031        // R: a < 10.0 (Float64 domain)
1032        let base = col("a").lt(Value::Float64(OrderedFloat(10.0)));
1033        // S: a > f64::MAX (impossible with finite-only float policy)
1034        let split = col("a").gt(Value::Float64(OrderedFloat(f64::MAX)));
1035
1036        // left = (a < 10.0) AND (a > f64::MAX) is unsatisfiable, should degrade.
1037        let result = split_partition_expr(base, split);
1038        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
1039    }
1040
1041    #[test]
1042    fn test_split_degrade_on_float32_one_sided_impossible_upper_bound() {
1043        // R: a < 10.0f32 (Float32 domain)
1044        let base = col("a").lt(Value::Float32(OrderedFloat(10.0)));
1045        // S: a < f32::MIN is still satisfiable by NULL under null-first partition semantics.
1046        // The split keeps a nullable left branch instead of degrading it as empty.
1047        let split = col("a").lt(Value::Float32(OrderedFloat(f32::MIN)));
1048
1049        let (left, right) = split_partition_expr(base, split).unwrap();
1050        assert_eq!(left.to_string(), format!("a < {}", f32::MIN));
1051        assert_eq!(right.to_string(), format!("a >= {} AND a < 10", f32::MIN));
1052    }
1053
1054    #[test]
1055    fn test_split_degrade_on_float32_one_sided_impossible_lower_bound() {
1056        // R: a < 10.0f32 (Float32 domain)
1057        let base = col("a").lt(Value::Float32(OrderedFloat(10.0)));
1058        // S: a > f32::MAX (impossible with finite-only float policy)
1059        let split = col("a").gt(Value::Float32(OrderedFloat(f32::MAX)));
1060
1061        // left = (a < 10.0f32) AND (a > f32::MAX) is unsatisfiable, should degrade.
1062        let result = split_partition_expr(base, split);
1063        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
1064    }
1065
1066    #[test]
1067    fn test_simplify_same_upper_bound_prefers_strict() {
1068        // a <= 10 AND a < 10 => a < 10
1069        let expr = col("a")
1070            .lt_eq(Value::Int64(10))
1071            .and(col("a").lt(Value::Int64(10)));
1072
1073        let simplified = simplify_and_bounds(expr);
1074        assert_eq!(simplified.to_string(), "a < 10");
1075    }
1076
1077    #[test]
1078    fn test_simplify_same_lower_bound_prefers_strict() {
1079        // a >= 10 AND a > 10 => a > 10
1080        let expr = col("a")
1081            .gt_eq(Value::Int64(10))
1082            .and(col("a").gt(Value::Int64(10)));
1083
1084        let simplified = simplify_and_bounds(expr);
1085        assert_eq!(simplified.to_string(), "a > 10");
1086    }
1087
1088    #[test]
1089    fn test_negate_split_expr_demorgan_and() {
1090        // expr: (a < 10) AND (a >= 3)
1091        let expr = col("a")
1092            .lt(Value::Int64(10))
1093            .and(col("a").gt_eq(Value::Int64(3)));
1094        let not_expr = negate_split_expr(&expr).unwrap();
1095        // NOT(expr) => (a >= 10) OR (a < 3)
1096        assert_eq!(not_expr.to_string(), "a >= 10 OR a < 3");
1097    }
1098
1099    #[test]
1100    fn test_negate_split_expr_demorgan_or() {
1101        // expr: (a = 1) OR (a <> 2)
1102        let expr = PartitionExpr::new(
1103            Operand::Expr(col("a").eq(Value::Int64(1))),
1104            RestrictedOp::Or,
1105            Operand::Expr(col("a").not_eq(Value::Int64(2))),
1106        );
1107        let not_expr = negate_split_expr(&expr).unwrap();
1108        // NOT(expr) => (a <> 1) AND (a = 2)
1109        assert_eq!(not_expr.to_string(), "a <> 1 AND a = 2");
1110    }
1111
1112    #[test]
1113    fn test_negate_split_expr_invalid_and_operand() {
1114        // malformed AND: rhs is a scalar value, not an Expr subtree.
1115        let malformed = PartitionExpr {
1116            lhs: Box::new(Operand::Expr(col("a").lt(Value::Int64(10)))),
1117            op: RestrictedOp::And,
1118            rhs: Box::new(Operand::Value(Value::Int64(1))),
1119        };
1120        assert!(negate_split_expr(&malformed).is_err());
1121    }
1122
1123    #[test]
1124    fn test_validate_supported_expr_value_column_allowed() {
1125        // Canonicalization can flip to column-value; validator must accept value-column input.
1126        let expr = PartitionExpr::new(
1127            Operand::Value(Value::Int64(10)),
1128            RestrictedOp::Lt,
1129            Operand::Column("a".to_string()),
1130        );
1131        assert!(validate_supported_expr(&expr).is_ok());
1132    }
1133
1134    #[test]
1135    fn test_validate_supported_expr_invalid_atomic_shape() {
1136        // column-column atomic comparison is out of shape.
1137        let expr = PartitionExpr::new(
1138            Operand::Column("a".to_string()),
1139            RestrictedOp::Eq,
1140            Operand::Column("b".to_string()),
1141        );
1142        assert!(validate_supported_expr(&expr).is_err());
1143    }
1144
1145    #[test]
1146    fn test_validate_supported_expr_nan_comparison_rejected() {
1147        // NaN cannot be used in any supported comparison predicate.
1148        let expr = col("a").lt(Value::Float64(OrderedFloat(f64::NAN)));
1149        assert!(validate_supported_expr(&expr).is_err());
1150    }
1151
1152    #[test]
1153    fn test_validate_supported_expr_infinite_comparison_rejected() {
1154        // Infinity cannot be used in any supported comparison predicate under
1155        // finite-only float policy.
1156        let pos_inf = col("a").gt(Value::Float64(OrderedFloat(f64::INFINITY)));
1157        let neg_inf = col("a").lt(Value::Float32(OrderedFloat(f32::NEG_INFINITY)));
1158        assert!(validate_supported_expr(&pos_inf).is_err());
1159        assert!(validate_supported_expr(&neg_inf).is_err());
1160    }
1161
1162    #[test]
1163    fn test_validate_supported_expr_nan_eq_rejected() {
1164        let expr = col("a").eq(Value::Float64(OrderedFloat(f64::NAN)));
1165        assert!(validate_supported_expr(&expr).is_err());
1166    }
1167
1168    #[test]
1169    fn test_validate_supported_expr_infinite_eq_rejected() {
1170        let pos_inf = col("a").eq(Value::Float64(OrderedFloat(f64::INFINITY)));
1171        let neg_inf = col("a").not_eq(Value::Float32(OrderedFloat(f32::NEG_INFINITY)));
1172        assert!(validate_supported_expr(&pos_inf).is_err());
1173        assert!(validate_supported_expr(&neg_inf).is_err());
1174    }
1175
1176    #[test]
1177    fn test_simplify_and_bounds_or_keeps_original() {
1178        // OR tree is intentionally not flattened by AND-only simplifier.
1179        let expr = PartitionExpr::new(
1180            Operand::Expr(col("a").lt(Value::Int64(10))),
1181            RestrictedOp::Or,
1182            Operand::Expr(col("a").gt_eq(Value::Int64(20))),
1183        );
1184        let simplified = simplify_and_bounds(expr.clone());
1185        assert_eq!(simplified.to_string(), expr.to_string());
1186    }
1187
1188    #[test]
1189    fn test_simplify_and_bounds_keep_stronger_when_weaker_seen_later() {
1190        // upper: stronger bound first, weaker later -> keep stronger (< 5).
1191        let upper = col("a")
1192            .lt(Value::Int64(5))
1193            .and(col("a").lt(Value::Int64(10)));
1194        assert_eq!(simplify_and_bounds(upper).to_string(), "a < 5");
1195
1196        // lower: stronger bound first, weaker later -> keep stronger (> 10).
1197        let lower = col("a")
1198            .gt(Value::Int64(10))
1199            .and(col("a").gt(Value::Int64(5)));
1200        assert_eq!(simplify_and_bounds(lower).to_string(), "a > 10");
1201    }
1202
1203    #[test]
1204    fn test_internal_helpers_uncovered_branches() {
1205        // Empty AND fold should return None.
1206        assert!(fold_and_exprs(vec![]).is_none());
1207
1208        // Any OR in tree disables AND-bound simplification path.
1209        let mut out = Vec::new();
1210        let or_expr = PartitionExpr::new(
1211            Operand::Expr(col("a").lt(Value::Int64(10))),
1212            RestrictedOp::Or,
1213            Operand::Expr(col("a").gt_eq(Value::Int64(20))),
1214        );
1215        assert!(!collect_and_atoms(&or_expr, &mut out));
1216
1217        // value-value atom has no (column, op, value) projection.
1218        let value_value = PartitionExpr::new(
1219            Operand::Value(Value::Int64(1)),
1220            RestrictedOp::Eq,
1221            Operand::Value(Value::Int64(2)),
1222        );
1223        assert!(atom_col_op_val(&value_value).is_none());
1224    }
1225
1226    #[test]
1227    fn test_split_rejects_or_in_base_expr() {
1228        // R: (a < 10) OR (a >= 20 AND a < 30) falls outside the AND-only base_expr contract.
1229        let base = PartitionExpr::new(
1230            Operand::Expr(col("a").lt(Value::Int64(10))),
1231            RestrictedOp::Or,
1232            Operand::Expr(
1233                col("a")
1234                    .gt_eq(Value::Int64(20))
1235                    .and(col("a").lt(Value::Int64(30))),
1236            ),
1237        );
1238        // S: a < 5
1239        let split = col("a").lt(Value::Int64(5));
1240
1241        let result = split_partition_expr(base, split);
1242        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
1243    }
1244
1245    #[test]
1246    fn test_split_rejects_or_in_split_expr() {
1247        // R: a < 10
1248        let base = col("a").lt(Value::Int64(10));
1249        // S: (a < 5) OR (a >= 8 AND a < 9) falls outside the atomic split_expr contract.
1250        let split = PartitionExpr::new(
1251            Operand::Expr(col("a").lt(Value::Int64(5))),
1252            RestrictedOp::Or,
1253            Operand::Expr(
1254                col("a")
1255                    .gt_eq(Value::Int64(8))
1256                    .and(col("a").lt(Value::Int64(9))),
1257            ),
1258        );
1259
1260        let result = split_partition_expr(base, split);
1261        assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
1262    }
1263}