1use std::cmp::Ordering;
21use std::ops::Bound;
22
23use datatypes::value::OrderedF64;
24
25use crate::collider::{AtomicExpr, Collider, GluonOp, NucleonExpr};
26use crate::error::Result;
27use crate::expr::PartitionExpr;
28
29pub fn atomic_exprs_overlap(lhs: &AtomicExpr, rhs: &AtomicExpr) -> bool {
32 let mut lhs_index = 0;
34 let mut rhs_index = 0;
35
36 while lhs_index < lhs.nucleons.len() && rhs_index < rhs.nucleons.len() {
37 let lhs_col = lhs.nucleons[lhs_index].column();
38 let rhs_col = rhs.nucleons[rhs_index].column();
39
40 match lhs_col.cmp(rhs_col) {
41 Ordering::Equal => {
42 let mut lhs_next = lhs_index;
44 let mut rhs_next = rhs_index;
45 while lhs_next < lhs.nucleons.len() && lhs.nucleons[lhs_next].column() == lhs_col {
46 lhs_next += 1;
47 }
48 while rhs_next < rhs.nucleons.len() && rhs.nucleons[rhs_next].column() == rhs_col {
49 rhs_next += 1;
50 }
51
52 let lhs_range = nucleons_to_range(&lhs.nucleons[lhs_index..lhs_next]);
53 let rhs_range = nucleons_to_range(&rhs.nucleons[rhs_index..rhs_next]);
54
55 if !lhs_range.overlaps_with(&rhs_range) {
56 return false;
57 }
58
59 lhs_index = lhs_next;
60 rhs_index = rhs_next;
61 }
62 Ordering::Less => {
63 let col = lhs_col;
65 while lhs_index < lhs.nucleons.len() && lhs.nucleons[lhs_index].column() == col {
66 lhs_index += 1;
67 }
68 }
69 Ordering::Greater => {
70 let col = rhs_col;
72 while rhs_index < rhs.nucleons.len() && rhs.nucleons[rhs_index].column() == col {
73 rhs_index += 1;
74 }
75 }
76 }
77 }
78
79 true
80}
81
82fn expr_pair_overlap(lhs: &PartitionExpr, rhs: &PartitionExpr) -> Result<bool> {
86 let binding = [lhs.clone(), rhs.clone()];
87 let collider = Collider::new(&binding)?;
88 let mut lhs_atoms = Vec::new();
90 let mut rhs_atoms = Vec::new();
91 for atomic in collider.atomic_exprs.iter() {
92 if atomic.source_expr_index == 0 {
93 lhs_atoms.push(atomic);
94 } else {
95 rhs_atoms.push(atomic);
96 }
97 }
98 for lhs_atomic in &lhs_atoms {
99 for rhs_atomic in &rhs_atoms {
100 if atomic_exprs_overlap(lhs_atomic, rhs_atomic) {
101 return Ok(true);
102 }
103 }
104 }
105 Ok(false)
106}
107
108pub fn associate_from_to(
113 from_exprs: &[PartitionExpr],
114 to_exprs: &[PartitionExpr],
115) -> Result<Vec<Vec<usize>>> {
116 let mut result = Vec::with_capacity(from_exprs.len());
117 for from in from_exprs.iter() {
118 let mut targets = Vec::new();
119 for (i, to) in to_exprs.iter().enumerate() {
120 if expr_pair_overlap(from, to)? {
121 targets.push(i);
122 }
123 }
124 result.push(targets);
125 }
126 Ok(result)
127}
128
/// A one-dimensional interval over [`OrderedF64`] values.
///
/// Each end is a [`Bound`], so it may be inclusive, exclusive, or unbounded.
#[derive(Debug, Clone)]
struct ValueRange {
    /// Lower end of the interval.
    lower: Bound<OrderedF64>,
    /// Upper end of the interval.
    upper: Bound<OrderedF64>,
}
135
136impl ValueRange {
137 fn new() -> Self {
138 Self {
139 lower: Bound::Unbounded,
140 upper: Bound::Unbounded,
141 }
142 }
143
144 fn update_lower(&mut self, new_lower: Bound<OrderedF64>) {
145 match (&self.lower, &new_lower) {
146 (Bound::Unbounded, _) => self.lower = new_lower,
147 (_, Bound::Unbounded) => {}
148 (Bound::Included(cur), Bound::Included(new))
149 | (Bound::Excluded(cur), Bound::Included(new))
150 | (Bound::Included(cur), Bound::Excluded(new))
151 | (Bound::Excluded(cur), Bound::Excluded(new)) => {
152 if new > cur {
153 self.lower = new_lower;
154 }
155 }
156 }
157 }
158
159 fn update_upper(&mut self, new_upper: Bound<OrderedF64>) {
160 match (&self.upper, &new_upper) {
161 (Bound::Unbounded, _) => self.upper = new_upper,
162 (_, Bound::Unbounded) => {}
163 (Bound::Included(cur), Bound::Included(new))
164 | (Bound::Excluded(cur), Bound::Included(new))
165 | (Bound::Included(cur), Bound::Excluded(new))
166 | (Bound::Excluded(cur), Bound::Excluded(new)) => {
167 if new < cur {
168 self.upper = new_upper;
169 }
170 }
171 }
172 }
173
174 fn overlaps_with(&self, other: &Self) -> bool {
175 fn no_overlap(upper: &Bound<OrderedF64>, lower: &Bound<OrderedF64>) -> bool {
176 match (upper, lower) {
177 (Bound::Unbounded, _) | (_, Bound::Unbounded) => false,
178 (Bound::Included(u), Bound::Included(l)) => u < l,
180 (Bound::Included(u), Bound::Excluded(l))
182 | (Bound::Excluded(u), Bound::Included(l))
183 | (Bound::Excluded(u), Bound::Excluded(l)) => u <= l,
184 }
185 }
186
187 if no_overlap(&self.upper, &other.lower) || no_overlap(&other.upper, &self.lower) {
188 return false;
189 }
190 true
191 }
192}
193
194fn nucleons_to_range(nucleons: &[NucleonExpr]) -> ValueRange {
196 use GluonOp::*;
197
198 let mut range = ValueRange::new();
199 for n in nucleons {
200 let v = n.value();
201 match n.op() {
202 Eq => {
203 range.lower = Bound::Included(v);
204 range.upper = Bound::Included(v);
205 break;
206 }
207 Lt => range.update_upper(Bound::Excluded(v)),
208 LtEq => range.update_upper(Bound::Included(v)),
209 Gt => range.update_lower(Bound::Excluded(v)),
210 GtEq => range.update_lower(Bound::Included(v)),
211 NotEq => continue, }
213 }
214 range
215}
216
/// Unit tests for pairwise overlap detection and from/to association.
#[cfg(test)]
mod tests {
    use datatypes::value::Value;

    use super::*;
    use crate::expr::{Operand, PartitionExpr, RestrictedOp, col};

    /// A point inside a half-open range overlaps it; a range that starts at
    /// the other's exclusive upper end does not.
    #[test]
    fn test_pair_overlap_simple() {
        // a: 0 <= user_id < 100
        let a = col("user_id")
            .gt_eq(Value::Int64(0))
            .and(col("user_id").lt(Value::Int64(100)));
        // b: user_id = 50
        let b = col("user_id").eq(Value::Int64(50));
        assert!(expr_pair_overlap(&a, &b).unwrap());

        // c: 100 <= user_id < 200
        let c = col("user_id")
            .gt_eq(Value::Int64(100))
            .and(col("user_id").lt(Value::Int64(200)));
        assert!(!expr_pair_overlap(&a, &c).unwrap());
    }

    /// Each source range is associated with every target range it intersects.
    #[test]
    fn test_associate_from_to() {
        let from = vec![
            // from[0]: 0 <= user_id < 100
            col("user_id")
                .gt_eq(Value::Int64(0))
                .and(col("user_id").lt(Value::Int64(100))),
            // from[1]: 100 <= user_id < 200
            col("user_id")
                .gt_eq(Value::Int64(100))
                .and(col("user_id").lt(Value::Int64(200))),
        ];
        let to = vec![
            // to[0]: 0 <= user_id < 150
            col("user_id")
                .gt_eq(Value::Int64(0))
                .and(col("user_id").lt(Value::Int64(150))),
            // to[1]: 150 <= user_id < 300
            col("user_id")
                .gt_eq(Value::Int64(150))
                .and(col("user_id").lt(Value::Int64(300))),
        ];
        let assoc = associate_from_to(&from, &to).unwrap();
        assert_eq!(assoc.len(), 2);
        // from[0] lies entirely inside to[0].
        assert_eq!(assoc[0], vec![0]);
        // from[1] straddles the boundary at 150.
        assert_eq!(assoc[1], vec![0, 1]);
    }

    /// A disjunction overlaps a range when at least one of its branches does.
    #[test]
    fn test_expr_with_or() {
        // a: user_id = 10 OR user_id = 20
        let a = PartitionExpr::new(
            Operand::Expr(col("user_id").eq(Value::Int64(10))),
            RestrictedOp::Or,
            Operand::Expr(col("user_id").eq(Value::Int64(20))),
        );
        // b: 15 <= user_id <= 25, which contains 20 but not 10
        let b = col("user_id")
            .gt_eq(Value::Int64(15))
            .and(col("user_id").lt_eq(Value::Int64(25)));
        assert!(expr_pair_overlap(&a, &b).unwrap());
    }

    /// Ranges that merely touch at an exclusive/inclusive boundary are not
    /// associated.
    #[test]
    fn test_adjacent_ranges_non_overlap() {
        let from = vec![
            // 0 <= k < 100
            col("k")
                .gt_eq(Value::Int64(0))
                .and(col("k").lt(Value::Int64(100))),
        ];
        let to = vec![
            // 100 <= k < 200
            col("k")
                .gt_eq(Value::Int64(100))
                .and(col("k").lt(Value::Int64(200))),
        ];
        let assoc = associate_from_to(&from, &to).unwrap();
        assert_eq!(assoc[0], Vec::<usize>::new());
    }

    /// A conflict on any shared column rules out overlap, even if another
    /// shared column is compatible.
    #[test]
    fn test_multi_column_conflict_no_overlap() {
        // left: 0 <= a < 10 AND b >= 5
        let left = col("a")
            .gt_eq(Value::Int64(0))
            .and(col("a").lt(Value::Int64(10)))
            .and(col("b").gt_eq(Value::Int64(5)));
        // right: a = 9 AND b < 5 (compatible on a, conflicting on b)
        let right = col("a")
            .eq(Value::Int64(9))
            .and(col("b").lt(Value::Int64(5)));
        assert!(!expr_pair_overlap(&left, &right).unwrap());
    }

    /// Expressions that constrain different columns are reported as
    /// overlapping.
    #[test]
    fn test_disjoint_columns_overlap() {
        let from = vec![col("a").eq(Value::Int64(1))];
        let to = vec![col("b").eq(Value::Int64(2))];
        let assoc = associate_from_to(&from, &to).unwrap();
        assert_eq!(assoc[0], vec![0]);
    }

    /// Inclusive bounds admit the boundary value; exclusive bounds do not.
    #[test]
    fn test_boundary_inclusive_exclusive() {
        // left: a <= 10 AND a >= 10, i.e. exactly 10
        let left = col("a")
            .lt_eq(Value::Int64(10))
            .and(col("a").gt_eq(Value::Int64(10)));
        let right_eq = col("a").eq(Value::Int64(10));
        assert!(expr_pair_overlap(&left, &right_eq).unwrap());

        // a < 10 excludes 10 itself
        let left_lt = col("a").lt(Value::Int64(10));
        assert!(!expr_pair_overlap(&left_lt, &right_eq).unwrap());
    }
}