1use std::cmp::Ordering;
18use std::ops::Bound;
19
20use ahash::{HashMap, HashSet};
21use common_telemetry::debug;
22use datatypes::prelude::ConcreteDataType;
23use datatypes::value::{OrderedF64, OrderedFloat, Value};
24use partition::collider::{AtomicExpr, Collider, GluonOp, NucleonExpr};
25use partition::expr::{Operand, PartitionExpr};
26use partition::manager::PartitionInfo;
27use store_api::storage::RegionId;
28use GluonOp::*;
29
30use crate::error::Result;
31
32pub struct ConstraintPruner;
33
34impl ConstraintPruner {
35 pub fn prune_regions(
39 query_expressions: &[PartitionExpr],
40 partitions: &[PartitionInfo],
41 column_datatypes: HashMap<String, ConcreteDataType>,
42 ) -> Result<Vec<RegionId>> {
43 let start = std::time::Instant::now();
44 if query_expressions.is_empty() || partitions.is_empty() {
45 return Ok(partitions.iter().map(|p| p.id).collect());
47 }
48
49 let mut expression_to_partition = Vec::with_capacity(partitions.len());
51 let mut all_partition_expressions = Vec::with_capacity(partitions.len());
52 for partition in partitions {
53 if let Some(expr) = &partition.partition_expr {
54 expression_to_partition.push(partition.id);
55 all_partition_expressions.push(expr.clone());
56 }
57 }
58 if all_partition_expressions.is_empty() {
59 return Ok(partitions.iter().map(|p| p.id).collect());
60 }
61
62 let mut all_expressions = query_expressions.to_vec();
64 all_expressions.extend(all_partition_expressions.iter().cloned());
65 if !Self::normalize_datatype(&mut all_expressions, &column_datatypes) {
66 return Ok(partitions.iter().map(|p| p.id).collect());
67 }
68
69 let collider = match Collider::new(&all_expressions) {
70 Ok(collider) => collider,
71 Err(err) => {
72 debug!(
73 "Failed to create unified collider: {}, returning all regions conservatively",
74 err
75 );
76 return Ok(partitions.iter().map(|p| p.id).collect());
77 }
78 };
79
80 let query_atomics: Vec<&AtomicExpr> = collider
82 .atomic_exprs
83 .iter()
84 .filter(|atomic| atomic.source_expr_index < query_expressions.len())
85 .collect();
86
87 let mut candidate_regions = HashSet::default();
88
89 for region_atomics in collider
90 .atomic_exprs
91 .iter()
92 .filter(|atomic| atomic.source_expr_index >= query_expressions.len())
93 {
94 if Self::atomic_sets_overlap(&query_atomics, region_atomics) {
95 let partition_expr_index =
96 region_atomics.source_expr_index - query_expressions.len();
97 candidate_regions.insert(expression_to_partition[partition_expr_index]);
98 }
99 }
100
101 debug!(
102 "Constraint pruning (cost {}ms): {} -> {} regions",
103 start.elapsed().as_millis(),
104 partitions.len(),
105 candidate_regions.len()
106 );
107
108 Ok(candidate_regions.into_iter().collect())
109 }
110
111 fn atomic_sets_overlap(query_atomics: &[&AtomicExpr], partition_atomic: &AtomicExpr) -> bool {
112 for query_atomic in query_atomics {
113 if Self::atomic_constraint_satisfied(query_atomic, partition_atomic) {
114 return true;
115 }
116 }
117
118 false
119 }
120
121 fn normalize_datatype(
122 all_expressions: &mut Vec<PartitionExpr>,
123 column_datatypes: &HashMap<String, ConcreteDataType>,
124 ) -> bool {
125 for expr in all_expressions {
126 if !Self::normalize_expr_datatype(&mut expr.lhs, &mut expr.rhs, column_datatypes) {
127 return false;
128 }
129 }
130 true
131 }
132
133 fn normalize_expr_datatype(
134 lhs: &mut Operand,
135 rhs: &mut Operand,
136 column_datatypes: &HashMap<String, ConcreteDataType>,
137 ) -> bool {
138 match (lhs, rhs) {
139 (Operand::Expr(lhs_expr), Operand::Expr(rhs_expr)) => {
140 Self::normalize_expr_datatype(
141 &mut lhs_expr.lhs,
142 &mut lhs_expr.rhs,
143 column_datatypes,
144 ) && Self::normalize_expr_datatype(
145 &mut rhs_expr.lhs,
146 &mut rhs_expr.rhs,
147 column_datatypes,
148 )
149 }
150 (Operand::Column(col_name), Operand::Value(val))
151 | (Operand::Value(val), Operand::Column(col_name)) => {
152 let Some(datatype) = column_datatypes.get(col_name) else {
153 debug!("Column {} not found from type set, skip pruning", col_name);
154 return false;
155 };
156
157 match datatype {
158 ConcreteDataType::Int8(_)
159 | ConcreteDataType::Int16(_)
160 | ConcreteDataType::Int32(_)
161 | ConcreteDataType::Int64(_) => {
162 let Some(new_lit) = val.as_i64() else {
163 debug!("Value {:?} cannot be converted to i64", val);
164 return false;
165 };
166 *val = Value::Int64(new_lit);
167 }
168
169 ConcreteDataType::UInt8(_)
170 | ConcreteDataType::UInt16(_)
171 | ConcreteDataType::UInt32(_)
172 | ConcreteDataType::UInt64(_) => {
173 let Some(new_lit) = val.as_u64() else {
174 debug!("Value {:?} cannot be converted to u64", val);
175 return false;
176 };
177 *val = Value::UInt64(new_lit);
178 }
179
180 ConcreteDataType::Float32(_) | ConcreteDataType::Float64(_) => {
181 let Some(new_lit) = val.as_f64_lossy() else {
182 debug!("Value {:?} cannot be converted to f64", val);
183 return false;
184 };
185
186 *val = Value::Float64(OrderedFloat(new_lit));
187 }
188
189 ConcreteDataType::String(_) | ConcreteDataType::Boolean(_) => {
190 }
192
193 ConcreteDataType::Decimal128(_)
194 | ConcreteDataType::Binary(_)
195 | ConcreteDataType::Date(_)
196 | ConcreteDataType::Timestamp(_)
197 | ConcreteDataType::Time(_)
198 | ConcreteDataType::Duration(_)
199 | ConcreteDataType::Interval(_)
200 | ConcreteDataType::List(_)
201 | ConcreteDataType::Dictionary(_)
202 | ConcreteDataType::Struct(_)
203 | ConcreteDataType::Json(_)
204 | ConcreteDataType::Null(_)
205 | ConcreteDataType::Vector(_) => {
206 debug!("Unsupported data type {datatype}");
207 return false;
208 }
209 }
210
211 true
212 }
213 _ => false,
214 }
215 }
216
217 fn atomic_constraint_satisfied(
219 query_atomic: &AtomicExpr,
220 partition_atomic: &AtomicExpr,
221 ) -> bool {
222 let mut query_index = 0;
223 let mut partition_index = 0;
224
225 while query_index < query_atomic.nucleons.len()
226 && partition_index < partition_atomic.nucleons.len()
227 {
228 let query_col = query_atomic.nucleons[query_index].column();
229 let partition_col = partition_atomic.nucleons[partition_index].column();
230
231 match query_col.cmp(partition_col) {
232 Ordering::Equal => {
233 let mut query_index_for_next_col = query_index;
234 let mut partition_index_for_next_col = partition_index;
235
236 while query_index_for_next_col < query_atomic.nucleons.len()
237 && query_atomic.nucleons[query_index_for_next_col].column() == query_col
238 {
239 query_index_for_next_col += 1;
240 }
241 while partition_index_for_next_col < partition_atomic.nucleons.len()
242 && partition_atomic.nucleons[partition_index_for_next_col].column()
243 == partition_col
244 {
245 partition_index_for_next_col += 1;
246 }
247
248 let query_range = Self::nucleons_to_range(
249 &query_atomic.nucleons[query_index..query_index_for_next_col],
250 );
251 let partition_range = Self::nucleons_to_range(
252 &partition_atomic.nucleons[partition_index..partition_index_for_next_col],
253 );
254
255 debug!("Comparing two ranges, {query_range:?} and {partition_range:?}");
256
257 query_index = query_index_for_next_col;
258 partition_index = partition_index_for_next_col;
259
260 if !query_range.overlaps_with(&partition_range) {
261 return false;
262 }
263 }
264 Ordering::Less => {
265 while query_index < query_atomic.nucleons.len()
267 && query_atomic.nucleons[query_index].column() == query_col
268 {
269 query_index += 1;
270 }
271 }
272 Ordering::Greater => {
273 while partition_index < partition_atomic.nucleons.len()
275 && partition_atomic.nucleons[partition_index].column() == partition_col
276 {
277 partition_index += 1;
278 }
279 }
280 }
281 }
282
283 true
284 }
285
286 fn nucleons_to_range(nucleons: &[NucleonExpr]) -> ValueRange {
288 let mut range = ValueRange::new();
289
290 for nucleon in nucleons {
291 let value = nucleon.value();
292 match nucleon.op() {
293 Eq => {
294 range.lower = Bound::Included(value);
295 range.upper = Bound::Included(value);
296 break; }
298 Lt => {
299 range.update_upper(Bound::Excluded(value));
301 }
302 LtEq => {
303 range.update_upper(Bound::Included(value));
304 }
305 Gt => {
306 range.update_lower(Bound::Excluded(value));
307 }
308 GtEq => {
309 range.update_lower(Bound::Included(value));
310 }
311 NotEq => {
312 continue;
314 }
315 }
316 }
317
318 range
319 }
320}
321
322#[derive(Debug, Clone)]
324struct ValueRange {
325 lower: Bound<OrderedF64>,
327 upper: Bound<OrderedF64>,
328}
329
330impl ValueRange {
331 fn new() -> Self {
332 Self {
333 lower: Bound::Unbounded,
334 upper: Bound::Unbounded,
335 }
336 }
337
338 fn update_lower(&mut self, new_lower: Bound<OrderedF64>) {
340 match (&self.lower, &new_lower) {
341 (Bound::Unbounded, _) => self.lower = new_lower,
342 (_, Bound::Unbounded) => { }
343 (Bound::Included(cur), Bound::Included(new))
344 | (Bound::Excluded(cur), Bound::Included(new))
345 | (Bound::Included(cur), Bound::Excluded(new))
346 | (Bound::Excluded(cur), Bound::Excluded(new)) => {
347 if new > cur {
348 self.lower = new_lower;
349 } else if new == cur {
350 if matches!(new_lower, Bound::Excluded(_))
352 && matches!(self.lower, Bound::Included(_))
353 {
354 self.lower = new_lower;
355 }
356 }
357 }
358 }
359 }
360
361 fn update_upper(&mut self, new_upper: Bound<OrderedF64>) {
363 match (&self.upper, &new_upper) {
364 (Bound::Unbounded, _) => self.upper = new_upper,
365 (_, Bound::Unbounded) => { }
366 (Bound::Included(cur), Bound::Included(new))
367 | (Bound::Excluded(cur), Bound::Included(new))
368 | (Bound::Included(cur), Bound::Excluded(new))
369 | (Bound::Excluded(cur), Bound::Excluded(new)) => {
370 if new < cur {
371 self.upper = new_upper;
372 } else if new == cur {
373 if matches!(new_upper, Bound::Excluded(_))
375 && matches!(self.upper, Bound::Included(_))
376 {
377 self.upper = new_upper;
378 }
379 }
380 }
381 }
382 }
383
384 fn overlaps_with(&self, other: &ValueRange) -> bool {
386 fn no_overlap(upper: &Bound<OrderedF64>, lower: &Bound<OrderedF64>) -> bool {
387 match (upper, lower) {
388 (Bound::Unbounded, _) | (_, Bound::Unbounded) => false,
389 (Bound::Included(u), Bound::Included(l)) => u < l,
391 (Bound::Included(u), Bound::Excluded(l))
393 | (Bound::Excluded(u), Bound::Included(l))
395 | (Bound::Excluded(u), Bound::Excluded(l)) => u <= l,
397 }
398 }
399
400 if no_overlap(&self.upper, &other.lower) || no_overlap(&other.upper, &self.lower) {
401 return false;
402 }
403 true
404 }
405}
406
#[cfg(test)]
mod tests {
    use datatypes::value::Value;
    use partition::expr::{col, Operand, PartitionExpr, RestrictedOp};
    use store_api::storage::RegionId;

    use super::*;

    fn create_test_partition_info(region_id: u64, expr: Option<PartitionExpr>) -> PartitionInfo {
        PartitionInfo {
            id: RegionId::new(1, region_id as u32),
            partition_expr: expr,
        }
    }

    /// Partition rule `lower <= user_id < upper`.
    fn user_id_range(lower: i64, upper: i64) -> PartitionExpr {
        col("user_id")
            .gt_eq(Value::Int64(lower))
            .and(col("user_id").lt(Value::Int64(upper)))
    }

    /// Regions `1..=count` of table 1; region `i` holds `user_id` in `[100(i-1), 100i)`.
    fn hundred_wide_partitions(count: u64) -> Vec<PartitionInfo> {
        (1..=count)
            .map(|region| {
                let lower = (region as i64 - 1) * 100;
                create_test_partition_info(region, Some(user_id_range(lower, lower + 100)))
            })
            .collect()
    }

    /// `user_id = 50 OR user_id = 150 OR user_id = 250`.
    fn user_id_in_50_150_250() -> PartitionExpr {
        PartitionExpr::new(
            Operand::Expr(PartitionExpr::new(
                Operand::Expr(col("user_id").eq(Value::Int64(50))),
                RestrictedOp::Or,
                Operand::Expr(col("user_id").eq(Value::Int64(150))),
            )),
            RestrictedOp::Or,
            Operand::Expr(col("user_id").eq(Value::Int64(250))),
        )
    }

    fn prune_by_user_id(
        query_exprs: &[PartitionExpr],
        partitions: &[PartitionInfo],
    ) -> Vec<RegionId> {
        let mut column_datatypes = HashMap::default();
        column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype());
        ConstraintPruner::prune_regions(query_exprs, partitions, column_datatypes).unwrap()
    }

    /// Asserts `pruned` is exactly the table-1 regions numbered `expected`, in any order.
    fn assert_regions(pruned: &[RegionId], expected: &[u32]) {
        assert_eq!(pruned.len(), expected.len());
        for region_number in expected {
            assert!(pruned.contains(&RegionId::new(1, *region_number)));
        }
    }

    #[test]
    fn test_constraint_pruning_equality() {
        let pruned = prune_by_user_id(
            &[col("user_id").eq(Value::Int64(150))],
            &hundred_wide_partitions(3),
        );
        assert_regions(&pruned, &[2]);
    }

    #[test]
    fn test_constraint_pruning_in_list() {
        let pruned = prune_by_user_id(&[user_id_in_50_150_250()], &hundred_wide_partitions(3));
        assert_regions(&pruned, &[1, 2, 3]);
    }

    #[test]
    fn test_constraint_pruning_range() {
        let pruned = prune_by_user_id(
            &[col("user_id").gt_eq(Value::Int64(150))],
            &hundred_wide_partitions(3),
        );
        // [0, 100) ends before 150; the other two regions intersect [150, +inf).
        assert_regions(&pruned, &[2, 3]);
    }

    #[test]
    fn test_prune_regions_no_constraints() {
        let partitions = vec![
            create_test_partition_info(1, None),
            create_test_partition_info(2, None),
        ];

        let constraints = vec![];
        let column_datatypes = HashMap::default();
        let pruned =
            ConstraintPruner::prune_regions(&constraints, &partitions, column_datatypes).unwrap();

        assert_regions(&pruned, &[1, 2]);
    }

    #[test]
    fn test_prune_regions_with_simple_equality() {
        let pruned = prune_by_user_id(
            &[col("user_id").eq(Value::Int64(150))],
            &hundred_wide_partitions(3),
        );
        assert_regions(&pruned, &[2]);
    }

    #[test]
    fn test_prune_regions_with_or_constraint() {
        let pruned = prune_by_user_id(&[user_id_in_50_150_250()], &hundred_wide_partitions(3));
        assert_regions(&pruned, &[1, 2, 3]);
    }

    #[test]
    fn test_constraint_pruning_no_match() {
        let pruned = prune_by_user_id(
            &[col("user_id").eq(Value::Int64(300))],
            &hundred_wide_partitions(2),
        );
        assert_regions(&pruned, &[]);
    }

    #[test]
    fn test_constraint_pruning_partial_match() {
        let pruned = prune_by_user_id(
            &[col("user_id").gt_eq(Value::Int64(50))],
            &hundred_wide_partitions(2),
        );
        assert_regions(&pruned, &[1, 2]);
    }
}