partition/
collider.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Provides a Collider tool that converts [`PartitionExpr`] into a form that is easier to process programmatically.
//!
//! This mod provides the following major types:
//!
//! - [`Collider`]: The main struct that converts [`PartitionExpr`].
//! - [`AtomicExpr`]: An "atomic" Expression, which isn't composed (OR-ed) of other expressions.
//! - [`NucleonExpr`]: A simplified expression representation.
//! - [`GluonOp`]: Further restricted operation set.
//!
//! On the naming aspect, a "collider" is a high-energy machine that cracks particles, an "atom" was
//! regarded as a typical indivisible particle until ~100 years ago, a "nucleon" is what composes an
//! atom, and a "gluon" is the force inside nucleons.
27
28use std::collections::HashMap;
29use std::fmt::Debug;
30
31use datatypes::value::{OrderedF64, OrderedFloat, Value};
32
33use crate::error;
34use crate::error::Result;
35use crate::expr::{Operand, PartitionExpr, RestrictedOp};
36
37const ZERO: OrderedF64 = OrderedFloat(0.0f64);
38const NORMALIZE_STEP: OrderedF64 = OrderedFloat(1.0f64);
39
40/// Represents an "atomic" Expression, which isn't composed (OR-ed) of other expressions.
41#[allow(unused)]
42pub(crate) struct AtomicExpr {
43    /// A (ordered) list of simplified expressions. They are [`RestrictedOp::And`]'ed together.
44    nucleons: Vec<NucleonExpr>,
45    /// Index to reference the [`PartitionExpr`] that this [`AtomicExpr`] is derived from.
46    /// This index is used with `exprs` field in [`MultiDimPartitionRule`](crate::multi_dim::MultiDimPartitionRule).
47    source_expr_index: usize,
48}
49
50/// A simplified expression representation.
51///
52/// This struct is used to compose [`AtomicExpr`], hence "nucleon".
53#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
54struct NucleonExpr {
55    column: String,
56    op: GluonOp,
57    /// Normalized [`Value`].
58    value: OrderedF64,
59}
60
/// The comparison-only subset of [`RestrictedOp`].
///
/// The conjunction operations ([`RestrictedOp::And`] and [`RestrictedOp::Or`]) have no
/// counterpart here. A [`GluonOp`] binds the column and the value of a [`NucleonExpr`],
/// hence "gluon".
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
enum GluonOp {
    /// `column = value`
    Eq,
    /// `column != value`
    NotEq,
    /// `column < value`
    Lt,
    /// `column <= value`
    LtEq,
    /// `column > value`
    Gt,
    /// `column >= value`
    GtEq,
}
74
75/// Collider is used to collide a list of [`PartitionExpr`] into a list of [`AtomicExpr`]
76///
77/// It also normalizes the values of the columns in the expressions.
78#[allow(unused)]
79pub struct Collider<'a> {
80    source_exprs: &'a [PartitionExpr],
81
82    atomic_exprs: Vec<AtomicExpr>,
83    /// A map of column name to a list of `(value, normalized value)` pairs.
84    ///
85    /// The normalized value is used for comparison. The normalization process keeps the order of the values.
86    normalized_values: HashMap<String, Vec<(Value, OrderedF64)>>,
87}
88
89impl<'a> Collider<'a> {
90    pub fn new(source_exprs: &'a [PartitionExpr]) -> Result<Self> {
91        // first walk to collect all values
92        let mut values: HashMap<String, Vec<Value>> = HashMap::new();
93        for expr in source_exprs {
94            Self::collect_column_values_from_expr(expr, &mut values)?;
95        }
96
97        // normalize values, assumes all values on a column are the same type
98        let mut normalized_values: HashMap<String, HashMap<Value, OrderedF64>> =
99            HashMap::with_capacity(values.len());
100        for (column, mut column_values) in values {
101            column_values.sort_unstable();
102            column_values.dedup(); // Remove duplicates
103            let mut value_map = HashMap::with_capacity(column_values.len());
104            let mut start_value = ZERO;
105            for value in column_values {
106                value_map.insert(value, start_value);
107                start_value += NORMALIZE_STEP;
108            }
109            normalized_values.insert(column, value_map);
110        }
111
112        // second walk to get atomic exprs
113        let mut atomic_exprs = Vec::with_capacity(source_exprs.len());
114        for (index, expr) in source_exprs.iter().enumerate() {
115            Self::collide_expr(expr, index, &normalized_values, &mut atomic_exprs)?;
116        }
117
118        // convert normalized values to a map
119        let normalized_values = normalized_values
120            .into_iter()
121            .map(|(col, values)| {
122                let mut values = values.into_iter().collect::<Vec<_>>();
123                values.sort_unstable_by_key(|(_, v)| *v);
124                (col, values)
125            })
126            .collect();
127
128        Ok(Self {
129            source_exprs,
130            atomic_exprs,
131            normalized_values,
132        })
133    }
134
135    /// Helper to collect values with their associated columns from an expression
136    fn collect_column_values_from_expr(
137        expr: &PartitionExpr,
138        values: &mut HashMap<String, Vec<Value>>,
139    ) -> Result<()> {
140        // Handle binary operations between column and value
141        match (&*expr.lhs, &*expr.rhs) {
142            (Operand::Column(col), Operand::Value(val))
143            | (Operand::Value(val), Operand::Column(col)) => {
144                values.entry(col.clone()).or_default().push(val.clone());
145                Ok(())
146            }
147            (Operand::Expr(left_expr), Operand::Expr(right_expr)) => {
148                Self::collect_column_values_from_expr(left_expr, values)?;
149                Self::collect_column_values_from_expr(right_expr, values)
150            }
151            // Other combinations don't directly contribute column-value pairs
152            _ => error::InvalidExprSnafu { expr: expr.clone() }.fail(),
153        }
154    }
155
156    /// Collide a [`PartitionExpr`] into multiple [`AtomicExpr`]s.
157    ///
158    /// Split the [`PartitionExpr`] on every [`RestrictedOp::Or`] (disjunction), each branch is an [`AtomicExpr`].
159    /// Since [`PartitionExpr`] doesn't allow parentheses, Expression like `(a = 1 OR b = 2) AND c = 3` won't occur.
160    /// We can safely split on every [`RestrictedOp::Or`].
161    fn collide_expr(
162        expr: &PartitionExpr,
163        index: usize,
164        normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
165        result: &mut Vec<AtomicExpr>,
166    ) -> Result<()> {
167        match expr.op {
168            RestrictedOp::Or => {
169                // Split on OR operation - each side becomes a separate atomic expression
170
171                // Process left side
172                match &*expr.lhs {
173                    Operand::Expr(left_expr) => {
174                        Self::collide_expr(left_expr, index, normalized_values, result)?;
175                    }
176                    _ => {
177                        // Single operand - this shouldn't happen with OR
178                        // OR should always connect two sub-expressions
179                        return error::InvalidExprSnafu { expr: expr.clone() }.fail();
180                    }
181                }
182
183                // Process right side
184                match &*expr.rhs {
185                    Operand::Expr(right_expr) => {
186                        Self::collide_expr(right_expr, index, normalized_values, result)?;
187                    }
188                    _ => {
189                        // Single operand - this shouldn't happen with OR
190                        // OR should always connect two sub-expressions
191                        return error::InvalidExprSnafu { expr: expr.clone() }.fail();
192                    }
193                }
194            }
195            RestrictedOp::And => {
196                // For AND operations, we need to combine nucleons
197                let mut nucleons = Vec::new();
198                Self::collect_nucleons_from_expr(expr, &mut nucleons, normalized_values)?;
199
200                result.push(AtomicExpr {
201                    nucleons,
202                    source_expr_index: index,
203                });
204            }
205            _ => {
206                // For other operations, create a single atomic expression
207                let mut nucleons = Vec::new();
208                Self::collect_nucleons_from_expr(expr, &mut nucleons, normalized_values)?;
209
210                result.push(AtomicExpr {
211                    nucleons,
212                    source_expr_index: index,
213                });
214            }
215        }
216        Ok(())
217    }
218
219    /// Collect nucleons from an expression (handles AND operations recursively)
220    fn collect_nucleons_from_expr(
221        expr: &PartitionExpr,
222        nucleons: &mut Vec<NucleonExpr>,
223        normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
224    ) -> Result<()> {
225        match expr.op {
226            RestrictedOp::And => {
227                // For AND operations, collect nucleons from both sides
228                Self::collect_nucleons_from_operand(&expr.lhs, nucleons, normalized_values)?;
229                Self::collect_nucleons_from_operand(&expr.rhs, nucleons, normalized_values)?;
230            }
231            _ => {
232                // For non-AND operations, try to create a nucleon directly
233                nucleons.push(Self::try_create_nucleon(
234                    &expr.lhs,
235                    &expr.op,
236                    &expr.rhs,
237                    normalized_values,
238                )?);
239            }
240        }
241        Ok(())
242    }
243
244    /// Collect nucleons from an operand
245    fn collect_nucleons_from_operand(
246        operand: &Operand,
247        nucleons: &mut Vec<NucleonExpr>,
248        normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
249    ) -> Result<()> {
250        match operand {
251            Operand::Expr(expr) => {
252                Self::collect_nucleons_from_expr(expr, nucleons, normalized_values)
253            }
254            _ => {
255                // Only `Operand::Expr` can be conjuncted by AND.
256                error::NoExprOperandSnafu {
257                    operand: operand.clone(),
258                }
259                .fail()
260            }
261        }
262    }
263
264    /// Try to create a nucleon from operands
265    fn try_create_nucleon(
266        lhs: &Operand,
267        op: &RestrictedOp,
268        rhs: &Operand,
269        normalized_values: &HashMap<String, HashMap<Value, OrderedF64>>,
270    ) -> Result<NucleonExpr> {
271        let gluon_op = match op {
272            RestrictedOp::Eq => GluonOp::Eq,
273            RestrictedOp::NotEq => GluonOp::NotEq,
274            RestrictedOp::Lt => GluonOp::Lt,
275            RestrictedOp::LtEq => GluonOp::LtEq,
276            RestrictedOp::Gt => GluonOp::Gt,
277            RestrictedOp::GtEq => GluonOp::GtEq,
278            RestrictedOp::And | RestrictedOp::Or => {
279                // These should be handled elsewhere
280                return error::UnexpectedSnafu {
281                    err_msg: format!("Conjunction operation {:?} should be handled elsewhere", op),
282                }
283                .fail();
284            }
285        };
286
287        match (lhs, rhs) {
288            (Operand::Column(col), Operand::Value(val)) => {
289                if let Some(column_values) = normalized_values.get(col) {
290                    if let Some(&normalized_val) = column_values.get(val) {
291                        return Ok(NucleonExpr {
292                            column: col.clone(),
293                            op: gluon_op,
294                            value: normalized_val,
295                        });
296                    }
297                }
298            }
299            (Operand::Value(val), Operand::Column(col)) => {
300                if let Some(column_values) = normalized_values.get(col) {
301                    if let Some(&normalized_val) = column_values.get(val) {
302                        // Flip the operation for value op column
303                        let flipped_op = match gluon_op {
304                            GluonOp::Lt => GluonOp::Gt,
305                            GluonOp::LtEq => GluonOp::GtEq,
306                            GluonOp::Gt => GluonOp::Lt,
307                            GluonOp::GtEq => GluonOp::LtEq,
308                            op => op, // Eq and NotEq remain the same
309                        };
310                        return Ok(NucleonExpr {
311                            column: col.clone(),
312                            op: flipped_op,
313                            value: normalized_val,
314                        });
315                    }
316                }
317            }
318            _ => {}
319        }
320
321        // Other combinations not supported for nucleons
322        error::InvalidExprSnafu {
323            expr: PartitionExpr::new(lhs.clone(), op.clone(), rhs.clone()),
324        }
325        .fail()
326    }
327}
328
329#[cfg(test)]
330mod test {
331    use super::*;
332    use crate::expr::col;
333
334    #[test]
335    fn test_collider_basic_value_normalization() {
336        // Test with different value types in different columns
337        let exprs = vec![
338            // Integer values
339            col("age").eq(Value::UInt32(25)),
340            col("age").eq(Value::UInt32(30)),
341            col("age").eq(Value::UInt32(25)), // Duplicate should be handled
342            // String values
343            col("name").eq(Value::String("alice".into())),
344            col("name").eq(Value::String("bob".into())),
345            // Boolean values
346            col("active").eq(Value::Boolean(true)),
347            col("active").eq(Value::Boolean(false)),
348            // Float values
349            col("score").eq(Value::Float64(OrderedFloat(95.5))),
350            col("score").eq(Value::Float64(OrderedFloat(87.2))),
351        ];
352
353        let collider = Collider::new(&exprs).expect("Failed to create collider");
354
355        // Check that we have the right number of columns
356        assert_eq!(collider.normalized_values.len(), 4);
357
358        // Check age column - should have 2 unique values (25, 30)
359        let age_values = &collider.normalized_values["age"];
360        assert_eq!(age_values.len(), 2);
361        assert_eq!(
362            age_values,
363            &[
364                (Value::UInt32(25), OrderedFloat(0.0f64)),
365                (Value::UInt32(30), OrderedFloat(1.0f64))
366            ]
367        );
368
369        // Check name column - should have 2 values
370        let name_values = &collider.normalized_values["name"];
371        assert_eq!(name_values.len(), 2);
372        assert_eq!(
373            name_values,
374            &[
375                (Value::String("alice".into()), OrderedFloat(0.0f64)),
376                (Value::String("bob".into()), OrderedFloat(1.0f64))
377            ]
378        );
379
380        // Check active column - should have 2 values
381        let active_values = &collider.normalized_values["active"];
382        assert_eq!(active_values.len(), 2);
383        assert_eq!(
384            active_values,
385            &[
386                (Value::Boolean(false), OrderedFloat(0.0f64)),
387                (Value::Boolean(true), OrderedFloat(1.0f64))
388            ]
389        );
390
391        // Check score column - should have 2 values
392        let score_values = &collider.normalized_values["score"];
393        assert_eq!(score_values.len(), 2);
394        assert_eq!(
395            score_values,
396            &[
397                (Value::Float64(OrderedFloat(87.2)), OrderedFloat(0.0f64)),
398                (Value::Float64(OrderedFloat(95.5)), OrderedFloat(1.0f64))
399            ]
400        );
401    }
402
403    #[test]
404    fn test_collider_simple_expressions() {
405        // Test simple equality
406        let exprs = vec![col("id").eq(Value::UInt32(1))];
407
408        let collider = Collider::new(&exprs).unwrap();
409        assert_eq!(collider.atomic_exprs.len(), 1);
410        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 1);
411        assert_eq!(collider.atomic_exprs[0].source_expr_index, 0);
412
413        // Test simple AND
414        let exprs = vec![col("id")
415            .eq(Value::UInt32(1))
416            .and(col("status").eq(Value::String("active".into())))];
417
418        let collider = Collider::new(&exprs).unwrap();
419        assert_eq!(collider.atomic_exprs.len(), 1);
420        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 2);
421
422        // Test simple OR - should create 2 atomic expressions
423        let expr = PartitionExpr::new(
424            Operand::Expr(col("id").eq(Value::UInt32(1))),
425            RestrictedOp::Or,
426            Operand::Expr(col("id").eq(Value::UInt32(2))),
427        );
428        let exprs = vec![expr];
429
430        let collider = Collider::new(&exprs).unwrap();
431        assert_eq!(collider.atomic_exprs.len(), 2);
432        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 1);
433        assert_eq!(collider.atomic_exprs[1].nucleons.len(), 1);
434    }
435
436    #[test]
437    fn test_collider_complex_nested_expressions() {
438        // Test: (id = 1 AND status = 'active') OR (id = 2 AND status = 'inactive') OR (id = 3)
439        let branch1 = col("id")
440            .eq(Value::UInt32(1))
441            .and(col("status").eq(Value::String("active".into())));
442        let branch2 = col("id")
443            .eq(Value::UInt32(2))
444            .and(col("status").eq(Value::String("inactive".into())));
445        let branch3 = col("id").eq(Value::UInt32(3));
446
447        let expr = PartitionExpr::new(
448            Operand::Expr(PartitionExpr::new(
449                Operand::Expr(branch1),
450                RestrictedOp::Or,
451                Operand::Expr(branch2),
452            )),
453            RestrictedOp::Or,
454            Operand::Expr(branch3),
455        );
456
457        let exprs = vec![expr];
458        let collider = Collider::new(&exprs).unwrap();
459
460        assert_eq!(collider.atomic_exprs.len(), 3);
461
462        let total_nucleons: usize = collider
463            .atomic_exprs
464            .iter()
465            .map(|ae| ae.nucleons.len())
466            .sum();
467        assert_eq!(total_nucleons, 5);
468    }
469
470    #[test]
471    fn test_collider_deep_nesting() {
472        // Test deeply nested AND operations: a = 1 AND b = 2 AND c = 3 AND d = 4
473        let expr = col("a")
474            .eq(Value::UInt32(1))
475            .and(col("b").eq(Value::UInt32(2)))
476            .and(col("c").eq(Value::UInt32(3)))
477            .and(col("d").eq(Value::UInt32(4)));
478
479        let exprs = vec![expr];
480        let collider = Collider::new(&exprs).unwrap();
481
482        assert_eq!(collider.atomic_exprs.len(), 1);
483        assert_eq!(collider.atomic_exprs[0].nucleons.len(), 4);
484
485        // All nucleons should have Eq operation
486        for nucleon in &collider.atomic_exprs[0].nucleons {
487            assert_eq!(nucleon.op, GluonOp::Eq);
488        }
489    }
490
491    #[test]
492    fn test_collider_multiple_expressions() {
493        // Test multiple separate expressions
494        let exprs = vec![
495            col("id").eq(Value::UInt32(1)),
496            col("name").eq(Value::String("alice".into())),
497            col("score").gt_eq(Value::Float64(OrderedFloat(90.0))),
498        ];
499
500        let collider = Collider::new(&exprs).unwrap();
501
502        // Should create 3 atomic expressions (one for each input expression)
503        assert_eq!(collider.atomic_exprs.len(), 3);
504
505        // Each should have exactly 1 nucleon
506        for atomic_expr in &collider.atomic_exprs {
507            assert_eq!(atomic_expr.nucleons.len(), 1);
508        }
509
510        // Check that source indices are correct
511        let indices: Vec<usize> = collider
512            .atomic_exprs
513            .iter()
514            .map(|ae| ae.source_expr_index)
515            .collect();
516        assert!(indices.contains(&0));
517        assert!(indices.contains(&1));
518        assert!(indices.contains(&2));
519    }
520
521    #[test]
522    fn test_collider_value_column_order() {
523        // Test expressions where value comes before column (should flip operation)
524        let expr1 = PartitionExpr::new(
525            Operand::Value(Value::UInt32(10)),
526            RestrictedOp::Lt,
527            Operand::Column("age".to_string()),
528        ); // 10 < age should become age > 10
529
530        let expr2 = PartitionExpr::new(
531            Operand::Value(Value::UInt32(20)),
532            RestrictedOp::GtEq,
533            Operand::Column("score".to_string()),
534        ); // 20 >= score should become score <= 20
535
536        let exprs = vec![expr1, expr2];
537        let collider = Collider::new(&exprs).unwrap();
538
539        assert_eq!(collider.atomic_exprs.len(), 2);
540
541        // Check that operations were flipped correctly
542        let operations: Vec<GluonOp> = collider
543            .atomic_exprs
544            .iter()
545            .map(|ae| ae.nucleons[0].op.clone())
546            .collect();
547
548        assert!(operations.contains(&GluonOp::Gt)); // 10 < age -> age > 10
549        assert!(operations.contains(&GluonOp::LtEq)); // 20 >= score -> score <= 20
550    }
551
552    #[test]
553    fn test_collider_complex_or_with_different_columns() {
554        // Test: (name = 'alice' AND age = 25) OR (status = 'active' AND score > 90)
555        let branch1 = col("name")
556            .eq(Value::String("alice".into()))
557            .and(col("age").eq(Value::UInt32(25)));
558
559        let branch2 = col("status")
560            .eq(Value::String("active".into()))
561            .and(PartitionExpr::new(
562                Operand::Column("score".to_string()),
563                RestrictedOp::Gt,
564                Operand::Value(Value::Float64(OrderedFloat(90.0))),
565            ));
566
567        let expr = PartitionExpr::new(
568            Operand::Expr(branch1),
569            RestrictedOp::Or,
570            Operand::Expr(branch2),
571        );
572
573        let exprs = vec![expr];
574        let collider = Collider::new(&exprs).expect("Failed to create collider");
575
576        // Should create 2 atomic expressions
577        assert_eq!(collider.atomic_exprs.len(), 2);
578
579        // Each atomic expression should have 2 nucleons
580        for atomic_expr in &collider.atomic_exprs {
581            assert_eq!(atomic_expr.nucleons.len(), 2);
582        }
583
584        // Should have normalized values for all 4 columns
585        assert_eq!(collider.normalized_values.len(), 4);
586        assert!(collider.normalized_values.contains_key("name"));
587        assert!(collider.normalized_values.contains_key("age"));
588        assert!(collider.normalized_values.contains_key("status"));
589        assert!(collider.normalized_values.contains_key("score"));
590    }
591
592    #[test]
593    fn test_try_create_nucleon_edge_cases() {
594        let normalized_values = HashMap::new();
595
596        // Test with AND operation
597        let result = Collider::try_create_nucleon(
598            &col("a"),
599            &RestrictedOp::And,
600            &Operand::Value(Value::UInt32(1)),
601            &normalized_values,
602        );
603        assert!(result.is_err());
604
605        // Test with OR operation
606        let result = Collider::try_create_nucleon(
607            &col("a"),
608            &RestrictedOp::Or,
609            &Operand::Value(Value::UInt32(1)),
610            &normalized_values,
611        );
612        assert!(result.is_err());
613
614        // Test with Column-Column
615        let result = Collider::try_create_nucleon(
616            &col("a"),
617            &RestrictedOp::Eq,
618            &col("b"),
619            &normalized_values,
620        );
621        assert!(result.is_err());
622
623        // Test with Value-Value
624        let result = Collider::try_create_nucleon(
625            &Operand::Value(Value::UInt32(1)),
626            &RestrictedOp::Eq,
627            &Operand::Value(Value::UInt32(2)),
628            &normalized_values,
629        );
630        assert!(result.is_err());
631
632        // Test empty expression list
633        let exprs = vec![];
634        let collider = Collider::new(&exprs).unwrap();
635        assert_eq!(collider.atomic_exprs.len(), 0);
636        assert_eq!(collider.normalized_values.len(), 0);
637    }
638}