1use std::collections::HashMap;
29use std::fmt::Debug;
30use std::sync::Arc;
31
32use datafusion_expr::Operator;
33use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
34use datafusion_physical_expr::PhysicalExpr;
35use datatypes::arrow::datatypes::Schema;
36use datatypes::value::{OrderedF64, OrderedFloat, Value};
37
38use crate::error;
39use crate::error::Result;
40use crate::expr::{Operand, PartitionExpr, RestrictedOp};
41
42const ZERO: OrderedF64 = OrderedFloat(0.0f64);
43pub(crate) const NORMALIZE_STEP: OrderedF64 = OrderedFloat(1.0f64);
44pub(crate) const CHECK_STEP: OrderedF64 = OrderedFloat(0.5f64);
45
46#[allow(unused)]
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub(crate) struct AtomicExpr {
50 pub(crate) nucleons: Vec<NucleonExpr>,
52 pub(crate) source_expr_index: usize,
55}
56
57impl AtomicExpr {
58 pub fn to_physical_expr(&self, schema: &Schema) -> Arc<dyn PhysicalExpr> {
59 let mut exprs = Vec::with_capacity(self.nucleons.len());
60 for nucleon in &self.nucleons {
61 exprs.push(nucleon.to_physical_expr(schema));
62 }
63 let result: Arc<dyn PhysicalExpr> = exprs
64 .into_iter()
65 .reduce(|l, r| Arc::new(BinaryExpr::new(l, Operator::And, r)))
66 .unwrap();
67 result
68 }
69}
70
71impl PartialOrd for AtomicExpr {
72 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
73 Some(self.nucleons.cmp(&other.nucleons))
74 }
75}
76
77#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
81pub(crate) struct NucleonExpr {
82 column: String,
83 op: GluonOp,
84 value: OrderedF64,
86}
87
88impl NucleonExpr {
89 pub fn to_physical_expr(&self, schema: &Schema) -> Arc<dyn PhysicalExpr> {
90 Arc::new(BinaryExpr::new(
91 col(&self.column, schema).unwrap(),
92 self.op.to_operator(),
93 lit(*self.value.as_ref()),
94 ))
95 }
96}
97
/// Comparison operators a [`NucleonExpr`] can carry.
///
/// Variant order matters: the derived `Ord` ranks `Eq < NotEq < Lt < LtEq < Gt < GtEq`.
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
enum GluonOp {
    Eq,
    NotEq,
    Lt,
    LtEq,
    Gt,
    GtEq,
}
111
112impl GluonOp {
113 pub fn to_operator(&self) -> Operator {
114 match self {
115 GluonOp::Eq => Operator::Eq,
116 GluonOp::NotEq => Operator::NotEq,
117 GluonOp::Lt => Operator::Lt,
118 GluonOp::LtEq => Operator::LtEq,
119 GluonOp::Gt => Operator::Gt,
120 GluonOp::GtEq => Operator::GtEq,
121 }
122 }
123}
124
125#[allow(unused)]
129pub struct Collider<'a> {
130 source_exprs: &'a [PartitionExpr],
131
132 pub(crate) atomic_exprs: Vec<AtomicExpr>,
133 pub(crate) normalized_values: HashMap<String, Vec<(Value, OrderedF64)>>,
137}
138
139impl<'a> Collider<'a> {
140 pub fn new(source_exprs: &'a [PartitionExpr]) -> Result<Self> {
141 let mut values: HashMap<String, Vec<Value>> = HashMap::new();
143 for expr in source_exprs {
144 Self::collect_column_values_from_expr(expr, &mut values)?;
145 }
146
147 let mut normalized_values: HashMap<String, HashMap<Value, OrderedF64>> =
149 HashMap::with_capacity(values.len());
150 for (column, mut column_values) in values {
151 column_values.sort_unstable();
152 column_values.dedup(); let mut value_map = HashMap::with_capacity(column_values.len());
154 let mut start_value = ZERO;
155 for value in column_values {
156 value_map.insert(value, start_value);
157 start_value += NORMALIZE_STEP;
158 }
159 normalized_values.insert(column, value_map);
160 }
161
162 let mut atomic_exprs = Vec::with_capacity(source_exprs.len());
164 for (index, expr) in source_exprs.iter().enumerate() {
165 Self::collide_expr(expr, index, &normalized_values, &mut atomic_exprs)?;
166 }
167 for expr in &mut atomic_exprs {
169 expr.nucleons.sort_unstable();
170 }
171
172 let normalized_values = normalized_values
174 .into_iter()
175 .map(|(col, values)| {
176 let mut values = values.into_iter().collect::<Vec<_>>();
177 values.sort_unstable_by_key(|(_, v)| *v);
178 (col, values)
179 })
180 .collect();
181
182 Ok(Self {
183 source_exprs,
184 atomic_exprs,
185 normalized_values,
186 })
187 }
188
189 fn collect_column_values_from_expr(
191 expr: &PartitionExpr,
192 values: &mut HashMap<String, Vec<Value>>,
193 ) -> Result<()> {
194 match (&*expr.lhs, &*expr.rhs) {
196 (Operand::Column(col), Operand::Value(val))
197 | (Operand::Value(val), Operand::Column(col)) => {
198 values.entry(col.clone()).or_default().push(val.clone());
199 Ok(())
200 }
201 (Operand::Expr(left_expr), Operand::Expr(right_expr)) => {
202 Self::collect_column_values_from_expr(left_expr, values)?;
203 Self::collect_column_values_from_expr(right_expr, values)
204 }
205 _ => error::InvalidExprSnafu { expr: expr.clone() }.fail(),
207 }
208 }
209
210 fn collide_expr(
216 expr: &PartitionExpr,
217 index: usize,
218 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
219 result: &mut Vec<AtomicExpr>,
220 ) -> Result<()> {
221 match expr.op {
222 RestrictedOp::Or => {
223 match &*expr.lhs {
227 Operand::Expr(left_expr) => {
228 Self::collide_expr(left_expr, index, normalized_values, result)?;
229 }
230 _ => {
231 return error::InvalidExprSnafu { expr: expr.clone() }.fail();
234 }
235 }
236
237 match &*expr.rhs {
239 Operand::Expr(right_expr) => {
240 Self::collide_expr(right_expr, index, normalized_values, result)?;
241 }
242 _ => {
243 return error::InvalidExprSnafu { expr: expr.clone() }.fail();
246 }
247 }
248 }
249 RestrictedOp::And => {
250 let mut nucleons = Vec::new();
252 Self::collect_nucleons_from_expr(expr, &mut nucleons, normalized_values)?;
253
254 result.push(AtomicExpr {
255 nucleons,
256 source_expr_index: index,
257 });
258 }
259 _ => {
260 let mut nucleons = Vec::new();
262 Self::collect_nucleons_from_expr(expr, &mut nucleons, normalized_values)?;
263
264 result.push(AtomicExpr {
265 nucleons,
266 source_expr_index: index,
267 });
268 }
269 }
270 Ok(())
271 }
272
273 fn collect_nucleons_from_expr(
275 expr: &PartitionExpr,
276 nucleons: &mut Vec<NucleonExpr>,
277 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
278 ) -> Result<()> {
279 match expr.op {
280 RestrictedOp::And => {
281 Self::collect_nucleons_from_operand(&expr.lhs, nucleons, normalized_values)?;
283 Self::collect_nucleons_from_operand(&expr.rhs, nucleons, normalized_values)?;
284 }
285 _ => {
286 nucleons.push(Self::try_create_nucleon(
288 &expr.lhs,
289 &expr.op,
290 &expr.rhs,
291 normalized_values,
292 )?);
293 }
294 }
295 Ok(())
296 }
297
298 fn collect_nucleons_from_operand(
300 operand: &Operand,
301 nucleons: &mut Vec<NucleonExpr>,
302 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
303 ) -> Result<()> {
304 match operand {
305 Operand::Expr(expr) => {
306 Self::collect_nucleons_from_expr(expr, nucleons, normalized_values)
307 }
308 _ => {
309 error::NoExprOperandSnafu {
311 operand: operand.clone(),
312 }
313 .fail()
314 }
315 }
316 }
317
318 fn try_create_nucleon(
320 lhs: &Operand,
321 op: &RestrictedOp,
322 rhs: &Operand,
323 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
324 ) -> Result<NucleonExpr> {
325 let gluon_op = match op {
326 RestrictedOp::Eq => GluonOp::Eq,
327 RestrictedOp::NotEq => GluonOp::NotEq,
328 RestrictedOp::Lt => GluonOp::Lt,
329 RestrictedOp::LtEq => GluonOp::LtEq,
330 RestrictedOp::Gt => GluonOp::Gt,
331 RestrictedOp::GtEq => GluonOp::GtEq,
332 RestrictedOp::And | RestrictedOp::Or => {
333 return error::UnexpectedSnafu {
335 err_msg: format!("Conjunction operation {:?} should be handled elsewhere", op),
336 }
337 .fail();
338 }
339 };
340
341 match (lhs, rhs) {
342 (Operand::Column(col), Operand::Value(val)) => {
343 if let Some(column_values) = normalized_values.get(col) {
344 if let Some(&normalized_val) = column_values.get(val) {
345 return Ok(NucleonExpr {
346 column: col.clone(),
347 op: gluon_op,
348 value: normalized_val,
349 });
350 }
351 }
352 }
353 (Operand::Value(val), Operand::Column(col)) => {
354 if let Some(column_values) = normalized_values.get(col) {
355 if let Some(&normalized_val) = column_values.get(val) {
356 let flipped_op = match gluon_op {
358 GluonOp::Lt => GluonOp::Gt,
359 GluonOp::LtEq => GluonOp::GtEq,
360 GluonOp::Gt => GluonOp::Lt,
361 GluonOp::GtEq => GluonOp::LtEq,
362 op => op, };
364 return Ok(NucleonExpr {
365 column: col.clone(),
366 op: flipped_op,
367 value: normalized_val,
368 });
369 }
370 }
371 }
372 _ => {}
373 }
374
375 error::InvalidExprSnafu {
377 expr: PartitionExpr::new(lhs.clone(), op.clone(), rhs.clone()),
378 }
379 .fail()
380 }
381}
382
#[cfg(test)]
mod test {
    use super::*;
    use crate::expr::col;

    /// Wraps an `f64` literal as a partition value.
    fn float(v: f64) -> Value {
        Value::Float64(OrderedFloat(v))
    }

    /// Wraps a string literal as a partition value.
    fn string(s: &'static str) -> Value {
        Value::String(s.into())
    }

    /// Builds `lhs OR rhs` from two sub-expressions.
    fn or(lhs: PartitionExpr, rhs: PartitionExpr) -> PartitionExpr {
        PartitionExpr::new(Operand::Expr(lhs), RestrictedOp::Or, Operand::Expr(rhs))
    }

    #[test]
    fn test_collider_basic_value_normalization() {
        let exprs = [
            col("age").eq(Value::UInt32(25)),
            col("age").eq(Value::UInt32(30)),
            // Duplicate literal: must not create a second normalized value.
            col("age").eq(Value::UInt32(25)),
            col("name").eq(string("alice")),
            col("name").eq(string("bob")),
            col("active").eq(Value::Boolean(true)),
            col("active").eq(Value::Boolean(false)),
            col("score").eq(float(95.5)),
            col("score").eq(float(87.2)),
        ];

        let collider = Collider::new(&exprs).expect("Failed to create collider");
        assert_eq!(collider.normalized_values.len(), 4);

        // Distinct literals per column, in ascending order, receive 0.0 and 1.0.
        let expected = [
            ("age", vec![Value::UInt32(25), Value::UInt32(30)]),
            ("name", vec![string("alice"), string("bob")]),
            ("active", vec![Value::Boolean(false), Value::Boolean(true)]),
            ("score", vec![float(87.2), float(95.5)]),
        ];
        for (column, ascending) in expected {
            let actual = &collider.normalized_values[column];
            assert_eq!(actual.len(), 2);
            let want: Vec<(Value, OrderedF64)> = ascending
                .into_iter()
                .zip([OrderedFloat(0.0f64), OrderedFloat(1.0f64)])
                .collect();
            assert_eq!(actual, &want);
        }
    }

    #[test]
    fn test_collider_simple_expressions() {
        // One comparison -> one atomic expression with one nucleon.
        let single = [col("id").eq(Value::UInt32(1))];
        let collider = Collider::new(&single).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 1);
        assert_eq!(collider.atomic_exprs[0].source_expr_index, 0);

        // A conjunction stays a single atomic expression holding both comparisons.
        let conjunction = [col("id")
            .eq(Value::UInt32(1))
            .and(col("status").eq(string("active")))];
        let collider = Collider::new(&conjunction).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 2);

        // A disjunction is split into one atomic expression per branch.
        let disjunction = [or(
            col("id").eq(Value::UInt32(1)),
            col("id").eq(Value::UInt32(2)),
        )];
        let collider = Collider::new(&disjunction).unwrap();
        let nucleon_counts: Vec<usize> = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.nucleons.len())
            .collect();
        assert_eq!(nucleon_counts, vec![1, 1]);
    }

    #[test]
    fn test_collider_complex_nested_expressions() {
        let branch1 = col("id")
            .eq(Value::UInt32(1))
            .and(col("status").eq(string("active")));
        let branch2 = col("id")
            .eq(Value::UInt32(2))
            .and(col("status").eq(string("inactive")));
        let branch3 = col("id").eq(Value::UInt32(3));

        let exprs = [or(or(branch1, branch2), branch3)];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 3);
        let total_nucleons = collider
            .atomic_exprs
            .iter()
            .fold(0, |acc, ae| acc + ae.nucleons.len());
        assert_eq!(total_nucleons, 5);
    }

    #[test]
    fn test_collider_deep_nesting() {
        // ((a = 1 AND b = 2) AND c = 3) AND d = 4
        let expr = [("b", 2u32), ("c", 3), ("d", 4)].iter().fold(
            col("a").eq(Value::UInt32(1)),
            |acc, &(column, v)| acc.and(col(column).eq(Value::UInt32(v))),
        );

        let exprs = [expr];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 4);
        assert!(collider.atomic_exprs[0]
            .nucleons
            .iter()
            .all(|nucleon| nucleon.op == GluonOp::Eq));
    }

    #[test]
    fn test_collider_multiple_expressions() {
        let exprs = [
            col("id").eq(Value::UInt32(1)),
            col("name").eq(string("alice")),
            col("score").gt_eq(float(90.0)),
        ];

        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 3);
        assert!(collider
            .atomic_exprs
            .iter()
            .all(|ae| ae.nucleons.len() == 1));

        let indices: Vec<usize> = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.source_expr_index)
            .collect();
        for expected in 0..3usize {
            assert!(indices.contains(&expected));
        }
    }

    #[test]
    fn test_collider_value_column_order() {
        let exprs = [
            PartitionExpr::new(
                Operand::Value(Value::UInt32(10)),
                RestrictedOp::Lt,
                Operand::Column("age".to_string()),
            ),
            PartitionExpr::new(
                Operand::Value(Value::UInt32(20)),
                RestrictedOp::GtEq,
                Operand::Column("score".to_string()),
            ),
        ];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 2);

        let operations: Vec<GluonOp> = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.nucleons[0].op.clone())
            .collect();

        // `10 < age` is stored as `age > 10`; `20 >= score` as `score <= 20`.
        assert!(operations.contains(&GluonOp::Gt));
        assert!(operations.contains(&GluonOp::LtEq));
    }

    #[test]
    fn test_collider_complex_or_with_different_columns() {
        let branch1 = col("name")
            .eq(string("alice"))
            .and(col("age").eq(Value::UInt32(25)));
        let branch2 = col("status")
            .eq(string("active"))
            .and(PartitionExpr::new(
                Operand::Column("score".to_string()),
                RestrictedOp::Gt,
                Operand::Value(float(90.0)),
            ));

        let exprs = [or(branch1, branch2)];
        let collider = Collider::new(&exprs).expect("Failed to create collider");

        assert_eq!(collider.atomic_exprs.len(), 2);
        assert!(collider
            .atomic_exprs
            .iter()
            .all(|ae| ae.nucleons.len() == 2));

        assert_eq!(collider.normalized_values.len(), 4);
        for column in ["name", "age", "status", "score"] {
            assert!(collider.normalized_values.contains_key(column));
        }
    }

    #[test]
    fn test_try_create_nucleon_edge_cases() {
        let empty_values = HashMap::new();

        // Conjunction operators are rejected outright.
        for op in [RestrictedOp::And, RestrictedOp::Or] {
            assert!(Collider::try_create_nucleon(
                &col("a"),
                &op,
                &Operand::Value(Value::UInt32(1)),
                &empty_values,
            )
            .is_err());
        }

        // Column vs. column is not a valid comparison.
        assert!(Collider::try_create_nucleon(
            &col("a"),
            &RestrictedOp::Eq,
            &col("b"),
            &empty_values,
        )
        .is_err());

        // Value vs. value is not a valid comparison either.
        assert!(Collider::try_create_nucleon(
            &Operand::Value(Value::UInt32(1)),
            &RestrictedOp::Eq,
            &Operand::Value(Value::UInt32(2)),
            &empty_values,
        )
        .is_err());

        // No expressions -> an empty collider.
        let no_exprs: Vec<PartitionExpr> = Vec::new();
        let collider = Collider::new(&no_exprs).unwrap();
        assert!(collider.atomic_exprs.is_empty());
        assert!(collider.normalized_values.is_empty());
    }
}