1use std::collections::{BTreeMap, HashSet};
27
28use datatypes::value::Value;
29use snafu::ensure;
30
31use crate::collider::Collider;
32use crate::error::{self, Result};
33use crate::expr::{Operand, PartitionExpr, RestrictedOp};
34
35#[derive(Debug, Clone, PartialEq, Eq)]
36pub enum ExprSplitDegradeReason {
37 UnsupportedType,
38 UnsupportedNotExpansion,
39 ColliderRejected,
40 EmptyBranch,
41}
42
43pub fn split_partition_expr(
56 base_expr: PartitionExpr,
57 split_expr: PartitionExpr,
58) -> std::result::Result<(PartitionExpr, PartitionExpr), ExprSplitDegradeReason> {
59 let base = base_expr.canonicalize();
60 let split = split_expr.canonicalize();
61
62 if validate_supported_expr(&base).is_err() || validate_supported_expr(&split).is_err() {
63 return Err(ExprSplitDegradeReason::UnsupportedType);
64 }
65
66 if !validate_base_expr_shape(&base) || !validate_split_expr_shape(&split) {
67 return Err(ExprSplitDegradeReason::UnsupportedType);
68 }
69
70 let not_split = match negate_split_expr(&split) {
71 Ok(expr) => expr,
72 Err(_) => {
73 return Err(ExprSplitDegradeReason::UnsupportedNotExpansion);
74 }
75 };
76
77 let left_raw = base.clone().and(split);
78 let right_raw = base.clone().and(not_split);
79
80 if Collider::new(std::slice::from_ref(&left_raw)).is_err()
81 || Collider::new(std::slice::from_ref(&right_raw)).is_err()
82 {
83 return Err(ExprSplitDegradeReason::ColliderRejected);
84 }
85
86 let left_expr = simplify_and_bounds(left_raw);
87 let right_expr = simplify_and_bounds(right_raw);
88
89 if is_empty_and_conjunction(&left_expr) || is_empty_and_conjunction(&right_expr) {
90 return Err(ExprSplitDegradeReason::EmptyBranch);
91 }
92
93 Ok((left_expr, right_expr))
94}
95
96fn is_empty_and_conjunction(expr: &PartitionExpr) -> bool {
122 let Some(collected) = collect_conjunction_bounds(expr) else {
123 return false;
124 };
125
126 if collected.has_conflict {
127 return true;
128 }
129
130 let CollectedConjunction {
131 lowers,
132 uppers,
133 not_equals,
134 passthrough: _,
135 has_conflict: _,
136 } = collected;
137
138 if lowers
139 .iter()
140 .any(|(col, lower)| !uppers.contains_key(col) && is_strictly_greater_than_domain_max(lower))
141 {
142 return true;
143 }
144
145 lowers.into_iter().any(|(col, lower)| {
147 let Some(upper) = uppers.get(&col) else {
148 return false;
149 };
150
151 match lower.value.partial_cmp(&upper.value) {
152 Some(std::cmp::Ordering::Greater) => true,
153 Some(std::cmp::Ordering::Equal) => {
154 if !lower.inclusive || !upper.inclusive {
155 true
156 } else {
157 not_equals
158 .get(&col)
159 .is_some_and(|excluded| excluded.contains(&lower.value))
160 }
161 }
162 Some(std::cmp::Ordering::Less) => {
163 match (
164 discrete_value_index(&lower.value),
165 discrete_value_index(&upper.value),
166 ) {
167 (Some(lower_idx), Some(upper_idx)) => {
168 let min_candidate = if lower.inclusive {
169 Some(lower_idx)
170 } else {
171 lower_idx.checked_add(1)
172 };
173 let max_candidate = if upper.inclusive {
174 Some(upper_idx)
175 } else {
176 upper_idx.checked_sub(1)
177 };
178 match (min_candidate, max_candidate) {
179 (Some(min_val), Some(max_val)) => min_val > max_val,
180 _ => true,
181 }
182 }
183 _ => false,
184 }
185 }
186 _ => false,
187 }
188 })
189}
190
191fn discrete_value_index(v: &Value) -> Option<i128> {
192 match v {
193 Value::Int8(x) => Some(*x as i128),
194 Value::Int16(x) => Some(*x as i128),
195 Value::Int32(x) => Some(*x as i128),
196 Value::Int64(x) => Some(*x as i128),
197 Value::UInt8(x) => Some(*x as i128),
198 Value::UInt16(x) => Some(*x as i128),
199 Value::UInt32(x) => Some(*x as i128),
200 Value::UInt64(x) => Some(*x as i128),
201 _ => None,
202 }
203}
204
205fn is_strictly_greater_than_domain_max(bound: &LowerBound) -> bool {
206 if bound.inclusive {
207 return false;
208 }
209
210 is_domain_max_value(&bound.value)
211}
212
213fn is_domain_max_value(v: &Value) -> bool {
214 match v {
215 Value::Float32(v) => v.0 == f32::MAX,
216 Value::Float64(v) => v.0 == f64::MAX,
217 Value::UInt8(v) => *v == u8::MAX,
218 Value::UInt16(v) => *v == u16::MAX,
219 Value::UInt32(v) => *v == u32::MAX,
220 Value::UInt64(v) => *v == u64::MAX,
221 Value::Int8(v) => *v == i8::MAX,
222 Value::Int16(v) => *v == i16::MAX,
223 Value::Int32(v) => *v == i32::MAX,
224 Value::Int64(v) => *v == i64::MAX,
225 _ => false,
226 }
227}
228
229pub fn negate_split_expr(expr: &PartitionExpr) -> Result<PartitionExpr> {
251 match expr.op() {
252 RestrictedOp::Eq
253 | RestrictedOp::NotEq
254 | RestrictedOp::Lt
255 | RestrictedOp::LtEq
256 | RestrictedOp::Gt
257 | RestrictedOp::GtEq => {
258 let op = match expr.op() {
260 RestrictedOp::Eq => RestrictedOp::NotEq,
261 RestrictedOp::NotEq => RestrictedOp::Eq,
262 RestrictedOp::Lt => RestrictedOp::GtEq,
263 RestrictedOp::LtEq => RestrictedOp::Gt,
264 RestrictedOp::Gt => RestrictedOp::LtEq,
265 RestrictedOp::GtEq => RestrictedOp::Lt,
266 RestrictedOp::And | RestrictedOp::Or => unreachable!(),
267 };
268 Ok(PartitionExpr::new(
269 expr.lhs().clone(),
270 op,
271 expr.rhs().clone(),
272 ))
273 }
274 RestrictedOp::And | RestrictedOp::Or => {
275 let lhs = match expr.lhs() {
277 Operand::Expr(lhs) => lhs,
278 other => {
279 return error::NoExprOperandSnafu {
280 operand: other.clone(),
281 }
282 .fail();
283 }
284 };
285 let rhs = match expr.rhs() {
286 Operand::Expr(rhs) => rhs,
287 other => {
288 return error::NoExprOperandSnafu {
289 operand: other.clone(),
290 }
291 .fail();
292 }
293 };
294 let not_lhs = negate_split_expr(lhs)?;
295 let not_rhs = negate_split_expr(rhs)?;
296 let op = match expr.op() {
297 RestrictedOp::And => RestrictedOp::Or,
299 RestrictedOp::Or => RestrictedOp::And,
301 _ => unreachable!(),
302 };
303 Ok(PartitionExpr::new(
304 Operand::Expr(not_lhs),
305 op,
306 Operand::Expr(not_rhs),
307 ))
308 }
309 }
310}
311
312pub fn validate_supported_expr(expr: &PartitionExpr) -> Result<()> {
313 match expr.op() {
314 RestrictedOp::And | RestrictedOp::Or => {
315 let lhs = match expr.lhs() {
316 Operand::Expr(lhs) => lhs,
317 other => {
318 return error::NoExprOperandSnafu {
319 operand: other.clone(),
320 }
321 .fail();
322 }
323 };
324 let rhs = match expr.rhs() {
325 Operand::Expr(rhs) => rhs,
326 other => {
327 return error::NoExprOperandSnafu {
328 operand: other.clone(),
329 }
330 .fail();
331 }
332 };
333 validate_supported_expr(lhs)?;
334 validate_supported_expr(rhs)?;
335 Ok(())
336 }
337 _ => validate_atomic(expr),
338 }
339}
340
341fn validate_atomic(expr: &PartitionExpr) -> Result<()> {
342 let (lhs, rhs) = (expr.lhs(), expr.rhs());
343 match (lhs, rhs) {
344 (Operand::Column(_), Operand::Value(v)) | (Operand::Value(v), Operand::Column(_)) => {
345 ensure!(
346 is_supported_value(v),
347 error::InvalidExprSnafu { expr: expr.clone() }
348 );
349 if is_nan_value(v) || is_infinite_value(v) {
350 return error::InvalidExprSnafu { expr: expr.clone() }.fail();
351 }
352 Ok(())
353 }
354 _ => error::InvalidExprSnafu { expr: expr.clone() }.fail(),
355 }
356}
357
358fn validate_base_expr_shape(expr: &PartitionExpr) -> bool {
365 let mut atoms = Vec::new();
366 if !collect_and_atoms(expr, &mut atoms) {
367 return false;
368 }
369
370 atoms
371 .into_iter()
372 .all(|atom| is_atomic_range_expr(&atom.canonicalize()))
373}
374
375fn validate_split_expr_shape(expr: &PartitionExpr) -> bool {
380 is_atomic_range_expr(expr)
381}
382
383fn is_atomic_range_expr(expr: &PartitionExpr) -> bool {
387 atom_col_op_val(expr).is_some_and(|(_, op, _)| {
388 matches!(
389 op,
390 RestrictedOp::Lt | RestrictedOp::LtEq | RestrictedOp::Gt | RestrictedOp::GtEq
391 )
392 })
393}
394
395fn is_supported_value(v: &Value) -> bool {
396 matches!(
397 v,
398 Value::Int8(_)
399 | Value::Int16(_)
400 | Value::Int32(_)
401 | Value::Int64(_)
402 | Value::UInt8(_)
403 | Value::UInt16(_)
404 | Value::UInt32(_)
405 | Value::UInt64(_)
406 | Value::Float32(_)
407 | Value::Float64(_)
408 | Value::String(_)
409 )
410}
411
412fn is_nan_value(v: &Value) -> bool {
413 match v {
414 Value::Float32(x) => x.0.is_nan(),
415 Value::Float64(x) => x.0.is_nan(),
416 _ => false,
417 }
418}
419
420fn is_infinite_value(v: &Value) -> bool {
421 match v {
422 Value::Float32(x) => x.0.is_infinite(),
423 Value::Float64(x) => x.0.is_infinite(),
424 _ => false,
425 }
426}
427
428#[derive(Debug, Clone)]
429struct LowerBound {
430 value: Value,
431 inclusive: bool,
432}
433
434#[derive(Debug, Clone)]
435struct UpperBound {
436 value: Value,
437 inclusive: bool,
438}
439
440struct CollectedConjunction {
441 lowers: BTreeMap<String, LowerBound>,
442 uppers: BTreeMap<String, UpperBound>,
443 not_equals: BTreeMap<String, HashSet<Value>>,
444 passthrough: Vec<PartitionExpr>,
445 has_conflict: bool,
446}
447
448fn simplify_and_bounds(expr: PartitionExpr) -> PartitionExpr {
470 let Some(collected) = collect_conjunction_bounds(&expr) else {
471 return expr;
472 };
473
474 let CollectedConjunction {
475 lowers,
476 uppers,
477 not_equals: _,
478 passthrough,
479 has_conflict: _,
480 } = collected;
481
482 let mut out = passthrough;
483 out.extend(lowers.into_iter().map(|(col, lower)| {
484 PartitionExpr::new(
485 Operand::Column(col),
486 if lower.inclusive {
487 RestrictedOp::GtEq
488 } else {
489 RestrictedOp::Gt
490 },
491 Operand::Value(lower.value),
492 )
493 }));
494 out.extend(uppers.into_iter().map(|(col, upper)| {
495 PartitionExpr::new(
496 Operand::Column(col),
497 if upper.inclusive {
498 RestrictedOp::LtEq
499 } else {
500 RestrictedOp::Lt
501 },
502 Operand::Value(upper.value),
503 )
504 }));
505
506 fold_and_exprs(out).unwrap_or(expr)
507}
508
509fn collect_and_atoms(expr: &PartitionExpr, out: &mut Vec<PartitionExpr>) -> bool {
514 match expr.op() {
515 RestrictedOp::And => {
516 let lhs = match expr.lhs() {
517 Operand::Expr(lhs) => lhs,
518 _ => return false,
519 };
520 let rhs = match expr.rhs() {
521 Operand::Expr(rhs) => rhs,
522 _ => return false,
523 };
524 collect_and_atoms(lhs, out) && collect_and_atoms(rhs, out)
525 }
526 RestrictedOp::Or => false,
527 _ => {
528 out.push(expr.clone());
529 true
530 }
531 }
532}
533
534fn atom_col_op_val(expr: &PartitionExpr) -> Option<(String, RestrictedOp, Value)> {
536 let lhs = expr.lhs();
537 let rhs = expr.rhs();
538 match (lhs, rhs) {
539 (Operand::Column(col), Operand::Value(v)) => {
540 Some((col.clone(), expr.op().clone(), v.clone()))
541 }
542 _ => None,
543 }
544}
545
546fn collect_conjunction_bounds(expr: &PartitionExpr) -> Option<CollectedConjunction> {
568 let mut atoms = Vec::new();
569 if !collect_and_atoms(expr, &mut atoms) {
570 return None;
571 }
572
573 let mut lowers = BTreeMap::new();
574 let mut uppers = BTreeMap::new();
575 let mut equals = BTreeMap::new();
576 let mut not_equals: BTreeMap<String, HashSet<Value>> = BTreeMap::new();
577 let mut passthrough = Vec::new();
578 let mut seen = HashSet::new();
579 let mut has_conflict = false;
580
581 for atom in atoms {
582 let atom = atom.canonicalize();
583 let Some((col, op, val)) = atom_col_op_val(&atom) else {
584 push_unique_expr(&mut passthrough, &mut seen, atom);
585 continue;
586 };
587
588 match op {
589 RestrictedOp::Lt | RestrictedOp::LtEq => update_upper_bound(
590 &mut uppers,
591 col,
592 UpperBound {
593 value: val,
594 inclusive: matches!(op, RestrictedOp::LtEq),
595 },
596 ),
597 RestrictedOp::Gt | RestrictedOp::GtEq => update_lower_bound(
598 &mut lowers,
599 col,
600 LowerBound {
601 value: val,
602 inclusive: matches!(op, RestrictedOp::GtEq),
603 },
604 ),
605 RestrictedOp::Eq => {
606 if let Some(existing) = equals.get(&col)
607 && existing != &val
608 {
609 has_conflict = true;
610 }
611 if not_equals
612 .get(&col)
613 .is_some_and(|excluded| excluded.contains(&val))
614 {
615 has_conflict = true;
616 }
617 equals.insert(col.clone(), val.clone());
618 update_lower_bound(
619 &mut lowers,
620 col.clone(),
621 LowerBound {
622 value: val.clone(),
623 inclusive: true,
624 },
625 );
626 update_upper_bound(
627 &mut uppers,
628 col,
629 UpperBound {
630 value: val,
631 inclusive: true,
632 },
633 );
634 push_unique_expr(&mut passthrough, &mut seen, atom);
635 }
636 RestrictedOp::NotEq => {
637 if equals.get(&col).is_some_and(|eq| eq == &val) {
638 has_conflict = true;
639 }
640 not_equals.entry(col).or_default().insert(val);
641 push_unique_expr(&mut passthrough, &mut seen, atom);
642 }
643 RestrictedOp::And | RestrictedOp::Or => {
644 push_unique_expr(&mut passthrough, &mut seen, atom);
645 }
646 }
647 }
648
649 Some(CollectedConjunction {
650 lowers,
651 uppers,
652 not_equals,
653 passthrough,
654 has_conflict,
655 })
656}
657
658fn push_unique_expr(out: &mut Vec<PartitionExpr>, seen: &mut HashSet<String>, expr: PartitionExpr) {
659 let key = expr.to_string();
660 if seen.insert(key) {
661 out.push(expr);
662 }
663}
664
665fn update_upper_bound(
666 uppers: &mut BTreeMap<String, UpperBound>,
667 col: String,
668 candidate: UpperBound,
669) {
670 match uppers.get_mut(&col) {
671 Some(current) => {
672 if prefer_upper(&candidate, current) {
673 *current = candidate;
674 }
675 }
676 None => {
677 uppers.insert(col, candidate);
678 }
679 }
680}
681
682fn update_lower_bound(
683 lowers: &mut BTreeMap<String, LowerBound>,
684 col: String,
685 candidate: LowerBound,
686) {
687 match lowers.get_mut(&col) {
688 Some(current) => {
689 if prefer_lower(&candidate, current) {
690 *current = candidate;
691 }
692 }
693 None => {
694 lowers.insert(col, candidate);
695 }
696 }
697}
698
699fn prefer_upper(candidate: &UpperBound, current: &UpperBound) -> bool {
700 match candidate.value.partial_cmp(¤t.value) {
702 Some(std::cmp::Ordering::Less) => true,
703 Some(std::cmp::Ordering::Equal) => !candidate.inclusive && current.inclusive,
704 _ => false,
705 }
706}
707
708fn prefer_lower(candidate: &LowerBound, current: &LowerBound) -> bool {
709 match candidate.value.partial_cmp(¤t.value) {
711 Some(std::cmp::Ordering::Greater) => true,
712 Some(std::cmp::Ordering::Equal) => !candidate.inclusive && current.inclusive,
713 _ => false,
714 }
715}
716
717fn fold_and_exprs(mut exprs: Vec<PartitionExpr>) -> Option<PartitionExpr> {
720 exprs.drain(..).reduce(|acc, next| acc.and(next))
721}
722
723#[cfg(test)]
724mod tests {
725 use datatypes::value::{OrderedFloat, Value};
726 use store_api::storage::RegionNumber;
727
728 use super::*;
729 use crate::checker::PartitionChecker;
730 use crate::expr::col;
731 use crate::multi_dim::MultiDimPartitionRule;
732
733 fn validate_cut_result_with_checker(
734 original_rule_exprs: &[PartitionExpr],
735 replaced_index: usize,
736 left: &Option<PartitionExpr>,
737 right: &Option<PartitionExpr>,
738 partition_columns: Vec<String>,
739 regions: Vec<RegionNumber>,
740 ) -> Result<()> {
741 ensure!(
742 replaced_index < original_rule_exprs.len(),
743 error::UnexpectedSnafu {
744 err_msg: format!(
745 "replaced index out of bounds: {replaced_index} >= {}",
746 original_rule_exprs.len()
747 )
748 }
749 );
750
751 let mut exprs = original_rule_exprs.to_vec();
752 exprs.remove(replaced_index);
753 exprs.extend(left.iter().cloned());
754 exprs.extend(right.iter().cloned());
755
756 ensure!(
757 !exprs.is_empty(),
758 error::UnexpectedSnafu {
759 err_msg: "empty rule exprs after split".to_string()
760 }
761 );
762
763 let final_regions = if regions.len() == exprs.len() {
764 regions
765 } else {
766 (0..exprs.len() as RegionNumber).collect()
767 };
768
769 let rule = MultiDimPartitionRule::try_new(partition_columns, final_regions, exprs, false)?;
770 let checker = PartitionChecker::try_new(&rule)?;
771 checker.check()?;
772 Ok(())
773 }
774
775 #[test]
776 fn test_split_simple_range() {
777 let base = col("a").lt(Value::Int64(10));
779 let split = col("a").lt(Value::Int64(5));
781 let (left, right) = split_partition_expr(base, split).unwrap();
782 assert_eq!(left.to_string(), "a < 5");
784 assert_eq!(right.to_string(), "a >= 5 AND a < 10");
786 }
787
788 #[test]
789 fn test_split_string_interval() {
790 let base = col("v")
792 .gt(Value::String("m".into()))
793 .and(col("v").lt(Value::String("n".into())));
794 let split = col("v").lt(Value::String("m~".into()));
796 let (left, right) = split_partition_expr(base, split).unwrap();
797 assert_eq!(left.to_string(), "v > m AND v < m~");
799 assert_eq!(right.to_string(), "v >= m~ AND v < n");
801 }
802
803 #[test]
804 fn test_split_numeric_interval_mid_split() {
805 let base = col("a")
807 .gt(Value::Int64(3))
808 .and(col("a").lt(Value::Int64(10)));
809 let split = col("a").lt(Value::Int64(5));
811
812 let (left, right) = split_partition_expr(base, split).unwrap();
813
814 assert_eq!(left.to_string(), "a > 3 AND a < 5");
816 assert_eq!(right.to_string(), "a >= 5 AND a < 10");
818 }
819
820 #[test]
821 fn test_split_base_expr_allows_unrelated_range_columns() {
822 let base = col("a")
824 .gt(Value::Int64(20))
825 .and(col("b").lt(Value::Int64(20)));
826 let split = col("a").lt(Value::Int64(30));
828
829 let (left, right) = split_partition_expr(base, split).unwrap();
830
831 assert_eq!(left.to_string(), "a > 20 AND a < 30 AND b < 20");
833 assert_eq!(right.to_string(), "a >= 30 AND b < 20");
835 }
836
837 #[test]
838 fn test_split_degrade_on_unsupported_type() {
839 let base = col("a").eq(Value::Boolean(true));
841 let split = col("a").eq(Value::Boolean(true));
842
843 let result = split_partition_expr(base, split);
844 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
845 }
846
847 #[test]
848 fn test_validate_cut_result_with_checker() {
849 let original = vec![
851 col("a").lt(Value::Int64(10)),
852 col("a").gt_eq(Value::Int64(10)),
853 ];
854 let left = Some(col("a").lt(Value::Int64(5)));
855 let right = Some(
856 col("a")
857 .gt_eq(Value::Int64(5))
858 .and(col("a").lt(Value::Int64(10))),
859 );
860
861 validate_cut_result_with_checker(
862 &original,
863 0,
864 &left,
865 &right,
866 vec!["a".to_string()],
867 vec![1, 2, 3],
868 )
869 .unwrap();
870 }
871
872 #[test]
873 fn test_split_degrade_on_empty_branch() {
874 let base = col("a").lt(Value::Int64(10));
876 let split = col("a").lt(Value::Int64(20));
878
879 let result = split_partition_expr(base, split);
881 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
882 }
883
884 #[test]
885 fn test_split_rejects_eq_in_base_expr() {
886 let base = col("a").eq(Value::Int64(5));
888 let split = col("a").lt(Value::Int64(6));
890
891 let result = split_partition_expr(base, split);
892 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
893 }
894
895 #[test]
896 fn test_split_degrade_on_discrete_gap_int() {
897 let base = col("a").lt(Value::Int64(5));
899 let split = col("a").lt_eq(Value::Int64(4));
901
902 let result = split_partition_expr(base, split);
904 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
905 }
906
907 #[test]
908 fn test_split_degrade_on_unsupported_date_type() {
909 let base = col("d").lt(Value::Date(5.into()));
911 let split = col("d").lt_eq(Value::Date(4.into()));
912
913 let result = split_partition_expr(base, split);
914 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
915 }
916
917 #[test]
918 fn test_split_degrade_on_unsupported_timestamp_type() {
919 let base = col("ts").lt(Value::Timestamp(0.into()));
921 let split = col("ts").lt_eq(Value::Timestamp(1.into()));
922
923 let result = split_partition_expr(base, split);
924 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
925 }
926
927 #[test]
928 fn test_split_rejects_not_eq_in_split_expr() {
929 let base = col("a")
931 .gt_eq(Value::Int64(5))
932 .and(col("a").lt_eq(Value::Int64(5)));
933 let split = col("a").not_eq(Value::Int64(5));
935
936 let result = split_partition_expr(base, split);
937 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
938 }
939
940 #[test]
941 fn test_split_rejects_eq_in_split_expr() {
942 let base = col("a")
944 .gt_eq(Value::Int64(5))
945 .and(col("a").lt_eq(Value::Int64(5)));
946 let split = col("a").eq(Value::Int64(5));
948
949 let result = split_partition_expr(base, split);
950 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
951 }
952
953 #[test]
954 fn test_split_degrade_on_uint_one_sided_impossible_upper_bound() {
955 let base = col("a").lt(Value::UInt64(10));
957 let split = col("a").lt(Value::UInt64(0));
960
961 let (left, right) = split_partition_expr(base, split).unwrap();
962 assert_eq!(left.to_string(), "a < 0");
963 assert_eq!(right.to_string(), "a >= 0 AND a < 10");
964 }
965
966 #[test]
967 fn test_split_degrade_on_uint_one_sided_impossible_lower_bound() {
968 let base = col("a").lt(Value::UInt64(10));
970 let split = col("a").gt(Value::UInt64(u64::MAX));
972
973 let result = split_partition_expr(base, split);
975 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
976 }
977
978 #[test]
979 fn test_split_degrade_on_int_one_sided_impossible_upper_bound() {
980 let base = col("a").lt(Value::Int64(10));
982 let split = col("a").lt(Value::Int64(i64::MIN));
985
986 let (left, right) = split_partition_expr(base, split).unwrap();
987 assert_eq!(left.to_string(), format!("a < {}", i64::MIN));
988 assert_eq!(right.to_string(), format!("a >= {} AND a < 10", i64::MIN));
989 }
990
991 #[test]
992 fn test_split_degrade_on_int_one_sided_impossible_lower_bound() {
993 let base = col("a").lt(Value::Int64(10));
995 let split = col("a").gt(Value::Int64(i64::MAX));
997
998 let result = split_partition_expr(base, split);
1000 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
1001 }
1002
1003 #[test]
1004 fn test_split_degrade_on_string_one_sided_impossible_upper_bound() {
1005 let base = col("s").lt(Value::String("z".into()));
1007 let split = col("s").lt(Value::String("".into()));
1010
1011 let (left, right) = split_partition_expr(base, split).unwrap();
1012 assert_eq!(left.to_string(), "s < ");
1013 assert_eq!(right.to_string(), "s >= AND s < z");
1014 }
1015
1016 #[test]
1017 fn test_split_degrade_on_float64_one_sided_impossible_upper_bound() {
1018 let base = col("a").lt(Value::Float64(OrderedFloat(10.0)));
1020 let split = col("a").lt(Value::Float64(OrderedFloat(f64::MIN)));
1023
1024 let (left, right) = split_partition_expr(base, split).unwrap();
1025 assert_eq!(left.to_string(), format!("a < {}", f64::MIN));
1026 assert_eq!(right.to_string(), format!("a >= {} AND a < 10", f64::MIN));
1027 }
1028
1029 #[test]
1030 fn test_split_degrade_on_float64_one_sided_impossible_lower_bound() {
1031 let base = col("a").lt(Value::Float64(OrderedFloat(10.0)));
1033 let split = col("a").gt(Value::Float64(OrderedFloat(f64::MAX)));
1035
1036 let result = split_partition_expr(base, split);
1038 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
1039 }
1040
1041 #[test]
1042 fn test_split_degrade_on_float32_one_sided_impossible_upper_bound() {
1043 let base = col("a").lt(Value::Float32(OrderedFloat(10.0)));
1045 let split = col("a").lt(Value::Float32(OrderedFloat(f32::MIN)));
1048
1049 let (left, right) = split_partition_expr(base, split).unwrap();
1050 assert_eq!(left.to_string(), format!("a < {}", f32::MIN));
1051 assert_eq!(right.to_string(), format!("a >= {} AND a < 10", f32::MIN));
1052 }
1053
1054 #[test]
1055 fn test_split_degrade_on_float32_one_sided_impossible_lower_bound() {
1056 let base = col("a").lt(Value::Float32(OrderedFloat(10.0)));
1058 let split = col("a").gt(Value::Float32(OrderedFloat(f32::MAX)));
1060
1061 let result = split_partition_expr(base, split);
1063 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::EmptyBranch);
1064 }
1065
1066 #[test]
1067 fn test_simplify_same_upper_bound_prefers_strict() {
1068 let expr = col("a")
1070 .lt_eq(Value::Int64(10))
1071 .and(col("a").lt(Value::Int64(10)));
1072
1073 let simplified = simplify_and_bounds(expr);
1074 assert_eq!(simplified.to_string(), "a < 10");
1075 }
1076
1077 #[test]
1078 fn test_simplify_same_lower_bound_prefers_strict() {
1079 let expr = col("a")
1081 .gt_eq(Value::Int64(10))
1082 .and(col("a").gt(Value::Int64(10)));
1083
1084 let simplified = simplify_and_bounds(expr);
1085 assert_eq!(simplified.to_string(), "a > 10");
1086 }
1087
1088 #[test]
1089 fn test_negate_split_expr_demorgan_and() {
1090 let expr = col("a")
1092 .lt(Value::Int64(10))
1093 .and(col("a").gt_eq(Value::Int64(3)));
1094 let not_expr = negate_split_expr(&expr).unwrap();
1095 assert_eq!(not_expr.to_string(), "a >= 10 OR a < 3");
1097 }
1098
1099 #[test]
1100 fn test_negate_split_expr_demorgan_or() {
1101 let expr = PartitionExpr::new(
1103 Operand::Expr(col("a").eq(Value::Int64(1))),
1104 RestrictedOp::Or,
1105 Operand::Expr(col("a").not_eq(Value::Int64(2))),
1106 );
1107 let not_expr = negate_split_expr(&expr).unwrap();
1108 assert_eq!(not_expr.to_string(), "a <> 1 AND a = 2");
1110 }
1111
1112 #[test]
1113 fn test_negate_split_expr_invalid_and_operand() {
1114 let malformed = PartitionExpr {
1116 lhs: Box::new(Operand::Expr(col("a").lt(Value::Int64(10)))),
1117 op: RestrictedOp::And,
1118 rhs: Box::new(Operand::Value(Value::Int64(1))),
1119 };
1120 assert!(negate_split_expr(&malformed).is_err());
1121 }
1122
1123 #[test]
1124 fn test_validate_supported_expr_value_column_allowed() {
1125 let expr = PartitionExpr::new(
1127 Operand::Value(Value::Int64(10)),
1128 RestrictedOp::Lt,
1129 Operand::Column("a".to_string()),
1130 );
1131 assert!(validate_supported_expr(&expr).is_ok());
1132 }
1133
1134 #[test]
1135 fn test_validate_supported_expr_invalid_atomic_shape() {
1136 let expr = PartitionExpr::new(
1138 Operand::Column("a".to_string()),
1139 RestrictedOp::Eq,
1140 Operand::Column("b".to_string()),
1141 );
1142 assert!(validate_supported_expr(&expr).is_err());
1143 }
1144
1145 #[test]
1146 fn test_validate_supported_expr_nan_comparison_rejected() {
1147 let expr = col("a").lt(Value::Float64(OrderedFloat(f64::NAN)));
1149 assert!(validate_supported_expr(&expr).is_err());
1150 }
1151
1152 #[test]
1153 fn test_validate_supported_expr_infinite_comparison_rejected() {
1154 let pos_inf = col("a").gt(Value::Float64(OrderedFloat(f64::INFINITY)));
1157 let neg_inf = col("a").lt(Value::Float32(OrderedFloat(f32::NEG_INFINITY)));
1158 assert!(validate_supported_expr(&pos_inf).is_err());
1159 assert!(validate_supported_expr(&neg_inf).is_err());
1160 }
1161
1162 #[test]
1163 fn test_validate_supported_expr_nan_eq_rejected() {
1164 let expr = col("a").eq(Value::Float64(OrderedFloat(f64::NAN)));
1165 assert!(validate_supported_expr(&expr).is_err());
1166 }
1167
1168 #[test]
1169 fn test_validate_supported_expr_infinite_eq_rejected() {
1170 let pos_inf = col("a").eq(Value::Float64(OrderedFloat(f64::INFINITY)));
1171 let neg_inf = col("a").not_eq(Value::Float32(OrderedFloat(f32::NEG_INFINITY)));
1172 assert!(validate_supported_expr(&pos_inf).is_err());
1173 assert!(validate_supported_expr(&neg_inf).is_err());
1174 }
1175
1176 #[test]
1177 fn test_simplify_and_bounds_or_keeps_original() {
1178 let expr = PartitionExpr::new(
1180 Operand::Expr(col("a").lt(Value::Int64(10))),
1181 RestrictedOp::Or,
1182 Operand::Expr(col("a").gt_eq(Value::Int64(20))),
1183 );
1184 let simplified = simplify_and_bounds(expr.clone());
1185 assert_eq!(simplified.to_string(), expr.to_string());
1186 }
1187
1188 #[test]
1189 fn test_simplify_and_bounds_keep_stronger_when_weaker_seen_later() {
1190 let upper = col("a")
1192 .lt(Value::Int64(5))
1193 .and(col("a").lt(Value::Int64(10)));
1194 assert_eq!(simplify_and_bounds(upper).to_string(), "a < 5");
1195
1196 let lower = col("a")
1198 .gt(Value::Int64(10))
1199 .and(col("a").gt(Value::Int64(5)));
1200 assert_eq!(simplify_and_bounds(lower).to_string(), "a > 10");
1201 }
1202
1203 #[test]
1204 fn test_internal_helpers_uncovered_branches() {
1205 assert!(fold_and_exprs(vec![]).is_none());
1207
1208 let mut out = Vec::new();
1210 let or_expr = PartitionExpr::new(
1211 Operand::Expr(col("a").lt(Value::Int64(10))),
1212 RestrictedOp::Or,
1213 Operand::Expr(col("a").gt_eq(Value::Int64(20))),
1214 );
1215 assert!(!collect_and_atoms(&or_expr, &mut out));
1216
1217 let value_value = PartitionExpr::new(
1219 Operand::Value(Value::Int64(1)),
1220 RestrictedOp::Eq,
1221 Operand::Value(Value::Int64(2)),
1222 );
1223 assert!(atom_col_op_val(&value_value).is_none());
1224 }
1225
1226 #[test]
1227 fn test_split_rejects_or_in_base_expr() {
1228 let base = PartitionExpr::new(
1230 Operand::Expr(col("a").lt(Value::Int64(10))),
1231 RestrictedOp::Or,
1232 Operand::Expr(
1233 col("a")
1234 .gt_eq(Value::Int64(20))
1235 .and(col("a").lt(Value::Int64(30))),
1236 ),
1237 );
1238 let split = col("a").lt(Value::Int64(5));
1240
1241 let result = split_partition_expr(base, split);
1242 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
1243 }
1244
1245 #[test]
1246 fn test_split_rejects_or_in_split_expr() {
1247 let base = col("a").lt(Value::Int64(10));
1249 let split = PartitionExpr::new(
1251 Operand::Expr(col("a").lt(Value::Int64(5))),
1252 RestrictedOp::Or,
1253 Operand::Expr(
1254 col("a")
1255 .gt_eq(Value::Int64(8))
1256 .and(col("a").lt(Value::Int64(9))),
1257 ),
1258 );
1259
1260 let result = split_partition_expr(base, split);
1261 assert_eq!(result.unwrap_err(), ExprSplitDegradeReason::UnsupportedType);
1262 }
1263}