1use std::collections::HashMap;
29use std::fmt::Debug;
30use std::sync::Arc;
31
32use datafusion_expr::Operator;
33use datafusion_physical_expr::PhysicalExpr;
34use datafusion_physical_expr::expressions::{BinaryExpr, col, lit};
35use datatypes::arrow::datatypes::Schema;
36use datatypes::value::{OrderedF64, OrderedFloat, Value};
37
38use crate::error;
39use crate::error::Result;
40use crate::expr::{Operand, PartitionExpr, RestrictedOp};
41
42const ZERO: OrderedF64 = OrderedFloat(0.0f64);
43pub(crate) const NORMALIZE_STEP: OrderedF64 = OrderedFloat(1.0f64);
44pub(crate) const CHECK_STEP: OrderedF64 = OrderedFloat(0.5f64);
45
46#[allow(unused)]
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub struct AtomicExpr {
50 pub nucleons: Vec<NucleonExpr>,
52 pub source_expr_index: usize,
55}
56
57impl AtomicExpr {
58 pub fn to_physical_expr(&self, schema: &Schema) -> Arc<dyn PhysicalExpr> {
59 let mut exprs = Vec::with_capacity(self.nucleons.len());
60 for nucleon in &self.nucleons {
61 exprs.push(nucleon.to_physical_expr(schema));
62 }
63 let result: Arc<dyn PhysicalExpr> = exprs
64 .into_iter()
65 .reduce(|l, r| Arc::new(BinaryExpr::new(l, Operator::And, r)))
66 .unwrap();
67 result
68 }
69}
70
71impl PartialOrd for AtomicExpr {
72 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
73 Some(self.nucleons.cmp(&other.nucleons))
74 }
75}
76
77#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
81pub struct NucleonExpr {
82 column: String,
83 op: GluonOp,
84 value: OrderedF64,
86}
87
88impl NucleonExpr {
89 pub fn to_physical_expr(&self, schema: &Schema) -> Arc<dyn PhysicalExpr> {
90 Arc::new(BinaryExpr::new(
91 col(&self.column, schema).unwrap(),
92 self.op.to_operator(),
93 lit(*self.value.as_ref()),
94 ))
95 }
96
97 pub fn column(&self) -> &str {
99 &self.column
100 }
101
102 pub fn value(&self) -> OrderedF64 {
104 self.value
105 }
106
107 pub fn op(&self) -> &GluonOp {
109 &self.op
110 }
111
112 pub fn new(column: impl Into<String>, op: GluonOp, value: OrderedF64) -> Self {
113 Self {
114 column: column.into(),
115 op,
116 value,
117 }
118 }
119}
120
121#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
126pub enum GluonOp {
127 Eq,
128 NotEq,
129 Lt,
130 LtEq,
131 Gt,
132 GtEq,
133}
134
135impl GluonOp {
136 pub fn to_operator(&self) -> Operator {
137 match self {
138 GluonOp::Eq => Operator::Eq,
139 GluonOp::NotEq => Operator::NotEq,
140 GluonOp::Lt => Operator::Lt,
141 GluonOp::LtEq => Operator::LtEq,
142 GluonOp::Gt => Operator::Gt,
143 GluonOp::GtEq => Operator::GtEq,
144 }
145 }
146}
147
148#[allow(unused)]
152pub struct Collider<'a> {
153 source_exprs: &'a [PartitionExpr],
154
155 pub atomic_exprs: Vec<AtomicExpr>,
156 pub normalized_values: HashMap<String, Vec<(Value, OrderedF64)>>,
160}
161
162impl<'a> Collider<'a> {
163 pub fn new(source_exprs: &'a [PartitionExpr]) -> Result<Self> {
164 let mut values: HashMap<String, Vec<Value>> = HashMap::new();
166 for expr in source_exprs {
167 Self::collect_column_values_from_expr(expr, &mut values)?;
168 }
169
170 let mut normalized_values: HashMap<String, HashMap<Value, OrderedF64>> =
172 HashMap::with_capacity(values.len());
173 for (column, mut column_values) in values {
174 column_values.sort_unstable();
175 column_values.dedup(); let mut value_map = HashMap::with_capacity(column_values.len());
177 let mut start_value = ZERO;
178 for value in column_values {
179 value_map.insert(value, start_value);
180 start_value += NORMALIZE_STEP;
181 }
182 normalized_values.insert(column, value_map);
183 }
184
185 let mut atomic_exprs = Vec::with_capacity(source_exprs.len());
187 for (index, expr) in source_exprs.iter().enumerate() {
188 Self::collide_expr(expr, index, &normalized_values, &mut atomic_exprs)?;
189 }
190 for expr in &mut atomic_exprs {
192 expr.nucleons.sort_unstable();
193 }
194
195 let normalized_values = normalized_values
197 .into_iter()
198 .map(|(col, values)| {
199 let mut values = values.into_iter().collect::<Vec<_>>();
200 values.sort_unstable_by_key(|(_, v)| *v);
201 (col, values)
202 })
203 .collect();
204
205 Ok(Self {
206 source_exprs,
207 atomic_exprs,
208 normalized_values,
209 })
210 }
211
212 fn collect_column_values_from_expr(
214 expr: &PartitionExpr,
215 values: &mut HashMap<String, Vec<Value>>,
216 ) -> Result<()> {
217 match (&*expr.lhs, &*expr.rhs) {
219 (Operand::Column(col), Operand::Value(val))
220 | (Operand::Value(val), Operand::Column(col)) => {
221 values.entry(col.clone()).or_default().push(val.clone());
222 Ok(())
223 }
224 (Operand::Expr(left_expr), Operand::Expr(right_expr)) => {
225 Self::collect_column_values_from_expr(left_expr, values)?;
226 Self::collect_column_values_from_expr(right_expr, values)
227 }
228 _ => error::InvalidExprSnafu { expr: expr.clone() }.fail(),
230 }
231 }
232
233 fn collide_expr(
239 expr: &PartitionExpr,
240 index: usize,
241 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
242 result: &mut Vec<AtomicExpr>,
243 ) -> Result<()> {
244 match expr.op {
245 RestrictedOp::Or => {
246 match &*expr.lhs {
250 Operand::Expr(left_expr) => {
251 Self::collide_expr(left_expr, index, normalized_values, result)?;
252 }
253 _ => {
254 return error::InvalidExprSnafu { expr: expr.clone() }.fail();
257 }
258 }
259
260 match &*expr.rhs {
262 Operand::Expr(right_expr) => {
263 Self::collide_expr(right_expr, index, normalized_values, result)?;
264 }
265 _ => {
266 return error::InvalidExprSnafu { expr: expr.clone() }.fail();
269 }
270 }
271 }
272 RestrictedOp::And => {
273 let mut nucleons = Vec::new();
275 Self::collect_nucleons_from_expr(expr, &mut nucleons, normalized_values)?;
276
277 result.push(AtomicExpr {
278 nucleons,
279 source_expr_index: index,
280 });
281 }
282 _ => {
283 let mut nucleons = Vec::new();
285 Self::collect_nucleons_from_expr(expr, &mut nucleons, normalized_values)?;
286
287 result.push(AtomicExpr {
288 nucleons,
289 source_expr_index: index,
290 });
291 }
292 }
293 Ok(())
294 }
295
296 fn collect_nucleons_from_expr(
298 expr: &PartitionExpr,
299 nucleons: &mut Vec<NucleonExpr>,
300 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
301 ) -> Result<()> {
302 match expr.op {
303 RestrictedOp::And => {
304 Self::collect_nucleons_from_operand(&expr.lhs, nucleons, normalized_values)?;
306 Self::collect_nucleons_from_operand(&expr.rhs, nucleons, normalized_values)?;
307 }
308 _ => {
309 nucleons.push(Self::try_create_nucleon(
311 &expr.lhs,
312 &expr.op,
313 &expr.rhs,
314 normalized_values,
315 )?);
316 }
317 }
318 Ok(())
319 }
320
321 fn collect_nucleons_from_operand(
323 operand: &Operand,
324 nucleons: &mut Vec<NucleonExpr>,
325 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
326 ) -> Result<()> {
327 match operand {
328 Operand::Expr(expr) => {
329 Self::collect_nucleons_from_expr(expr, nucleons, normalized_values)
330 }
331 _ => {
332 error::NoExprOperandSnafu {
334 operand: operand.clone(),
335 }
336 .fail()
337 }
338 }
339 }
340
341 fn try_create_nucleon(
343 lhs: &Operand,
344 op: &RestrictedOp,
345 rhs: &Operand,
346 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
347 ) -> Result<NucleonExpr> {
348 let gluon_op = match op {
349 RestrictedOp::Eq => GluonOp::Eq,
350 RestrictedOp::NotEq => GluonOp::NotEq,
351 RestrictedOp::Lt => GluonOp::Lt,
352 RestrictedOp::LtEq => GluonOp::LtEq,
353 RestrictedOp::Gt => GluonOp::Gt,
354 RestrictedOp::GtEq => GluonOp::GtEq,
355 RestrictedOp::And | RestrictedOp::Or => {
356 return error::UnexpectedSnafu {
358 err_msg: format!("Conjunction operation {:?} should be handled elsewhere", op),
359 }
360 .fail();
361 }
362 };
363
364 match (lhs, rhs) {
365 (Operand::Column(col), Operand::Value(val)) => {
366 if let Some(column_values) = normalized_values.get(col)
367 && let Some(&normalized_val) = column_values.get(val)
368 {
369 return Ok(NucleonExpr {
370 column: col.clone(),
371 op: gluon_op,
372 value: normalized_val,
373 });
374 }
375 }
376 (Operand::Value(val), Operand::Column(col)) => {
377 if let Some(column_values) = normalized_values.get(col)
378 && let Some(&normalized_val) = column_values.get(val)
379 {
380 let flipped_op = match gluon_op {
382 GluonOp::Lt => GluonOp::Gt,
383 GluonOp::LtEq => GluonOp::GtEq,
384 GluonOp::Gt => GluonOp::Lt,
385 GluonOp::GtEq => GluonOp::LtEq,
386 op => op, };
388 return Ok(NucleonExpr {
389 column: col.clone(),
390 op: flipped_op,
391 value: normalized_val,
392 });
393 }
394 }
395 _ => {}
396 }
397
398 error::InvalidExprSnafu {
400 expr: PartitionExpr::new(lhs.clone(), op.clone(), rhs.clone()),
401 }
402 .fail()
403 }
404}
405
#[cfg(test)]
mod test {
    use super::*;
    use crate::expr::col;

    /// Asserts that `column` was normalized to exactly `expected`, in order.
    fn assert_normalized(
        collider: &Collider<'_>,
        column: &str,
        expected: &[(Value, OrderedF64)],
    ) {
        let actual = &collider.normalized_values[column];
        assert_eq!(actual.len(), expected.len());
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_collider_basic_value_normalization() {
        let exprs = vec![
            col("age").eq(Value::UInt32(25)),
            col("age").eq(Value::UInt32(30)),
            // Repeated value: must collapse to a single normalized entry.
            col("age").eq(Value::UInt32(25)),
            col("name").eq(Value::String("alice".into())),
            col("name").eq(Value::String("bob".into())),
            col("active").eq(Value::Boolean(true)),
            col("active").eq(Value::Boolean(false)),
            col("score").eq(Value::Float64(OrderedFloat(95.5))),
            col("score").eq(Value::Float64(OrderedFloat(87.2))),
        ];

        let collider = Collider::new(&exprs).expect("Failed to create collider");

        // One entry per distinct column.
        assert_eq!(collider.normalized_values.len(), 4);

        assert_normalized(
            &collider,
            "age",
            &[
                (Value::UInt32(25), OrderedFloat(0.0f64)),
                (Value::UInt32(30), OrderedFloat(1.0f64)),
            ],
        );
        assert_normalized(
            &collider,
            "name",
            &[
                (Value::String("alice".into()), OrderedFloat(0.0f64)),
                (Value::String("bob".into()), OrderedFloat(1.0f64)),
            ],
        );
        assert_normalized(
            &collider,
            "active",
            &[
                (Value::Boolean(false), OrderedFloat(0.0f64)),
                (Value::Boolean(true), OrderedFloat(1.0f64)),
            ],
        );
        assert_normalized(
            &collider,
            "score",
            &[
                (Value::Float64(OrderedFloat(87.2)), OrderedFloat(0.0f64)),
                (Value::Float64(OrderedFloat(95.5)), OrderedFloat(1.0f64)),
            ],
        );
    }

    #[test]
    fn test_collider_simple_expressions() {
        // A single comparison yields one atomic expression with one nucleon.
        let single = vec![col("id").eq(Value::UInt32(1))];
        let collider = Collider::new(&single).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 1);
        assert_eq!(collider.atomic_exprs[0].source_expr_index, 0);

        // An AND stays within one atomic expression.
        let conjunction = vec![
            col("id")
                .eq(Value::UInt32(1))
                .and(col("status").eq(Value::String("active".into()))),
        ];
        let collider = Collider::new(&conjunction).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 2);

        // An OR is split into one atomic expression per branch.
        let disjunction = vec![PartitionExpr::new(
            Operand::Expr(col("id").eq(Value::UInt32(1))),
            RestrictedOp::Or,
            Operand::Expr(col("id").eq(Value::UInt32(2))),
        )];
        let collider = Collider::new(&disjunction).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 2);
        for atomic in &collider.atomic_exprs {
            assert_eq!(atomic.nucleons.len(), 1);
        }
    }

    #[test]
    fn test_collider_complex_nested_expressions() {
        let first = col("id")
            .eq(Value::UInt32(1))
            .and(col("status").eq(Value::String("active".into())));
        let second = col("id")
            .eq(Value::UInt32(2))
            .and(col("status").eq(Value::String("inactive".into())));
        let third = col("id").eq(Value::UInt32(3));

        // (first OR second) OR third
        let inner = PartitionExpr::new(
            Operand::Expr(first),
            RestrictedOp::Or,
            Operand::Expr(second),
        );
        let root = PartitionExpr::new(Operand::Expr(inner), RestrictedOp::Or, Operand::Expr(third));

        let exprs = vec![root];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 3);

        // 2 + 2 + 1 nucleons across the three branches.
        let nucleon_count: usize = collider
            .atomic_exprs
            .iter()
            .map(|atomic| atomic.nucleons.len())
            .sum();
        assert_eq!(nucleon_count, 5);
    }

    #[test]
    fn test_collider_deep_nesting() {
        let chain = col("a")
            .eq(Value::UInt32(1))
            .and(col("b").eq(Value::UInt32(2)))
            .and(col("c").eq(Value::UInt32(3)))
            .and(col("d").eq(Value::UInt32(4)));

        let exprs = vec![chain];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 1);
        let only = &collider.atomic_exprs[0];
        assert_eq!(only.nucleons.len(), 4);
        for nucleon in &only.nucleons {
            assert_eq!(nucleon.op, GluonOp::Eq);
        }
    }

    #[test]
    fn test_collider_multiple_expressions() {
        let exprs = vec![
            col("id").eq(Value::UInt32(1)),
            col("name").eq(Value::String("alice".into())),
            col("score").gt_eq(Value::Float64(OrderedFloat(90.0))),
        ];

        let collider = Collider::new(&exprs).unwrap();

        // Each source expression becomes exactly one single-nucleon clause.
        assert_eq!(collider.atomic_exprs.len(), 3);
        for atomic in &collider.atomic_exprs {
            assert_eq!(atomic.nucleons.len(), 1);
        }

        let sources: Vec<usize> = collider
            .atomic_exprs
            .iter()
            .map(|atomic| atomic.source_expr_index)
            .collect();
        for expected in 0..3usize {
            assert!(sources.contains(&expected));
        }
    }

    #[test]
    fn test_collider_value_column_order() {
        // `10 < age` must be mirrored to `age > 10`.
        let value_lt_column = PartitionExpr::new(
            Operand::Value(Value::UInt32(10)),
            RestrictedOp::Lt,
            Operand::Column("age".to_string()),
        );
        // `20 >= score` must be mirrored to `score <= 20`.
        let value_gt_eq_column = PartitionExpr::new(
            Operand::Value(Value::UInt32(20)),
            RestrictedOp::GtEq,
            Operand::Column("score".to_string()),
        );
        let exprs = vec![value_lt_column, value_gt_eq_column];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 2);

        let ops: Vec<GluonOp> = collider
            .atomic_exprs
            .iter()
            .map(|atomic| atomic.nucleons[0].op.clone())
            .collect();

        assert!(ops.contains(&GluonOp::Gt));
        assert!(ops.contains(&GluonOp::LtEq));
    }

    #[test]
    fn test_collider_complex_or_with_different_columns() {
        let left = col("name")
            .eq(Value::String("alice".into()))
            .and(col("age").eq(Value::UInt32(25)));

        let right = col("status")
            .eq(Value::String("active".into()))
            .and(PartitionExpr::new(
                Operand::Column("score".to_string()),
                RestrictedOp::Gt,
                Operand::Value(Value::Float64(OrderedFloat(90.0))),
            ));

        let root = PartitionExpr::new(Operand::Expr(left), RestrictedOp::Or, Operand::Expr(right));

        let exprs = vec![root];
        let collider = Collider::new(&exprs).expect("Failed to create collider");

        // One clause per OR branch, each with two nucleons.
        assert_eq!(collider.atomic_exprs.len(), 2);
        for atomic in &collider.atomic_exprs {
            assert_eq!(atomic.nucleons.len(), 2);
        }

        assert_eq!(collider.normalized_values.len(), 4);
        for column in ["name", "age", "status", "score"] {
            assert!(collider.normalized_values.contains_key(column));
        }
    }

    #[test]
    fn test_try_create_nucleon_edge_cases() {
        let empty_values = HashMap::new();

        // Conjunction operators are not comparisons.
        for conjunction in [RestrictedOp::And, RestrictedOp::Or] {
            let outcome = Collider::try_create_nucleon(
                &col("a"),
                &conjunction,
                &Operand::Value(Value::UInt32(1)),
                &empty_values,
            );
            assert!(outcome.is_err());
        }

        // Column compared with column.
        let outcome =
            Collider::try_create_nucleon(&col("a"), &RestrictedOp::Eq, &col("b"), &empty_values);
        assert!(outcome.is_err());

        // Value compared with value.
        let outcome = Collider::try_create_nucleon(
            &Operand::Value(Value::UInt32(1)),
            &RestrictedOp::Eq,
            &Operand::Value(Value::UInt32(2)),
            &empty_values,
        );
        assert!(outcome.is_err());

        // No input expressions: nothing to collide.
        let no_exprs: Vec<PartitionExpr> = Vec::new();
        let collider = Collider::new(&no_exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 0);
        assert_eq!(collider.normalized_values.len(), 0);
    }
}