1use std::collections::HashMap;
29use std::fmt::Debug;
30
31use datatypes::value::{OrderedF64, OrderedFloat, Value};
32
33use crate::error;
34use crate::error::Result;
35use crate::expr::{Operand, PartitionExpr, RestrictedOp};
36
37const ZERO: OrderedF64 = OrderedFloat(0.0f64);
38const NORMALIZE_STEP: OrderedF64 = OrderedFloat(1.0f64);
39
40#[allow(unused)]
42pub(crate) struct AtomicExpr {
43 nucleons: Vec<NucleonExpr>,
45 source_expr_index: usize,
48}
49
50#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
54struct NucleonExpr {
55 column: String,
56 op: GluonOp,
57 value: OrderedF64,
59}
60
/// Comparison operator stored in a nucleon.
///
/// This is the comparison subset of `RestrictedOp`; the conjunctions `And`/`Or`
/// are deliberately absent because they are resolved before nucleons are built.
/// Variant order is significant: it defines the derived `Ord`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum GluonOp {
    /// `==`
    Eq,
    /// `!=`
    NotEq,
    /// `<`
    Lt,
    /// `<=`
    LtEq,
    /// `>`
    Gt,
    /// `>=`
    GtEq,
}
74
75#[allow(unused)]
79pub struct Collider<'a> {
80 source_exprs: &'a [PartitionExpr],
81
82 atomic_exprs: Vec<AtomicExpr>,
83 normalized_values: HashMap<String, Vec<(Value, OrderedF64)>>,
87}
88
89impl<'a> Collider<'a> {
90 pub fn new(source_exprs: &'a [PartitionExpr]) -> Result<Self> {
91 let mut values: HashMap<String, Vec<Value>> = HashMap::new();
93 for expr in source_exprs {
94 Self::collect_column_values_from_expr(expr, &mut values)?;
95 }
96
97 let mut normalized_values: HashMap<String, HashMap<Value, OrderedF64>> =
99 HashMap::with_capacity(values.len());
100 for (column, mut column_values) in values {
101 column_values.sort_unstable();
102 column_values.dedup(); let mut value_map = HashMap::with_capacity(column_values.len());
104 let mut start_value = ZERO;
105 for value in column_values {
106 value_map.insert(value, start_value);
107 start_value += NORMALIZE_STEP;
108 }
109 normalized_values.insert(column, value_map);
110 }
111
112 let mut atomic_exprs = Vec::with_capacity(source_exprs.len());
114 for (index, expr) in source_exprs.iter().enumerate() {
115 Self::collide_expr(expr, index, &normalized_values, &mut atomic_exprs)?;
116 }
117
118 let normalized_values = normalized_values
120 .into_iter()
121 .map(|(col, values)| {
122 let mut values = values.into_iter().collect::<Vec<_>>();
123 values.sort_unstable_by_key(|(_, v)| *v);
124 (col, values)
125 })
126 .collect();
127
128 Ok(Self {
129 source_exprs,
130 atomic_exprs,
131 normalized_values,
132 })
133 }
134
135 fn collect_column_values_from_expr(
137 expr: &PartitionExpr,
138 values: &mut HashMap<String, Vec<Value>>,
139 ) -> Result<()> {
140 match (&*expr.lhs, &*expr.rhs) {
142 (Operand::Column(col), Operand::Value(val))
143 | (Operand::Value(val), Operand::Column(col)) => {
144 values.entry(col.clone()).or_default().push(val.clone());
145 Ok(())
146 }
147 (Operand::Expr(left_expr), Operand::Expr(right_expr)) => {
148 Self::collect_column_values_from_expr(left_expr, values)?;
149 Self::collect_column_values_from_expr(right_expr, values)
150 }
151 _ => error::InvalidExprSnafu { expr: expr.clone() }.fail(),
153 }
154 }
155
156 fn collide_expr(
162 expr: &PartitionExpr,
163 index: usize,
164 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
165 result: &mut Vec<AtomicExpr>,
166 ) -> Result<()> {
167 match expr.op {
168 RestrictedOp::Or => {
169 match &*expr.lhs {
173 Operand::Expr(left_expr) => {
174 Self::collide_expr(left_expr, index, normalized_values, result)?;
175 }
176 _ => {
177 return error::InvalidExprSnafu { expr: expr.clone() }.fail();
180 }
181 }
182
183 match &*expr.rhs {
185 Operand::Expr(right_expr) => {
186 Self::collide_expr(right_expr, index, normalized_values, result)?;
187 }
188 _ => {
189 return error::InvalidExprSnafu { expr: expr.clone() }.fail();
192 }
193 }
194 }
195 RestrictedOp::And => {
196 let mut nucleons = Vec::new();
198 Self::collect_nucleons_from_expr(expr, &mut nucleons, normalized_values)?;
199
200 result.push(AtomicExpr {
201 nucleons,
202 source_expr_index: index,
203 });
204 }
205 _ => {
206 let mut nucleons = Vec::new();
208 Self::collect_nucleons_from_expr(expr, &mut nucleons, normalized_values)?;
209
210 result.push(AtomicExpr {
211 nucleons,
212 source_expr_index: index,
213 });
214 }
215 }
216 Ok(())
217 }
218
219 fn collect_nucleons_from_expr(
221 expr: &PartitionExpr,
222 nucleons: &mut Vec<NucleonExpr>,
223 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
224 ) -> Result<()> {
225 match expr.op {
226 RestrictedOp::And => {
227 Self::collect_nucleons_from_operand(&expr.lhs, nucleons, normalized_values)?;
229 Self::collect_nucleons_from_operand(&expr.rhs, nucleons, normalized_values)?;
230 }
231 _ => {
232 nucleons.push(Self::try_create_nucleon(
234 &expr.lhs,
235 &expr.op,
236 &expr.rhs,
237 normalized_values,
238 )?);
239 }
240 }
241 Ok(())
242 }
243
244 fn collect_nucleons_from_operand(
246 operand: &Operand,
247 nucleons: &mut Vec<NucleonExpr>,
248 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
249 ) -> Result<()> {
250 match operand {
251 Operand::Expr(expr) => {
252 Self::collect_nucleons_from_expr(expr, nucleons, normalized_values)
253 }
254 _ => {
255 error::NoExprOperandSnafu {
257 operand: operand.clone(),
258 }
259 .fail()
260 }
261 }
262 }
263
264 fn try_create_nucleon(
266 lhs: &Operand,
267 op: &RestrictedOp,
268 rhs: &Operand,
269 normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
270 ) -> Result<NucleonExpr> {
271 let gluon_op = match op {
272 RestrictedOp::Eq => GluonOp::Eq,
273 RestrictedOp::NotEq => GluonOp::NotEq,
274 RestrictedOp::Lt => GluonOp::Lt,
275 RestrictedOp::LtEq => GluonOp::LtEq,
276 RestrictedOp::Gt => GluonOp::Gt,
277 RestrictedOp::GtEq => GluonOp::GtEq,
278 RestrictedOp::And | RestrictedOp::Or => {
279 return error::UnexpectedSnafu {
281 err_msg: format!("Conjunction operation {:?} should be handled elsewhere", op),
282 }
283 .fail();
284 }
285 };
286
287 match (lhs, rhs) {
288 (Operand::Column(col), Operand::Value(val)) => {
289 if let Some(column_values) = normalized_values.get(col) {
290 if let Some(&normalized_val) = column_values.get(val) {
291 return Ok(NucleonExpr {
292 column: col.clone(),
293 op: gluon_op,
294 value: normalized_val,
295 });
296 }
297 }
298 }
299 (Operand::Value(val), Operand::Column(col)) => {
300 if let Some(column_values) = normalized_values.get(col) {
301 if let Some(&normalized_val) = column_values.get(val) {
302 let flipped_op = match gluon_op {
304 GluonOp::Lt => GluonOp::Gt,
305 GluonOp::LtEq => GluonOp::GtEq,
306 GluonOp::Gt => GluonOp::Lt,
307 GluonOp::GtEq => GluonOp::LtEq,
308 op => op, };
310 return Ok(NucleonExpr {
311 column: col.clone(),
312 op: flipped_op,
313 value: normalized_val,
314 });
315 }
316 }
317 }
318 _ => {}
319 }
320
321 error::InvalidExprSnafu {
323 expr: PartitionExpr::new(lhs.clone(), op.clone(), rhs.clone()),
324 }
325 .fail()
326 }
327}
328
#[cfg(test)]
mod test {
    use super::*;
    use crate::expr::col;

    #[test]
    fn test_collider_basic_value_normalization() {
        let exprs = vec![
            col("age").eq(Value::UInt32(25)),
            col("age").eq(Value::UInt32(30)),
            // Duplicate value: must collapse into a single normalized entry.
            col("age").eq(Value::UInt32(25)),
            col("name").eq(Value::String("alice".into())),
            col("name").eq(Value::String("bob".into())),
            col("active").eq(Value::Boolean(true)),
            col("active").eq(Value::Boolean(false)),
            col("score").eq(Value::Float64(OrderedFloat(95.5))),
            col("score").eq(Value::Float64(OrderedFloat(87.2))),
        ];

        let collider = Collider::new(&exprs).expect("Failed to create collider");

        assert_eq!(collider.normalized_values.len(), 4);

        let age_values = &collider.normalized_values["age"];
        assert_eq!(age_values.len(), 2);
        assert_eq!(
            age_values,
            &[
                (Value::UInt32(25), OrderedFloat(0.0f64)),
                (Value::UInt32(30), OrderedFloat(1.0f64))
            ]
        );

        let name_values = &collider.normalized_values["name"];
        assert_eq!(name_values.len(), 2);
        assert_eq!(
            name_values,
            &[
                (Value::String("alice".into()), OrderedFloat(0.0f64)),
                (Value::String("bob".into()), OrderedFloat(1.0f64))
            ]
        );

        let active_values = &collider.normalized_values["active"];
        assert_eq!(active_values.len(), 2);
        assert_eq!(
            active_values,
            &[
                (Value::Boolean(false), OrderedFloat(0.0f64)),
                (Value::Boolean(true), OrderedFloat(1.0f64))
            ]
        );

        let score_values = &collider.normalized_values["score"];
        assert_eq!(score_values.len(), 2);
        assert_eq!(
            score_values,
            &[
                (Value::Float64(OrderedFloat(87.2)), OrderedFloat(0.0f64)),
                (Value::Float64(OrderedFloat(95.5)), OrderedFloat(1.0f64))
            ]
        );
    }

    #[test]
    fn test_collider_simple_expressions() {
        // Single comparison.
        let exprs = vec![col("id").eq(Value::UInt32(1))];

        let collider = Collider::new(&exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 1);
        assert_eq!(collider.atomic_exprs[0].source_expr_index, 0);

        // AND of two comparisons stays a single atomic expression.
        let exprs = vec![col("id")
            .eq(Value::UInt32(1))
            .and(col("status").eq(Value::String("active".into())))];

        let collider = Collider::new(&exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 2);

        // OR of two comparisons is split into two atomic expressions.
        let expr = PartitionExpr::new(
            Operand::Expr(col("id").eq(Value::UInt32(1))),
            RestrictedOp::Or,
            Operand::Expr(col("id").eq(Value::UInt32(2))),
        );
        let exprs = vec![expr];

        let collider = Collider::new(&exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 2);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 1);
        assert_eq!(collider.atomic_exprs[1].nucleons.len(), 1);
        // Both branches come from the same source expression.
        assert!(collider
            .atomic_exprs
            .iter()
            .all(|ae| ae.source_expr_index == 0));
    }

    #[test]
    fn test_collider_complex_nested_expressions() {
        let branch1 = col("id")
            .eq(Value::UInt32(1))
            .and(col("status").eq(Value::String("active".into())));
        let branch2 = col("id")
            .eq(Value::UInt32(2))
            .and(col("status").eq(Value::String("inactive".into())));
        let branch3 = col("id").eq(Value::UInt32(3));

        let expr = PartitionExpr::new(
            Operand::Expr(PartitionExpr::new(
                Operand::Expr(branch1),
                RestrictedOp::Or,
                Operand::Expr(branch2),
            )),
            RestrictedOp::Or,
            Operand::Expr(branch3),
        );

        let exprs = vec![expr];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 3);

        // Branches are emitted left to right: branch1, branch2, branch3.
        let nucleon_counts: Vec<usize> = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.nucleons.len())
            .collect();
        assert_eq!(nucleon_counts, vec![2, 2, 1]);

        let total_nucleons: usize = nucleon_counts.iter().sum();
        assert_eq!(total_nucleons, 5);
    }

    #[test]
    fn test_collider_deep_nesting() {
        let expr = col("a")
            .eq(Value::UInt32(1))
            .and(col("b").eq(Value::UInt32(2)))
            .and(col("c").eq(Value::UInt32(3)))
            .and(col("d").eq(Value::UInt32(4)));

        let exprs = vec![expr];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 1);
        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 4);

        for nucleon in &collider.atomic_exprs[0].nucleons {
            assert_eq!(nucleon.op, GluonOp::Eq);
            // Each column has exactly one value, so it normalizes to zero.
            assert_eq!(nucleon.value, OrderedFloat(0.0f64));
        }

        let mut columns: Vec<&str> = collider.atomic_exprs[0]
            .nucleons
            .iter()
            .map(|nucleon| nucleon.column.as_str())
            .collect();
        columns.sort_unstable();
        assert_eq!(columns, ["a", "b", "c", "d"]);
    }

    #[test]
    fn test_collider_multiple_expressions() {
        let exprs = vec![
            col("id").eq(Value::UInt32(1)),
            col("name").eq(Value::String("alice".into())),
            col("score").gt_eq(Value::Float64(OrderedFloat(90.0))),
        ];

        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 3);

        for atomic_expr in &collider.atomic_exprs {
            assert_eq!(atomic_expr.nucleons.len(), 1);
        }

        // Atomic expressions are produced in source order.
        let indices: Vec<usize> = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.source_expr_index)
            .collect();
        assert_eq!(indices, vec![0, 1, 2]);
    }

    #[test]
    fn test_collider_value_column_order() {
        // `10 < age` should be stored as `age > 10`.
        let expr1 = PartitionExpr::new(
            Operand::Value(Value::UInt32(10)),
            RestrictedOp::Lt,
            Operand::Column("age".to_string()),
        );
        // `20 >= score` should be stored as `score <= 20`.
        let expr2 = PartitionExpr::new(
            Operand::Value(Value::UInt32(20)),
            RestrictedOp::GtEq,
            Operand::Column("score".to_string()),
        );
        let exprs = vec![expr1, expr2];
        let collider = Collider::new(&exprs).unwrap();

        assert_eq!(collider.atomic_exprs.len(), 2);

        let operations: Vec<GluonOp> = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.nucleons[0].op.clone())
            .collect();

        assert_eq!(operations, vec![GluonOp::Gt, GluonOp::LtEq]);
    }

    #[test]
    fn test_collider_mirrors_all_operators_when_value_on_left() {
        let cases = [
            (RestrictedOp::Eq, GluonOp::Eq),
            (RestrictedOp::NotEq, GluonOp::NotEq),
            (RestrictedOp::Lt, GluonOp::Gt),
            (RestrictedOp::LtEq, GluonOp::GtEq),
            (RestrictedOp::Gt, GluonOp::Lt),
            (RestrictedOp::GtEq, GluonOp::LtEq),
        ];

        let exprs: Vec<PartitionExpr> = cases
            .iter()
            .map(|(op, _)| {
                PartitionExpr::new(
                    Operand::Value(Value::UInt32(10)),
                    op.clone(),
                    Operand::Column("age".to_string()),
                )
            })
            .collect();
        let collider = Collider::new(&exprs).unwrap();

        let actual: Vec<GluonOp> = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.nucleons[0].op.clone())
            .collect();
        let expected: Vec<GluonOp> = cases.into_iter().map(|(_, gluon)| gluon).collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_collider_nucleons_use_normalized_values() {
        let exprs = vec![
            col("age").eq(Value::UInt32(30)),
            col("age").eq(Value::UInt32(25)),
        ];
        let collider = Collider::new(&exprs).unwrap();

        // 25 ranks first (0.0), 30 second (1.0), regardless of source order.
        let values: Vec<OrderedF64> = collider
            .atomic_exprs
            .iter()
            .map(|ae| ae.nucleons[0].value)
            .collect();
        assert_eq!(values, vec![OrderedFloat(1.0f64), OrderedFloat(0.0f64)]);
        assert!(collider
            .atomic_exprs
            .iter()
            .all(|ae| ae.nucleons[0].column == "age"));
    }

    #[test]
    fn test_collider_complex_or_with_different_columns() {
        let branch1 = col("name")
            .eq(Value::String("alice".into()))
            .and(col("age").eq(Value::UInt32(25)));

        let branch2 = col("status")
            .eq(Value::String("active".into()))
            .and(PartitionExpr::new(
                Operand::Column("score".to_string()),
                RestrictedOp::Gt,
                Operand::Value(Value::Float64(OrderedFloat(90.0))),
            ));

        let expr = PartitionExpr::new(
            Operand::Expr(branch1),
            RestrictedOp::Or,
            Operand::Expr(branch2),
        );

        let exprs = vec![expr];
        let collider = Collider::new(&exprs).expect("Failed to create collider");

        assert_eq!(collider.atomic_exprs.len(), 2);

        for atomic_expr in &collider.atomic_exprs {
            assert_eq!(atomic_expr.nucleons.len(), 2);
            assert_eq!(atomic_expr.source_expr_index, 0);
        }

        assert_eq!(collider.normalized_values.len(), 4);
        assert!(collider.normalized_values.contains_key("name"));
        assert!(collider.normalized_values.contains_key("age"));
        assert!(collider.normalized_values.contains_key("status"));
        assert!(collider.normalized_values.contains_key("score"));
    }

    #[test]
    fn test_collider_rejects_malformed_expressions() {
        // Comparison between two columns.
        let exprs = vec![PartitionExpr::new(
            Operand::Column("a".to_string()),
            RestrictedOp::Eq,
            Operand::Column("b".to_string()),
        )];
        assert!(Collider::new(&exprs).is_err());

        // OR whose operands are not expressions.
        let exprs = vec![PartitionExpr::new(
            Operand::Column("a".to_string()),
            RestrictedOp::Or,
            Operand::Value(Value::UInt32(1)),
        )];
        assert!(Collider::new(&exprs).is_err());

        // OR nested under AND is not supported.
        let exprs = vec![col("a").eq(Value::UInt32(1)).and(PartitionExpr::new(
            Operand::Expr(col("b").eq(Value::UInt32(2))),
            RestrictedOp::Or,
            Operand::Expr(col("b").eq(Value::UInt32(3))),
        ))];
        assert!(Collider::new(&exprs).is_err());
    }

    #[test]
    fn test_try_create_nucleon_edge_cases() {
        let normalized_values = HashMap::new();

        // Conjunctions are rejected.
        let result = Collider::try_create_nucleon(
            &col("a"),
            &RestrictedOp::And,
            &Operand::Value(Value::UInt32(1)),
            &normalized_values,
        );
        assert!(result.is_err());

        let result = Collider::try_create_nucleon(
            &col("a"),
            &RestrictedOp::Or,
            &Operand::Value(Value::UInt32(1)),
            &normalized_values,
        );
        assert!(result.is_err());

        // Column vs column.
        let result = Collider::try_create_nucleon(
            &col("a"),
            &RestrictedOp::Eq,
            &col("b"),
            &normalized_values,
        );
        assert!(result.is_err());

        // Value vs value.
        let result = Collider::try_create_nucleon(
            &Operand::Value(Value::UInt32(1)),
            &RestrictedOp::Eq,
            &Operand::Value(Value::UInt32(2)),
            &normalized_values,
        );
        assert!(result.is_err());

        // Well-formed comparison whose value was never normalized.
        let result = Collider::try_create_nucleon(
            &col("a"),
            &RestrictedOp::Eq,
            &Operand::Value(Value::UInt32(1)),
            &normalized_values,
        );
        assert!(result.is_err());
    }

    #[test]
    fn test_try_create_nucleon_mirrors_value_on_left() {
        let mut normalized_values = HashMap::new();
        normalized_values.insert(
            "a".to_string(),
            HashMap::from([(Value::UInt32(1), OrderedFloat(0.0f64))]),
        );

        let nucleon = Collider::try_create_nucleon(
            &Operand::Value(Value::UInt32(1)),
            &RestrictedOp::Lt,
            &col("a"),
            &normalized_values,
        )
        .unwrap();

        assert_eq!(
            nucleon,
            NucleonExpr {
                column: "a".to_string(),
                op: GluonOp::Gt,
                value: OrderedFloat(0.0f64),
            }
        );
    }

    #[test]
    fn test_collider_empty_input() {
        let exprs: Vec<PartitionExpr> = vec![];
        let collider = Collider::new(&exprs).unwrap();
        assert_eq!(collider.atomic_exprs.len(), 0);
        assert_eq!(collider.normalized_values.len(), 0);
    }
}