1use std::collections::HashMap;
29use std::fmt::Debug;
30use std::sync::Arc;
31
32use datafusion_expr::Operator;
33use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
34use datafusion_physical_expr::PhysicalExpr;
35use datatypes::arrow::datatypes::Schema;
36use datatypes::value::{OrderedF64, OrderedFloat, Value};
37
38use crate::error;
39use crate::error::Result;
40use crate::expr::{Operand, PartitionExpr, RestrictedOp};
41
/// Normalized value assigned to the smallest literal of every column.
const ZERO: OrderedF64 = OrderedFloat(0.0f64);
/// Distance between the normalized values of two consecutive (sorted,
/// deduplicated) literals of the same column.
pub(crate) const NORMALIZE_STEP: OrderedF64 = OrderedFloat(1.0f64);
/// Half of `NORMALIZE_STEP`. Not referenced in this file; presumably used
/// elsewhere in the crate to probe points strictly between two adjacent
/// normalized values (TODO confirm against callers).
pub(crate) const CHECK_STEP: OrderedF64 = OrderedFloat(0.5f64);
45
/// A conjunction (`AND`) of [`NucleonExpr`]s.
///
/// Each top-level `OR` branch of a source partition expression becomes one
/// `AtomicExpr` when a [`Collider`] is built.
// NOTE(review): the reason for `allow(unused)` is not visible in this file —
// TODO confirm it is still needed.
#[allow(unused)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AtomicExpr {
    /// The comparisons that must all hold. `Collider::new` sorts these, so
    /// conjunctions listing the same nucleons compare equal.
    pub nucleons: Vec<NucleonExpr>,
    /// Index into the collider's source expression slice of the expression
    /// this conjunction was split from.
    pub source_expr_index: usize,
}
56
57impl AtomicExpr {
58 pub fn to_physical_expr(&self, schema: &Schema) -> Arc<dyn PhysicalExpr> {
59 let mut exprs = Vec::with_capacity(self.nucleons.len());
60 for nucleon in &self.nucleons {
61 exprs.push(nucleon.to_physical_expr(schema));
62 }
63 let result: Arc<dyn PhysicalExpr> = exprs
64 .into_iter()
65 .reduce(|l, r| Arc::new(BinaryExpr::new(l, Operator::And, r)))
66 .unwrap();
67 result
68 }
69}
70
71impl PartialOrd for AtomicExpr {
72 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
73 Some(self.nucleons.cmp(&other.nucleons))
74 }
75}
76
/// A single comparison `column <op> value`, always oriented with the column on
/// the left.
///
/// `value` is the column's *normalized* value produced by [`Collider`], not the
/// original literal. The derived ordering compares `column`, then `op`, then
/// `value`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct NucleonExpr {
    /// Name of the compared column.
    column: String,
    /// Comparison operator.
    op: GluonOp,
    /// Normalized value the column is compared against.
    value: OrderedF64,
}
87
88impl NucleonExpr {
89 pub fn to_physical_expr(&self, schema: &Schema) -> Arc<dyn PhysicalExpr> {
90 Arc::new(BinaryExpr::new(
91 col(&self.column, schema).unwrap(),
92 self.op.to_operator(),
93 lit(*self.value.as_ref()),
94 ))
95 }
96
97 pub fn column(&self) -> &str {
99 &self.column
100 }
101
102 pub fn value(&self) -> OrderedF64 {
104 self.value
105 }
106
107 pub fn op(&self) -> &GluonOp {
109 &self.op
110 }
111
112 pub fn new(column: impl Into<String>, op: GluonOp, value: OrderedF64) -> Self {
113 Self {
114 column: column.into(),
115 op,
116 value,
117 }
118 }
119}
120
/// Comparison operator of a [`NucleonExpr`].
///
/// Variant declaration order is significant: the derived `Ord` (and therefore
/// the ordering of nucleons) follows it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum GluonOp {
    /// `=`
    Eq,
    /// `!=`
    NotEq,
    /// `<`
    Lt,
    /// `<=`
    LtEq,
    /// `>`
    Gt,
    /// `>=`
    GtEq,
}
134
135impl GluonOp {
136 pub fn to_operator(&self) -> Operator {
137 match self {
138 GluonOp::Eq => Operator::Eq,
139 GluonOp::NotEq => Operator::NotEq,
140 GluonOp::Lt => Operator::Lt,
141 GluonOp::LtEq => Operator::LtEq,
142 GluonOp::Gt => Operator::Gt,
143 GluonOp::GtEq => Operator::GtEq,
144 }
145 }
146}
147
/// Flattens a set of partition expressions into normalized conjunctions.
///
/// Every literal a column is compared against is replaced by a per-column
/// normalized float, and every source expression is split on `OR` into
/// [`AtomicExpr`]s.
// NOTE(review): `source_exprs` is never read in this file, which is
// presumably why `unused` is allowed — TODO confirm.
#[allow(unused)]
pub struct Collider<'a> {
    /// The expressions this collider was built from; each
    /// `AtomicExpr::source_expr_index` indexes into this slice.
    source_exprs: &'a [PartitionExpr],

    /// One entry per `OR` branch of every source expression, each with its
    /// nucleons sorted.
    pub atomic_exprs: Vec<AtomicExpr>,
    /// For each column, its distinct literal values paired with their
    /// normalized value, sorted ascending by the normalized value.
    pub normalized_values: HashMap<String, Vec<(Value, OrderedF64)>>,
}
161
162impl<'a> Collider<'a> {
163 pub fn new(source_exprs: &'a [PartitionExpr]) -> Result<Self> {
164 let mut values: HashMap<String, Vec<Value>> = HashMap::new();
166 for expr in source_exprs {
167 Self::collect_column_values_from_expr(expr, &mut values)?;
168 }
169
170 let mut normalized_values: HashMap<String, HashMap<Value, OrderedF64>> =
172 HashMap::with_capacity(values.len());
173 for (column, mut column_values) in values {
174 column_values.sort_unstable();
175 column_values.dedup(); let mut value_map = HashMap::with_capacity(column_values.len());
177 let mut start_value = ZERO;
178 for value in column_values {
179 value_map.insert(value, start_value);
180 start_value += NORMALIZE_STEP;
181 }
182 normalized_values.insert(column, value_map);
183 }
184
185 let mut atomic_exprs = Vec::with_capacity(source_exprs.len());
187 for (index, expr) in source_exprs.iter().enumerate() {
188 Self::collide_expr(expr, index, &normalized_values, &mut atomic_exprs)?;
189 }
190 for expr in &mut atomic_exprs {
192 expr.nucleons.sort_unstable();
193 }
194
195 let normalized_values = normalized_values
197 .into_iter()
198 .map(|(col, values)| {
199 let mut values = values.into_iter().collect::<Vec<_>>();
200 values.sort_unstable_by_key(|(_, v)| *v);
201 (col, values)
202 })
203 .collect();
204
205 Ok(Self {
206 source_exprs,
207 atomic_exprs,
208 normalized_values,
209 })
210 }
211
212 fn collect_column_values_from_expr(
214 expr: &PartitionExpr,
215 values: &mut HashMap<String, Vec<Value>>,
216 ) -> Result<()> {
217 match (&*expr.lhs, &*expr.rhs) {
219 (Operand::Column(col), Operand::Value(val))
220 | (Operand::Value(val), Operand::Column(col)) => {
221 values.entry(col.clone()).or_default().push(val.clone());
222 Ok(())
223 }
224 (Operand::Expr(left_expr), Operand::Expr(right_expr)) => {
225 Self::collect_column_values_from_expr(left_expr, values)?;
226 Self::collect_column_values_from_expr(right_expr, values)
227 }
228 _ => error::InvalidExprSnafu { expr: expr.clone() }.fail(),
230 }
231 }
232
233 fn collide_expr(
239 expr: &PartitionExpr,
240 index: usize,
241 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
242 result: &mut Vec<AtomicExpr>,
243 ) -> Result<()> {
244 match expr.op {
245 RestrictedOp::Or => {
246 match &*expr.lhs {
250 Operand::Expr(left_expr) => {
251 Self::collide_expr(left_expr, index, normalized_values, result)?;
252 }
253 _ => {
254 return error::InvalidExprSnafu { expr: expr.clone() }.fail();
257 }
258 }
259
260 match &*expr.rhs {
262 Operand::Expr(right_expr) => {
263 Self::collide_expr(right_expr, index, normalized_values, result)?;
264 }
265 _ => {
266 return error::InvalidExprSnafu { expr: expr.clone() }.fail();
269 }
270 }
271 }
272 RestrictedOp::And => {
273 let mut nucleons = Vec::new();
275 Self::collect_nucleons_from_expr(expr, &mut nucleons, normalized_values)?;
276
277 result.push(AtomicExpr {
278 nucleons,
279 source_expr_index: index,
280 });
281 }
282 _ => {
283 let mut nucleons = Vec::new();
285 Self::collect_nucleons_from_expr(expr, &mut nucleons, normalized_values)?;
286
287 result.push(AtomicExpr {
288 nucleons,
289 source_expr_index: index,
290 });
291 }
292 }
293 Ok(())
294 }
295
296 fn collect_nucleons_from_expr(
298 expr: &PartitionExpr,
299 nucleons: &mut Vec<NucleonExpr>,
300 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
301 ) -> Result<()> {
302 match expr.op {
303 RestrictedOp::And => {
304 Self::collect_nucleons_from_operand(&expr.lhs, nucleons, normalized_values)?;
306 Self::collect_nucleons_from_operand(&expr.rhs, nucleons, normalized_values)?;
307 }
308 _ => {
309 nucleons.push(Self::try_create_nucleon(
311 &expr.lhs,
312 &expr.op,
313 &expr.rhs,
314 normalized_values,
315 )?);
316 }
317 }
318 Ok(())
319 }
320
321 fn collect_nucleons_from_operand(
323 operand: &Operand,
324 nucleons: &mut Vec<NucleonExpr>,
325 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
326 ) -> Result<()> {
327 match operand {
328 Operand::Expr(expr) => {
329 Self::collect_nucleons_from_expr(expr, nucleons, normalized_values)
330 }
331 _ => {
332 error::NoExprOperandSnafu {
334 operand: operand.clone(),
335 }
336 .fail()
337 }
338 }
339 }
340
341 fn try_create_nucleon(
343 lhs: &Operand,
344 op: &RestrictedOp,
345 rhs: &Operand,
346 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
347 ) -> Result<NucleonExpr> {
348 let gluon_op = match op {
349 RestrictedOp::Eq => GluonOp::Eq,
350 RestrictedOp::NotEq => GluonOp::NotEq,
351 RestrictedOp::Lt => GluonOp::Lt,
352 RestrictedOp::LtEq => GluonOp::LtEq,
353 RestrictedOp::Gt => GluonOp::Gt,
354 RestrictedOp::GtEq => GluonOp::GtEq,
355 RestrictedOp::And | RestrictedOp::Or => {
356 return error::UnexpectedSnafu {
358 err_msg: format!("Conjunction operation {:?} should be handled elsewhere", op),
359 }
360 .fail();
361 }
362 };
363
364 match (lhs, rhs) {
365 (Operand::Column(col), Operand::Value(val)) => {
366 if let Some(column_values) = normalized_values.get(col) {
367 if let Some(&normalized_val) = column_values.get(val) {
368 return Ok(NucleonExpr {
369 column: col.clone(),
370 op: gluon_op,
371 value: normalized_val,
372 });
373 }
374 }
375 }
376 (Operand::Value(val), Operand::Column(col)) => {
377 if let Some(column_values) = normalized_values.get(col) {
378 if let Some(&normalized_val) = column_values.get(val) {
379 let flipped_op = match gluon_op {
381 GluonOp::Lt => GluonOp::Gt,
382 GluonOp::LtEq => GluonOp::GtEq,
383 GluonOp::Gt => GluonOp::Lt,
384 GluonOp::GtEq => GluonOp::LtEq,
385 op => op, };
387 return Ok(NucleonExpr {
388 column: col.clone(),
389 op: flipped_op,
390 value: normalized_val,
391 });
392 }
393 }
394 }
395 _ => {}
396 }
397
398 error::InvalidExprSnafu {
400 expr: PartitionExpr::new(lhs.clone(), op.clone(), rhs.clone()),
401 }
402 .fail()
403 }
404}
405
#[cfg(test)]
mod test {
    use super::*;
    use crate::expr::col;

    // Each column's distinct literals are sorted by `Value` ordering,
    // deduplicated and mapped to 0.0, 1.0, ... in that order.
    #[test]
    fn test_collider_basic_value_normalization() {
        let exprs = vec![
            col("age").eq(Value::UInt32(25)),
            col("age").eq(Value::UInt32(30)),
            // Repeats the first literal; it must be deduplicated.
            col("age").eq(Value::UInt32(25)),
            col("name").eq(Value::String("alice".into())),
            col("name").eq(Value::String("bob".into())),
            col("active").eq(Value::Boolean(true)),
            col("active").eq(Value::Boolean(false)),
            col("score").eq(Value::Float64(OrderedFloat(95.5))),
            col("score").eq(Value::Float64(OrderedFloat(87.2))),
        ];

        let collider = Collider::new(&exprs).expect("Failed to create collider");

        // One entry per referenced column: age, name, active, score.
        assert_eq!(collider.normalized_values.len(), 4);

        // The duplicated 25 appears only once.
        let age_values = &collider.normalized_values["age"];
        assert_eq!(age_values.len(), 2);
        assert_eq!(
            age_values,
            &[
                (Value::UInt32(25), OrderedFloat(0.0f64)),
                (Value::UInt32(30), OrderedFloat(1.0f64))
            ]
        );

        let name_values = &collider.normalized_values["name"];
        assert_eq!(name_values.len(), 2);
        assert_eq!(
            name_values,
            &[
                (Value::String("alice".into()), OrderedFloat(0.0f64)),
                (Value::String("bob".into()), OrderedFloat(1.0f64))
            ]
        );

        // `false` sorts before `true` even though `true` was seen first.
        let active_values = &collider.normalized_values["active"];
        assert_eq!(active_values.len(), 2);
        assert_eq!(
            active_values,
            &[
                (Value::Boolean(false), OrderedFloat(0.0f64)),
                (Value::Boolean(true), OrderedFloat(1.0f64))
            ]
        );

        // Ordering follows the value, not the order of appearance.
        let score_values = &collider.normalized_values["score"];
        assert_eq!(score_values.len(), 2);
        assert_eq!(
            score_values,
            &[
                (Value::Float64(OrderedFloat(87.2)), OrderedFloat(0.0f64)),
                (Value::Float64(OrderedFloat(95.5)), OrderedFloat(1.0f64))
            ]
        );
    }

    #[test]
    fn test_collider_simple_expressions() {
        // A single comparison yields one atomic expr holding one nucleon.
        let exprs = vec![col("id").eq(Value::UInt32(1))];

        let collider = Collider::new(&exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 1);
        assert_eq!(collider.atomic_exprs[0].source_expr_index, 0);

        // An AND of two comparisons stays one atomic expr with two nucleons.
        let exprs = vec![col("id")
            .eq(Value::UInt32(1))
            .and(col("status").eq(Value::String("active".into())))];

        let collider = Collider::new(&exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 2);

        // An OR is split into one atomic expr per branch.
        let expr = PartitionExpr::new(
            Operand::Expr(col("id").eq(Value::UInt32(1))),
            RestrictedOp::Or,
            Operand::Expr(col("id").eq(Value::UInt32(2))),
        );
        let exprs = vec![expr];

        let collider = Collider::new(&exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 2);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 1);
        assert_eq!(collider.atomic_exprs[1].nucleons.len(), 1);
    }

    // `(branch1 OR branch2) OR branch3` flattens into three conjunctions
    // holding 2 + 2 + 1 nucleons.
    #[test]
    fn test_collider_complex_nested_expressions() {
        let branch1 = col("id")
            .eq(Value::UInt32(1))
            .and(col("status").eq(Value::String("active".into())));
        let branch2 = col("id")
            .eq(Value::UInt32(2))
            .and(col("status").eq(Value::String("inactive".into())));
        let branch3 = col("id").eq(Value::UInt32(3));

        let expr = PartitionExpr::new(
            Operand::Expr(PartitionExpr::new(
                Operand::Expr(branch1),
                RestrictedOp::Or,
                Operand::Expr(branch2),
            )),
            RestrictedOp::Or,
            Operand::Expr(branch3),
        );

        let exprs = vec![expr];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 3);

        let total_nucleons: usize = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.nucleons.len())
            .sum();
        assert_eq!(total_nucleons, 5);
    }

    // A left-nested chain of four ANDed equalities collapses into a single
    // conjunction of four `Eq` nucleons.
    #[test]
    fn test_collider_deep_nesting() {
        let expr = col("a")
            .eq(Value::UInt32(1))
            .and(col("b").eq(Value::UInt32(2)))
            .and(col("c").eq(Value::UInt32(3)))
            .and(col("d").eq(Value::UInt32(4)));

        let exprs = vec![expr];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 4);

        for nucleon in &collider.atomic_exprs[0].nucleons {
            assert_eq!(nucleon.op, GluonOp::Eq);
        }
    }

    // Each source expression yields its own atomic expr tagged with its index.
    #[test]
    fn test_collider_multiple_expressions() {
        let exprs = vec![
            col("id").eq(Value::UInt32(1)),
            col("name").eq(Value::String("alice".into())),
            col("score").gt_eq(Value::Float64(OrderedFloat(90.0))),
        ];

        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 3);

        for atomic_expr in &collider.atomic_exprs {
            assert_eq!(atomic_expr.nucleons.len(), 1);
        }

        let indices: Vec<usize> = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.source_expr_index)
            .collect();
        assert!(indices.contains(&0));
        assert!(indices.contains(&1));
        assert!(indices.contains(&2));
    }

    // `value <op> column` is rewritten as `column <mirrored op> value`.
    #[test]
    fn test_collider_value_column_order() {
        // 10 < age  =>  age > 10
        let expr1 = PartitionExpr::new(
            Operand::Value(Value::UInt32(10)),
            RestrictedOp::Lt,
            Operand::Column("age".to_string()),
        );
        // 20 >= score  =>  score <= 20
        let expr2 = PartitionExpr::new(
            Operand::Value(Value::UInt32(20)),
            RestrictedOp::GtEq,
            Operand::Column("score".to_string()),
        );
        let exprs = vec![expr1, expr2];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 2);

        let operations: Vec<GluonOp> = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.nucleons[0].op.clone())
            .collect();

        assert!(operations.contains(&GluonOp::Gt));
        assert!(operations.contains(&GluonOp::LtEq));
    }

    // Each OR branch keeps its own nucleons, and the columns of both branches
    // are normalized.
    #[test]
    fn test_collider_complex_or_with_different_columns() {
        let branch1 = col("name")
            .eq(Value::String("alice".into()))
            .and(col("age").eq(Value::UInt32(25)));

        let branch2 = col("status")
            .eq(Value::String("active".into()))
            .and(PartitionExpr::new(
                Operand::Column("score".to_string()),
                RestrictedOp::Gt,
                Operand::Value(Value::Float64(OrderedFloat(90.0))),
            ));

        let expr = PartitionExpr::new(
            Operand::Expr(branch1),
            RestrictedOp::Or,
            Operand::Expr(branch2),
        );

        let exprs = vec![expr];
        let collider = Collider::new(&exprs).expect("Failed to create collider");

        assert_eq!(collider.atomic_exprs.len(), 2);

        for atomic_expr in &collider.atomic_exprs {
            assert_eq!(atomic_expr.nucleons.len(), 2);
        }

        assert_eq!(collider.normalized_values.len(), 4);
        assert!(collider.normalized_values.contains_key("name"));
        assert!(collider.normalized_values.contains_key("age"));
        assert!(collider.normalized_values.contains_key("status"));
        assert!(collider.normalized_values.contains_key("score"));
    }

    #[test]
    fn test_try_create_nucleon_edge_cases() {
        let normalized_values = HashMap::new();

        // Conjunction operators can never form a single nucleon.
        let result = Collider::try_create_nucleon(
            &col("a"),
            &RestrictedOp::And,
            &Operand::Value(Value::UInt32(1)),
            &normalized_values,
        );
        assert!(result.is_err());

        let result = Collider::try_create_nucleon(
            &col("a"),
            &RestrictedOp::Or,
            &Operand::Value(Value::UInt32(1)),
            &normalized_values,
        );
        assert!(result.is_err());

        // Column-vs-column comparisons are unsupported.
        let result = Collider::try_create_nucleon(
            &col("a"),
            &RestrictedOp::Eq,
            &col("b"),
            &normalized_values,
        );
        assert!(result.is_err());

        // Value-vs-value comparisons are unsupported.
        let result = Collider::try_create_nucleon(
            &Operand::Value(Value::UInt32(1)),
            &RestrictedOp::Eq,
            &Operand::Value(Value::UInt32(2)),
            &normalized_values,
        );
        assert!(result.is_err());

        // No source expressions: nothing to normalize or collide.
        let exprs = vec![];
        let collider = Collider::new(&exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 0);
        assert_eq!(collider.normalized_values.len(), 0);
    }
}