1use std::collections::HashMap;
29use std::fmt::Debug;
30use std::sync::Arc;
31
32use datafusion_expr::Operator;
33use datafusion_physical_expr::PhysicalExpr;
34use datafusion_physical_expr::expressions::{BinaryExpr, col, lit};
35use datatypes::arrow::datatypes::Schema;
36use datatypes::value::{OrderedF64, OrderedFloat, Value};
37
38use crate::error;
39use crate::error::Result;
40use crate::expr::{Operand, PartitionExpr, RestrictedOp};
41
/// Normalized value assigned to the smallest distinct literal of every column.
const ZERO: OrderedF64 = OrderedFloat(0.0f64);
/// Distance between the normalized values of two consecutive distinct literals of a column.
pub(crate) const NORMALIZE_STEP: OrderedF64 = OrderedFloat(1.0f64);
/// Half of [`NORMALIZE_STEP`]. Not referenced in this part of the file; presumably the
/// offset used to probe points lying between adjacent normalized values — TODO confirm
/// against its callers.
pub(crate) const CHECK_STEP: OrderedF64 = OrderedFloat(0.5f64);
45
/// A conjunction (`AND`) of [`NucleonExpr`]s.
///
/// [`Collider::new`] produces one `AtomicExpr` per `OR` branch of each source
/// partition expression.
#[allow(unused)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AtomicExpr {
    /// Comparisons that must all hold; sorted when built by [`Collider::new`].
    pub nucleons: Vec<NucleonExpr>,
    /// Position, in the collider's source expressions, of the expression this
    /// conjunction was split from.
    pub source_expr_index: usize,
}
56
57impl AtomicExpr {
58 pub fn to_physical_expr(&self, schema: &Schema) -> Arc<dyn PhysicalExpr> {
59 let mut exprs = Vec::with_capacity(self.nucleons.len());
60 for nucleon in &self.nucleons {
61 exprs.push(nucleon.to_physical_expr(schema));
62 }
63 let result: Arc<dyn PhysicalExpr> = exprs
64 .into_iter()
65 .reduce(|l, r| Arc::new(BinaryExpr::new(l, Operator::And, r)))
66 .unwrap();
67 result
68 }
69}
70
71impl PartialOrd for AtomicExpr {
72 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
73 Some(self.nucleons.cmp(&other.nucleons))
74 }
75}
76
/// A single comparison `column <op> value`.
///
/// When built by [`Collider`], `value` is the column literal's normalized value
/// rather than the literal itself. The derived ordering compares `column`, then
/// `op`, then `value` (field declaration order).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NucleonExpr {
    column: String,
    op: GluonOp,
    value: OrderedF64,
}
87
88impl NucleonExpr {
89 pub fn to_physical_expr(&self, schema: &Schema) -> Arc<dyn PhysicalExpr> {
90 Arc::new(BinaryExpr::new(
91 col(&self.column, schema).unwrap(),
92 self.op.to_operator(),
93 lit(*self.value.as_ref()),
94 ))
95 }
96
97 pub fn column(&self) -> &str {
99 &self.column
100 }
101
102 pub fn value(&self) -> OrderedF64 {
104 self.value
105 }
106
107 pub fn op(&self) -> &GluonOp {
109 &self.op
110 }
111
112 pub fn new(column: impl Into<String>, op: GluonOp, value: OrderedF64) -> Self {
113 Self {
114 column: column.into(),
115 op,
116 value,
117 }
118 }
119}
120
/// Comparison operator of a [`NucleonExpr`].
///
/// Covers the comparison subset of [`RestrictedOp`]; the `AND`/`OR`
/// conjunctions have no counterpart here. The derived ordering follows
/// variant declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum GluonOp {
    /// `=`
    Eq,
    /// `!=`
    NotEq,
    /// `<`
    Lt,
    /// `<=`
    LtEq,
    /// `>`
    Gt,
    /// `>=`
    GtEq,
}
134
135impl GluonOp {
136 pub fn to_operator(&self) -> Operator {
137 match self {
138 GluonOp::Eq => Operator::Eq,
139 GluonOp::NotEq => Operator::NotEq,
140 GluonOp::Lt => Operator::Lt,
141 GluonOp::LtEq => Operator::LtEq,
142 GluonOp::Gt => Operator::Gt,
143 GluonOp::GtEq => Operator::GtEq,
144 }
145 }
146}
147
/// Decomposes partition expressions into [`AtomicExpr`]s whose comparisons
/// reference per-column normalized values instead of raw literals.
#[allow(unused)]
pub struct Collider<'a> {
    /// The partition expressions this collider was built from. Not read in the
    /// code shown here.
    source_exprs: &'a [PartitionExpr],

    /// One sorted conjunction per `OR` branch of every source expression.
    pub atomic_exprs: Vec<AtomicExpr>,
    /// For each column: its distinct literals, in ascending order, paired with
    /// their normalized value (`0.0`, `1.0`, `2.0`, ...).
    pub normalized_values: HashMap<String, Vec<(Value, OrderedF64)>>,
}
161
162impl<'a> Collider<'a> {
163 pub fn new(source_exprs: &'a [PartitionExpr]) -> Result<Self> {
164 let mut values: HashMap<String, Vec<Value>> = HashMap::new();
166 for expr in source_exprs {
167 Self::collect_column_values_from_expr(expr, &mut values)?;
168 }
169
170 let mut normalized_values: HashMap<String, HashMap<Value, OrderedF64>> =
172 HashMap::with_capacity(values.len());
173 for (column, mut column_values) in values {
174 column_values.sort_unstable();
175 column_values.dedup(); #[allow(clippy::mutable_key_type)]
179 let mut value_map = HashMap::with_capacity(column_values.len());
180 let mut start_value = ZERO;
181 for value in column_values {
182 value_map.insert(value, start_value);
183 start_value += NORMALIZE_STEP;
184 }
185 normalized_values.insert(column, value_map);
186 }
187
188 let mut atomic_exprs = Vec::with_capacity(source_exprs.len());
190 for (index, expr) in source_exprs.iter().enumerate() {
191 Self::collide_expr(expr, index, &normalized_values, &mut atomic_exprs)?;
192 }
193 for expr in &mut atomic_exprs {
195 expr.nucleons.sort_unstable();
196 }
197
198 let normalized_values = normalized_values
200 .into_iter()
201 .map(|(col, values)| {
202 let mut values = values.into_iter().collect::<Vec<_>>();
203 values.sort_unstable_by_key(|(_, v)| *v);
204 (col, values)
205 })
206 .collect();
207
208 Ok(Self {
209 source_exprs,
210 atomic_exprs,
211 normalized_values,
212 })
213 }
214
215 fn collect_column_values_from_expr(
217 expr: &PartitionExpr,
218 values: &mut HashMap<String, Vec<Value>>,
219 ) -> Result<()> {
220 match (&*expr.lhs, &*expr.rhs) {
222 (Operand::Column(col), Operand::Value(val))
223 | (Operand::Value(val), Operand::Column(col)) => {
224 values.entry(col.clone()).or_default().push(val.clone());
225 Ok(())
226 }
227 (Operand::Expr(left_expr), Operand::Expr(right_expr)) => {
228 Self::collect_column_values_from_expr(left_expr, values)?;
229 Self::collect_column_values_from_expr(right_expr, values)
230 }
231 _ => error::InvalidExprSnafu { expr: expr.clone() }.fail(),
233 }
234 }
235
236 fn collide_expr(
242 expr: &PartitionExpr,
243 index: usize,
244 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
245 result: &mut Vec<AtomicExpr>,
246 ) -> Result<()> {
247 match expr.op {
248 RestrictedOp::Or => {
249 match &*expr.lhs {
253 Operand::Expr(left_expr) => {
254 Self::collide_expr(left_expr, index, normalized_values, result)?;
255 }
256 _ => {
257 return error::InvalidExprSnafu { expr: expr.clone() }.fail();
260 }
261 }
262
263 match &*expr.rhs {
265 Operand::Expr(right_expr) => {
266 Self::collide_expr(right_expr, index, normalized_values, result)?;
267 }
268 _ => {
269 return error::InvalidExprSnafu { expr: expr.clone() }.fail();
272 }
273 }
274 }
275 RestrictedOp::And => {
276 let mut nucleons = Vec::new();
278 Self::collect_nucleons_from_expr(expr, &mut nucleons, normalized_values)?;
279
280 result.push(AtomicExpr {
281 nucleons,
282 source_expr_index: index,
283 });
284 }
285 _ => {
286 let mut nucleons = Vec::new();
288 Self::collect_nucleons_from_expr(expr, &mut nucleons, normalized_values)?;
289
290 result.push(AtomicExpr {
291 nucleons,
292 source_expr_index: index,
293 });
294 }
295 }
296 Ok(())
297 }
298
299 fn collect_nucleons_from_expr(
301 expr: &PartitionExpr,
302 nucleons: &mut Vec<NucleonExpr>,
303 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
304 ) -> Result<()> {
305 match expr.op {
306 RestrictedOp::And => {
307 Self::collect_nucleons_from_operand(&expr.lhs, nucleons, normalized_values)?;
309 Self::collect_nucleons_from_operand(&expr.rhs, nucleons, normalized_values)?;
310 }
311 _ => {
312 nucleons.push(Self::try_create_nucleon(
314 &expr.lhs,
315 &expr.op,
316 &expr.rhs,
317 normalized_values,
318 )?);
319 }
320 }
321 Ok(())
322 }
323
324 fn collect_nucleons_from_operand(
326 operand: &Operand,
327 nucleons: &mut Vec<NucleonExpr>,
328 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
329 ) -> Result<()> {
330 match operand {
331 Operand::Expr(expr) => {
332 Self::collect_nucleons_from_expr(expr, nucleons, normalized_values)
333 }
334 _ => {
335 error::NoExprOperandSnafu {
337 operand: operand.clone(),
338 }
339 .fail()
340 }
341 }
342 }
343
344 fn try_create_nucleon(
346 lhs: &Operand,
347 op: &RestrictedOp,
348 rhs: &Operand,
349 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
350 ) -> Result<NucleonExpr> {
351 let gluon_op = match op {
352 RestrictedOp::Eq => GluonOp::Eq,
353 RestrictedOp::NotEq => GluonOp::NotEq,
354 RestrictedOp::Lt => GluonOp::Lt,
355 RestrictedOp::LtEq => GluonOp::LtEq,
356 RestrictedOp::Gt => GluonOp::Gt,
357 RestrictedOp::GtEq => GluonOp::GtEq,
358 RestrictedOp::And | RestrictedOp::Or => {
359 return error::UnexpectedSnafu {
361 err_msg: format!("Conjunction operation {:?} should be handled elsewhere", op),
362 }
363 .fail();
364 }
365 };
366
367 match (lhs, rhs) {
368 (Operand::Column(col), Operand::Value(val)) => {
369 if let Some(column_values) = normalized_values.get(col)
370 && let Some(&normalized_val) = column_values.get(val)
371 {
372 return Ok(NucleonExpr {
373 column: col.clone(),
374 op: gluon_op,
375 value: normalized_val,
376 });
377 }
378 }
379 (Operand::Value(val), Operand::Column(col)) => {
380 if let Some(column_values) = normalized_values.get(col)
381 && let Some(&normalized_val) = column_values.get(val)
382 {
383 let flipped_op = match gluon_op {
385 GluonOp::Lt => GluonOp::Gt,
386 GluonOp::LtEq => GluonOp::GtEq,
387 GluonOp::Gt => GluonOp::Lt,
388 GluonOp::GtEq => GluonOp::LtEq,
389 op => op, };
391 return Ok(NucleonExpr {
392 column: col.clone(),
393 op: flipped_op,
394 value: normalized_val,
395 });
396 }
397 }
398 _ => {}
399 }
400
401 error::InvalidExprSnafu {
403 expr: PartitionExpr::new(lhs.clone(), op.clone(), rhs.clone()),
404 }
405 .fail()
406 }
407}
408
#[cfg(test)]
mod test {
    use super::*;
    use crate::expr::col;

    /// Literals of each column are sorted, de-duplicated and mapped onto 0.0, 1.0, ...
    #[test]
    fn test_collider_basic_value_normalization() {
        let exprs = vec![
            col("age").eq(Value::UInt32(25)),
            col("age").eq(Value::UInt32(30)),
            // Duplicate literal: must collapse into a single normalized entry.
            col("age").eq(Value::UInt32(25)),
            col("name").eq(Value::String("alice".into())),
            col("name").eq(Value::String("bob".into())),
            col("active").eq(Value::Boolean(true)),
            col("active").eq(Value::Boolean(false)),
            col("score").eq(Value::Float64(OrderedFloat(95.5))),
            col("score").eq(Value::Float64(OrderedFloat(87.2))),
        ];

        let collider = Collider::new(&exprs).expect("Failed to create collider");

        // One entry per distinct column.
        assert_eq!(collider.normalized_values.len(), 4);

        // Integers: the duplicate 25 is dropped and values are in ascending order.
        let age_values = &collider.normalized_values["age"];
        assert_eq!(age_values.len(), 2);
        assert_eq!(
            age_values,
            &[
                (Value::UInt32(25), OrderedFloat(0.0f64)),
                (Value::UInt32(30), OrderedFloat(1.0f64))
            ]
        );

        // Strings are ordered lexicographically.
        let name_values = &collider.normalized_values["name"];
        assert_eq!(name_values.len(), 2);
        assert_eq!(
            name_values,
            &[
                (Value::String("alice".into()), OrderedFloat(0.0f64)),
                (Value::String("bob".into()), OrderedFloat(1.0f64))
            ]
        );

        // Booleans: `false` sorts before `true`.
        let active_values = &collider.normalized_values["active"];
        assert_eq!(active_values.len(), 2);
        assert_eq!(
            active_values,
            &[
                (Value::Boolean(false), OrderedFloat(0.0f64)),
                (Value::Boolean(true), OrderedFloat(1.0f64))
            ]
        );

        // Floats are ordered numerically regardless of insertion order.
        let score_values = &collider.normalized_values["score"];
        assert_eq!(score_values.len(), 2);
        assert_eq!(
            score_values,
            &[
                (Value::Float64(OrderedFloat(87.2)), OrderedFloat(0.0f64)),
                (Value::Float64(OrderedFloat(95.5)), OrderedFloat(1.0f64))
            ]
        );
    }

    /// A single comparison, an AND pair and an OR pair.
    #[test]
    fn test_collider_simple_expressions() {
        // Single comparison: one atomic expr with one nucleon.
        let exprs = vec![col("id").eq(Value::UInt32(1))];

        let collider = Collider::new(&exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 1);
        assert_eq!(collider.atomic_exprs[0].source_expr_index, 0);

        // AND: one atomic expr holding both nucleons.
        let exprs = vec![
            col("id")
                .eq(Value::UInt32(1))
                .and(col("status").eq(Value::String("active".into()))),
        ];

        let collider = Collider::new(&exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 2);

        // OR: split into one atomic expr per branch.
        let expr = PartitionExpr::new(
            Operand::Expr(col("id").eq(Value::UInt32(1))),
            RestrictedOp::Or,
            Operand::Expr(col("id").eq(Value::UInt32(2))),
        );
        let exprs = vec![expr];

        let collider = Collider::new(&exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 2);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 1);
        assert_eq!(collider.atomic_exprs[1].nucleons.len(), 1);
    }

    /// `(b1 OR b2) OR b3`, where b1 and b2 are 2-term ANDs: three branches, five nucleons.
    #[test]
    fn test_collider_complex_nested_expressions() {
        let branch1 = col("id")
            .eq(Value::UInt32(1))
            .and(col("status").eq(Value::String("active".into())));
        let branch2 = col("id")
            .eq(Value::UInt32(2))
            .and(col("status").eq(Value::String("inactive".into())));
        let branch3 = col("id").eq(Value::UInt32(3));

        let expr = PartitionExpr::new(
            Operand::Expr(PartitionExpr::new(
                Operand::Expr(branch1),
                RestrictedOp::Or,
                Operand::Expr(branch2),
            )),
            RestrictedOp::Or,
            Operand::Expr(branch3),
        );

        let exprs = vec![expr];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 3);

        let total_nucleons: usize = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.nucleons.len())
            .sum();
        assert_eq!(total_nucleons, 5);
    }

    /// A left-nested chain of four ANDs flattens into a single four-nucleon conjunction.
    #[test]
    fn test_collider_deep_nesting() {
        let expr = col("a")
            .eq(Value::UInt32(1))
            .and(col("b").eq(Value::UInt32(2)))
            .and(col("c").eq(Value::UInt32(3)))
            .and(col("d").eq(Value::UInt32(4)));

        let exprs = vec![expr];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 4);

        for nucleon in &collider.atomic_exprs[0].nucleons {
            assert_eq!(nucleon.op, GluonOp::Eq);
        }
    }

    /// Each source expression yields its own atomic expr tagged with its index.
    #[test]
    fn test_collider_multiple_expressions() {
        let exprs = vec![
            col("id").eq(Value::UInt32(1)),
            col("name").eq(Value::String("alice".into())),
            col("score").gt_eq(Value::Float64(OrderedFloat(90.0))),
        ];

        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 3);

        for atomic_expr in &collider.atomic_exprs {
            assert_eq!(atomic_expr.nucleons.len(), 1);
        }

        let indices: Vec<usize> = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.source_expr_index)
            .collect();
        assert!(indices.contains(&0));
        assert!(indices.contains(&1));
        assert!(indices.contains(&2));
    }

    /// `value <op> column` is rewritten as `column <mirrored op> value`.
    #[test]
    fn test_collider_value_column_order() {
        // `10 < age`
        let expr1 = PartitionExpr::new(
            Operand::Value(Value::UInt32(10)),
            RestrictedOp::Lt,
            Operand::Column("age".to_string()),
        );
        // `20 >= score`
        let expr2 = PartitionExpr::new(
            Operand::Value(Value::UInt32(20)),
            RestrictedOp::GtEq,
            Operand::Column("score".to_string()),
        );
        let exprs = vec![expr1, expr2];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 2);

        let operations: Vec<GluonOp> = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.nucleons[0].op.clone())
            .collect();

        // `10 < age` becomes `age > 10`.
        assert!(operations.contains(&GluonOp::Gt));
        // `20 >= score` becomes `score <= 20`.
        assert!(operations.contains(&GluonOp::LtEq));
    }

    /// OR of two ANDs over four different columns.
    #[test]
    fn test_collider_complex_or_with_different_columns() {
        let branch1 = col("name")
            .eq(Value::String("alice".into()))
            .and(col("age").eq(Value::UInt32(25)));

        let branch2 = col("status")
            .eq(Value::String("active".into()))
            .and(PartitionExpr::new(
                Operand::Column("score".to_string()),
                RestrictedOp::Gt,
                Operand::Value(Value::Float64(OrderedFloat(90.0))),
            ));

        let expr = PartitionExpr::new(
            Operand::Expr(branch1),
            RestrictedOp::Or,
            Operand::Expr(branch2),
        );

        let exprs = vec![expr];
        let collider = Collider::new(&exprs).expect("Failed to create collider");

        // One atomic expr per OR branch.
        assert_eq!(collider.atomic_exprs.len(), 2);

        for atomic_expr in &collider.atomic_exprs {
            assert_eq!(atomic_expr.nucleons.len(), 2);
        }

        // Every referenced column gets a normalization table.
        assert_eq!(collider.normalized_values.len(), 4);
        assert!(collider.normalized_values.contains_key("name"));
        assert!(collider.normalized_values.contains_key("age"));
        assert!(collider.normalized_values.contains_key("status"));
        assert!(collider.normalized_values.contains_key("score"));
    }

    /// Malformed comparisons are rejected, and an empty input yields an empty collider.
    #[test]
    fn test_try_create_nucleon_edge_cases() {
        let normalized_values = HashMap::new();

        // Conjunction operators cannot form a nucleon.
        let result = Collider::try_create_nucleon(
            &col("a"),
            &RestrictedOp::And,
            &Operand::Value(Value::UInt32(1)),
            &normalized_values,
        );
        assert!(result.is_err());

        let result = Collider::try_create_nucleon(
            &col("a"),
            &RestrictedOp::Or,
            &Operand::Value(Value::UInt32(1)),
            &normalized_values,
        );
        assert!(result.is_err());

        // Column-vs-column is not supported.
        let result = Collider::try_create_nucleon(
            &col("a"),
            &RestrictedOp::Eq,
            &col("b"),
            &normalized_values,
        );
        assert!(result.is_err());

        // Value-vs-value is not supported.
        let result = Collider::try_create_nucleon(
            &Operand::Value(Value::UInt32(1)),
            &RestrictedOp::Eq,
            &Operand::Value(Value::UInt32(2)),
            &normalized_values,
        );
        assert!(result.is_err());

        // No source expressions: nothing to normalize or split.
        let exprs = vec![];
        let collider = Collider::new(&exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 0);
        assert_eq!(collider.normalized_values.len(), 0);
    }
}