partition/
overlap.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Rule overlap and association utilities.
16//!
17//! This module provides pure functions to determine overlap relationships between
18//! partition expressions and to associate rule sets.
19
20use std::cmp::Ordering;
21use std::ops::Bound;
22
23use datatypes::value::OrderedF64;
24
25use crate::collider::{AtomicExpr, Collider, GluonOp, NucleonExpr};
26use crate::error::Result;
27use crate::expr::PartitionExpr;
28
29/// Check if two atomic expressions can be both satisfied, i.e., whether they
30/// overlap on all common columns.
31pub fn atomic_exprs_overlap(lhs: &AtomicExpr, rhs: &AtomicExpr) -> bool {
32    // Merge-walk over columns since nucleons are sorted by (column, op, value)
33    let mut lhs_index = 0;
34    let mut rhs_index = 0;
35
36    while lhs_index < lhs.nucleons.len() && rhs_index < rhs.nucleons.len() {
37        let lhs_col = lhs.nucleons[lhs_index].column();
38        let rhs_col = rhs.nucleons[rhs_index].column();
39
40        match lhs_col.cmp(rhs_col) {
41            Ordering::Equal => {
42                // advance to the next column boundaries in both atomics
43                let mut lhs_next = lhs_index;
44                let mut rhs_next = rhs_index;
45                while lhs_next < lhs.nucleons.len() && lhs.nucleons[lhs_next].column() == lhs_col {
46                    lhs_next += 1;
47                }
48                while rhs_next < rhs.nucleons.len() && rhs.nucleons[rhs_next].column() == rhs_col {
49                    rhs_next += 1;
50                }
51
52                let lhs_range = nucleons_to_range(&lhs.nucleons[lhs_index..lhs_next]);
53                let rhs_range = nucleons_to_range(&rhs.nucleons[rhs_index..rhs_next]);
54
55                if !lhs_range.overlaps_with(&rhs_range) {
56                    return false;
57                }
58
59                lhs_index = lhs_next;
60                rhs_index = rhs_next;
61            }
62            Ordering::Less => {
63                // column appears only in `lhs`, skip all nucleons for this column
64                let col = lhs_col;
65                while lhs_index < lhs.nucleons.len() && lhs.nucleons[lhs_index].column() == col {
66                    lhs_index += 1;
67                }
68            }
69            Ordering::Greater => {
70                // column appears only in `rhs`, skip all nucleons for this column
71                let col = rhs_col;
72                while rhs_index < rhs.nucleons.len() && rhs.nucleons[rhs_index].column() == col {
73                    rhs_index += 1;
74                }
75            }
76        }
77    }
78
79    true
80}
81
82/// Pairwise overlap check between two expression lists.
83///
84/// Returns true if two [`PartitionExpr`]s are overlapping (any pair of atomics overlaps).
85fn expr_pair_overlap(lhs: &PartitionExpr, rhs: &PartitionExpr) -> Result<bool> {
86    let binding = [lhs.clone(), rhs.clone()];
87    let collider = Collider::new(&binding)?;
88    // Split atomic exprs by source index
89    let mut lhs_atoms = Vec::new();
90    let mut rhs_atoms = Vec::new();
91    for atomic in collider.atomic_exprs.iter() {
92        if atomic.source_expr_index == 0 {
93            lhs_atoms.push(atomic);
94        } else {
95            rhs_atoms.push(atomic);
96        }
97    }
98    for lhs_atomic in &lhs_atoms {
99        for rhs_atomic in &rhs_atoms {
100            if atomic_exprs_overlap(lhs_atomic, rhs_atomic) {
101                return Ok(true);
102            }
103        }
104    }
105    Ok(false)
106}
107
108/// Associates each expression in `from_exprs` with indices of overlapping expressions in `to_exprs`.
109///
110/// Output vector length equals `from_exprs.len()`, and each inner vector contains indices into
111/// `to_exprs` that overlap with the corresponding `from_exprs[i]`.
112pub fn associate_from_to(
113    from_exprs: &[PartitionExpr],
114    to_exprs: &[PartitionExpr],
115) -> Result<Vec<Vec<usize>>> {
116    let mut result = Vec::with_capacity(from_exprs.len());
117    for from in from_exprs.iter() {
118        let mut targets = Vec::new();
119        for (i, to) in to_exprs.iter().enumerate() {
120            if expr_pair_overlap(from, to)? {
121                targets.push(i);
122            }
123        }
124        result.push(targets);
125    }
126    Ok(result)
127}
128
129/// Represents a value range derived from a group of nucleons for the same column
130#[derive(Debug, Clone)]
131struct ValueRange {
132    lower: Bound<OrderedF64>,
133    upper: Bound<OrderedF64>,
134}
135
136impl ValueRange {
137    fn new() -> Self {
138        Self {
139            lower: Bound::Unbounded,
140            upper: Bound::Unbounded,
141        }
142    }
143
144    fn update_lower(&mut self, new_lower: Bound<OrderedF64>) {
145        match (&self.lower, &new_lower) {
146            (Bound::Unbounded, _) => self.lower = new_lower,
147            (_, Bound::Unbounded) => {}
148            (Bound::Included(cur), Bound::Included(new))
149            | (Bound::Excluded(cur), Bound::Included(new))
150            | (Bound::Included(cur), Bound::Excluded(new))
151            | (Bound::Excluded(cur), Bound::Excluded(new)) => {
152                if new > cur {
153                    self.lower = new_lower;
154                }
155            }
156        }
157    }
158
159    fn update_upper(&mut self, new_upper: Bound<OrderedF64>) {
160        match (&self.upper, &new_upper) {
161            (Bound::Unbounded, _) => self.upper = new_upper,
162            (_, Bound::Unbounded) => {}
163            (Bound::Included(cur), Bound::Included(new))
164            | (Bound::Excluded(cur), Bound::Included(new))
165            | (Bound::Included(cur), Bound::Excluded(new))
166            | (Bound::Excluded(cur), Bound::Excluded(new)) => {
167                if new < cur {
168                    self.upper = new_upper;
169                }
170            }
171        }
172    }
173
174    fn overlaps_with(&self, other: &Self) -> bool {
175        fn no_overlap(upper: &Bound<OrderedF64>, lower: &Bound<OrderedF64>) -> bool {
176            match (upper, lower) {
177                (Bound::Unbounded, _) | (_, Bound::Unbounded) => false,
178                // u], [l
179                (Bound::Included(u), Bound::Included(l)) => u < l,
180                // u], (l) or u), [l or u), (l)
181                (Bound::Included(u), Bound::Excluded(l))
182                | (Bound::Excluded(u), Bound::Included(l))
183                | (Bound::Excluded(u), Bound::Excluded(l)) => u <= l,
184            }
185        }
186
187        if no_overlap(&self.upper, &other.lower) || no_overlap(&other.upper, &self.lower) {
188            return false;
189        }
190        true
191    }
192}
193
194/// Convert nucleons for the same column into a ValueRange
195fn nucleons_to_range(nucleons: &[NucleonExpr]) -> ValueRange {
196    use GluonOp::*;
197
198    let mut range = ValueRange::new();
199    for n in nucleons {
200        let v = n.value();
201        match n.op() {
202            Eq => {
203                range.lower = Bound::Included(v);
204                range.upper = Bound::Included(v);
205                break;
206            }
207            Lt => range.update_upper(Bound::Excluded(v)),
208            LtEq => range.update_upper(Bound::Included(v)),
209            Gt => range.update_lower(Bound::Excluded(v)),
210            GtEq => range.update_lower(Bound::Included(v)),
211            NotEq => continue, // handled elsewhere as separate atomics
212        }
213    }
214    range
215}
216
217#[cfg(test)]
218mod tests {
219    use datatypes::value::Value;
220
221    use super::*;
222    use crate::expr::{Operand, PartitionExpr, RestrictedOp, col};
223
224    #[test]
225    fn test_pair_overlap_simple() {
226        let a = col("user_id")
227            .gt_eq(Value::Int64(0))
228            .and(col("user_id").lt(Value::Int64(100)));
229        let b = col("user_id").eq(Value::Int64(50));
230        assert!(expr_pair_overlap(&a, &b).unwrap());
231
232        let c = col("user_id")
233            .gt_eq(Value::Int64(100))
234            .and(col("user_id").lt(Value::Int64(200)));
235        assert!(!expr_pair_overlap(&a, &c).unwrap());
236    }
237
238    #[test]
239    fn test_associate_from_to() {
240        // from: [ [0,100), [100,200) ]
241        let from = vec![
242            col("user_id")
243                .gt_eq(Value::Int64(0))
244                .and(col("user_id").lt(Value::Int64(100))),
245            col("user_id")
246                .gt_eq(Value::Int64(100))
247                .and(col("user_id").lt(Value::Int64(200))),
248        ];
249        // to: [ [0,150), [150,300) ]
250        let to = vec![
251            col("user_id")
252                .gt_eq(Value::Int64(0))
253                .and(col("user_id").lt(Value::Int64(150))),
254            col("user_id")
255                .gt_eq(Value::Int64(150))
256                .and(col("user_id").lt(Value::Int64(300))),
257        ];
258        let assoc = associate_from_to(&from, &to).unwrap();
259        assert_eq!(assoc.len(), 2);
260        // [0,100) overlaps only with [0,150)
261        assert_eq!(assoc[0], vec![0]);
262        // [100,200) overlaps both [0,150) and [150,300)
263        assert_eq!(assoc[1], vec![0, 1]);
264    }
265
266    #[test]
267    fn test_expr_with_or() {
268        // a: (user_id = 10 OR user_id = 20)
269        let a = PartitionExpr::new(
270            Operand::Expr(col("user_id").eq(Value::Int64(10))),
271            RestrictedOp::Or,
272            Operand::Expr(col("user_id").eq(Value::Int64(20))),
273        );
274        let b = col("user_id")
275            .gt_eq(Value::Int64(15))
276            .and(col("user_id").lt_eq(Value::Int64(25)));
277        assert!(expr_pair_overlap(&a, &b).unwrap());
278    }
279
280    #[test]
281    fn test_adjacent_ranges_non_overlap() {
282        // [0, 100) vs [100, 200) -> no overlap
283        let from = vec![
284            col("k")
285                .gt_eq(Value::Int64(0))
286                .and(col("k").lt(Value::Int64(100))),
287        ];
288        let to = vec![
289            col("k")
290                .gt_eq(Value::Int64(100))
291                .and(col("k").lt(Value::Int64(200))),
292        ];
293        let assoc = associate_from_to(&from, &to).unwrap();
294        assert_eq!(assoc[0], Vec::<usize>::new());
295    }
296
297    #[test]
298    fn test_multi_column_conflict_no_overlap() {
299        // Left: a in [0,10) AND b >= 5
300        let left = col("a")
301            .gt_eq(Value::Int64(0))
302            .and(col("a").lt(Value::Int64(10)))
303            .and(col("b").gt_eq(Value::Int64(5)));
304        // Right: a = 9 AND b < 5 -> conflict on b
305        let right = col("a")
306            .eq(Value::Int64(9))
307            .and(col("b").lt(Value::Int64(5)));
308        assert!(!expr_pair_overlap(&left, &right).unwrap());
309    }
310
311    #[test]
312    fn test_disjoint_columns_overlap() {
313        // Different columns don't constrain each other => satisfiable together
314        let from = vec![col("a").eq(Value::Int64(1))];
315        let to = vec![col("b").eq(Value::Int64(2))];
316        let assoc = associate_from_to(&from, &to).unwrap();
317        assert_eq!(assoc[0], vec![0]);
318    }
319
320    #[test]
321    fn test_boundary_inclusive_exclusive() {
322        // Left: a <= 10 AND a >= 10 => a = 10
323        let left = col("a")
324            .lt_eq(Value::Int64(10))
325            .and(col("a").gt_eq(Value::Int64(10)));
326        // Right: a = 10 -> overlap
327        let right_eq = col("a").eq(Value::Int64(10));
328        assert!(expr_pair_overlap(&left, &right_eq).unwrap());
329
330        // Left: a < 10, Right: a = 10 -> no overlap
331        let left_lt = col("a").lt(Value::Int64(10));
332        assert!(!expr_pair_overlap(&left_lt, &right_eq).unwrap());
333    }
334}