query/dist_plan/
region_pruner.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! [`ConstraintPruner`] prunes partition info based on given expressions.
16
17use std::cmp::Ordering;
18use std::ops::Bound;
19
20use ahash::{HashMap, HashSet};
21use common_telemetry::debug;
22use datatypes::prelude::ConcreteDataType;
23use datatypes::value::{OrderedF64, OrderedFloat, Value};
24use partition::collider::{AtomicExpr, Collider, GluonOp, NucleonExpr};
25use partition::expr::{Operand, PartitionExpr};
26use partition::manager::PartitionInfo;
27use store_api::storage::RegionId;
28use GluonOp::*;
29
30use crate::error::Result;
31
/// Stateless entry point for pruning regions by comparing query predicates
/// against partition expressions.
///
/// The type carries no data; all functionality is exposed through associated
/// functions such as [`ConstraintPruner::prune_regions`].
#[derive(Debug, Clone, Copy, Default)]
pub struct ConstraintPruner;
33
34impl ConstraintPruner {
35    /// Prune regions using constraint satisfaction approach
36    ///
37    /// Takes query expressions and partition info, returns matching region IDs
38    pub fn prune_regions(
39        query_expressions: &[PartitionExpr],
40        partitions: &[PartitionInfo],
41        column_datatypes: HashMap<String, ConcreteDataType>,
42    ) -> Result<Vec<RegionId>> {
43        let start = std::time::Instant::now();
44        if query_expressions.is_empty() || partitions.is_empty() {
45            // No constraints, return all regions
46            return Ok(partitions.iter().map(|p| p.id).collect());
47        }
48
49        // Collect all partition expressions for unified normalization
50        let mut expression_to_partition = Vec::with_capacity(partitions.len());
51        let mut all_partition_expressions = Vec::with_capacity(partitions.len());
52        for partition in partitions {
53            if let Some(expr) = &partition.partition_expr {
54                expression_to_partition.push(partition.id);
55                all_partition_expressions.push(expr.clone());
56            }
57        }
58        if all_partition_expressions.is_empty() {
59            return Ok(partitions.iter().map(|p| p.id).collect());
60        }
61
62        // Create unified collider with both query and partition expressions for consistent normalization
63        let mut all_expressions = query_expressions.to_vec();
64        all_expressions.extend(all_partition_expressions.iter().cloned());
65        if !Self::normalize_datatype(&mut all_expressions, &column_datatypes) {
66            return Ok(partitions.iter().map(|p| p.id).collect());
67        }
68
69        let collider = match Collider::new(&all_expressions) {
70            Ok(collider) => collider,
71            Err(err) => {
72                debug!(
73                    "Failed to create unified collider: {}, returning all regions conservatively",
74                    err
75                );
76                return Ok(partitions.iter().map(|p| p.id).collect());
77            }
78        };
79
80        // Extract query atomic expressions (first N expressions in the collider)
81        let query_atomics: Vec<&AtomicExpr> = collider
82            .atomic_exprs
83            .iter()
84            .filter(|atomic| atomic.source_expr_index < query_expressions.len())
85            .collect();
86
87        let mut candidate_regions = HashSet::default();
88
89        for region_atomics in collider
90            .atomic_exprs
91            .iter()
92            .filter(|atomic| atomic.source_expr_index >= query_expressions.len())
93        {
94            if Self::atomic_sets_overlap(&query_atomics, region_atomics) {
95                let partition_expr_index =
96                    region_atomics.source_expr_index - query_expressions.len();
97                candidate_regions.insert(expression_to_partition[partition_expr_index]);
98            }
99        }
100
101        debug!(
102            "Constraint pruning (cost {}ms): {} -> {} regions",
103            start.elapsed().as_millis(),
104            partitions.len(),
105            candidate_regions.len()
106        );
107
108        Ok(candidate_regions.into_iter().collect())
109    }
110
111    fn atomic_sets_overlap(query_atomics: &[&AtomicExpr], partition_atomic: &AtomicExpr) -> bool {
112        for query_atomic in query_atomics {
113            if Self::atomic_constraint_satisfied(query_atomic, partition_atomic) {
114                return true;
115            }
116        }
117
118        false
119    }
120
121    fn normalize_datatype(
122        all_expressions: &mut Vec<PartitionExpr>,
123        column_datatypes: &HashMap<String, ConcreteDataType>,
124    ) -> bool {
125        for expr in all_expressions {
126            if !Self::normalize_expr_datatype(&mut expr.lhs, &mut expr.rhs, column_datatypes) {
127                return false;
128            }
129        }
130        true
131    }
132
133    fn normalize_expr_datatype(
134        lhs: &mut Operand,
135        rhs: &mut Operand,
136        column_datatypes: &HashMap<String, ConcreteDataType>,
137    ) -> bool {
138        match (lhs, rhs) {
139            (Operand::Expr(lhs_expr), Operand::Expr(rhs_expr)) => {
140                Self::normalize_expr_datatype(
141                    &mut lhs_expr.lhs,
142                    &mut lhs_expr.rhs,
143                    column_datatypes,
144                ) && Self::normalize_expr_datatype(
145                    &mut rhs_expr.lhs,
146                    &mut rhs_expr.rhs,
147                    column_datatypes,
148                )
149            }
150            (Operand::Column(col_name), Operand::Value(val))
151            | (Operand::Value(val), Operand::Column(col_name)) => {
152                let Some(datatype) = column_datatypes.get(col_name) else {
153                    debug!("Column {} not found from type set, skip pruning", col_name);
154                    return false;
155                };
156
157                match datatype {
158                    ConcreteDataType::Int8(_)
159                    | ConcreteDataType::Int16(_)
160                    | ConcreteDataType::Int32(_)
161                    | ConcreteDataType::Int64(_) => {
162                        let Some(new_lit) = val.as_i64() else {
163                            debug!("Value {:?} cannot be converted to i64", val);
164                            return false;
165                        };
166                        *val = Value::Int64(new_lit);
167                    }
168
169                    ConcreteDataType::UInt8(_)
170                    | ConcreteDataType::UInt16(_)
171                    | ConcreteDataType::UInt32(_)
172                    | ConcreteDataType::UInt64(_) => {
173                        let Some(new_lit) = val.as_u64() else {
174                            debug!("Value {:?} cannot be converted to u64", val);
175                            return false;
176                        };
177                        *val = Value::UInt64(new_lit);
178                    }
179
180                    ConcreteDataType::Float32(_) | ConcreteDataType::Float64(_) => {
181                        let Some(new_lit) = val.as_f64_lossy() else {
182                            debug!("Value {:?} cannot be converted to f64", val);
183                            return false;
184                        };
185
186                        *val = Value::Float64(OrderedFloat(new_lit));
187                    }
188
189                    ConcreteDataType::String(_) | ConcreteDataType::Boolean(_) => {
190                        // no operation needed
191                    }
192
193                    ConcreteDataType::Decimal128(_)
194                    | ConcreteDataType::Binary(_)
195                    | ConcreteDataType::Date(_)
196                    | ConcreteDataType::Timestamp(_)
197                    | ConcreteDataType::Time(_)
198                    | ConcreteDataType::Duration(_)
199                    | ConcreteDataType::Interval(_)
200                    | ConcreteDataType::List(_)
201                    | ConcreteDataType::Dictionary(_)
202                    | ConcreteDataType::Struct(_)
203                    | ConcreteDataType::Json(_)
204                    | ConcreteDataType::Null(_)
205                    | ConcreteDataType::Vector(_) => {
206                        debug!("Unsupported data type {datatype}");
207                        return false;
208                    }
209                }
210
211                true
212            }
213            _ => false,
214        }
215    }
216
217    /// Check if a single atomic constraint can be satisfied
218    fn atomic_constraint_satisfied(
219        query_atomic: &AtomicExpr,
220        partition_atomic: &AtomicExpr,
221    ) -> bool {
222        let mut query_index = 0;
223        let mut partition_index = 0;
224
225        while query_index < query_atomic.nucleons.len()
226            && partition_index < partition_atomic.nucleons.len()
227        {
228            let query_col = query_atomic.nucleons[query_index].column();
229            let partition_col = partition_atomic.nucleons[partition_index].column();
230
231            match query_col.cmp(partition_col) {
232                Ordering::Equal => {
233                    let mut query_index_for_next_col = query_index;
234                    let mut partition_index_for_next_col = partition_index;
235
236                    while query_index_for_next_col < query_atomic.nucleons.len()
237                        && query_atomic.nucleons[query_index_for_next_col].column() == query_col
238                    {
239                        query_index_for_next_col += 1;
240                    }
241                    while partition_index_for_next_col < partition_atomic.nucleons.len()
242                        && partition_atomic.nucleons[partition_index_for_next_col].column()
243                            == partition_col
244                    {
245                        partition_index_for_next_col += 1;
246                    }
247
248                    let query_range = Self::nucleons_to_range(
249                        &query_atomic.nucleons[query_index..query_index_for_next_col],
250                    );
251                    let partition_range = Self::nucleons_to_range(
252                        &partition_atomic.nucleons[partition_index..partition_index_for_next_col],
253                    );
254
255                    debug!("Comparing two ranges, {query_range:?} and {partition_range:?}");
256
257                    query_index = query_index_for_next_col;
258                    partition_index = partition_index_for_next_col;
259
260                    if !query_range.overlaps_with(&partition_range) {
261                        return false;
262                    }
263                }
264                Ordering::Less => {
265                    // Query column comes before partition column - skip query column
266                    while query_index < query_atomic.nucleons.len()
267                        && query_atomic.nucleons[query_index].column() == query_col
268                    {
269                        query_index += 1;
270                    }
271                }
272                Ordering::Greater => {
273                    // Partition column comes before query column - skip partition column
274                    while partition_index < partition_atomic.nucleons.len()
275                        && partition_atomic.nucleons[partition_index].column() == partition_col
276                    {
277                        partition_index += 1;
278                    }
279                }
280            }
281        }
282
283        true
284    }
285
286    /// Convert a slice of nucleons (all for the same column) into a ValueRange
287    fn nucleons_to_range(nucleons: &[NucleonExpr]) -> ValueRange {
288        let mut range = ValueRange::new();
289
290        for nucleon in nucleons {
291            let value = nucleon.value();
292            match nucleon.op() {
293                Eq => {
294                    range.lower = Bound::Included(value);
295                    range.upper = Bound::Included(value);
296                    break; // exact value, most restrictive
297                }
298                Lt => {
299                    // upper < value
300                    range.update_upper(Bound::Excluded(value));
301                }
302                LtEq => {
303                    range.update_upper(Bound::Included(value));
304                }
305                Gt => {
306                    range.update_lower(Bound::Excluded(value));
307                }
308                GtEq => {
309                    range.update_lower(Bound::Included(value));
310                }
311                NotEq => {
312                    // handled as two separate atomic exprs elsewhere
313                    continue;
314                }
315            }
316        }
317
318        range
319    }
320}
321
322/// Represents a value range derived from a group of nucleons for the same column
323#[derive(Debug, Clone)]
324struct ValueRange {
325    // lower and upper bounds using standard library Bound semantics
326    lower: Bound<OrderedF64>,
327    upper: Bound<OrderedF64>,
328}
329
330impl ValueRange {
331    fn new() -> Self {
332        Self {
333            lower: Bound::Unbounded,
334            upper: Bound::Unbounded,
335        }
336    }
337
338    // Update lower bound choosing the more restrictive one
339    fn update_lower(&mut self, new_lower: Bound<OrderedF64>) {
340        match (&self.lower, &new_lower) {
341            (Bound::Unbounded, _) => self.lower = new_lower,
342            (_, Bound::Unbounded) => { /* keep existing */ }
343            (Bound::Included(cur), Bound::Included(new))
344            | (Bound::Excluded(cur), Bound::Included(new))
345            | (Bound::Included(cur), Bound::Excluded(new))
346            | (Bound::Excluded(cur), Bound::Excluded(new)) => {
347                if new > cur {
348                    self.lower = new_lower;
349                } else if new == cur {
350                    // prefer Excluded over Included for the same value (more restrictive)
351                    if matches!(new_lower, Bound::Excluded(_))
352                        && matches!(self.lower, Bound::Included(_))
353                    {
354                        self.lower = new_lower;
355                    }
356                }
357            }
358        }
359    }
360
361    // Update upper bound choosing the more restrictive one
362    fn update_upper(&mut self, new_upper: Bound<OrderedF64>) {
363        match (&self.upper, &new_upper) {
364            (Bound::Unbounded, _) => self.upper = new_upper,
365            (_, Bound::Unbounded) => { /* keep existing */ }
366            (Bound::Included(cur), Bound::Included(new))
367            | (Bound::Excluded(cur), Bound::Included(new))
368            | (Bound::Included(cur), Bound::Excluded(new))
369            | (Bound::Excluded(cur), Bound::Excluded(new)) => {
370                if new < cur {
371                    self.upper = new_upper;
372                } else if new == cur {
373                    // prefer Excluded over Included for the same value (more restrictive)
374                    if matches!(new_upper, Bound::Excluded(_))
375                        && matches!(self.upper, Bound::Included(_))
376                    {
377                        self.upper = new_upper;
378                    }
379                }
380            }
381        }
382    }
383
384    /// Check if this range overlaps with another range
385    fn overlaps_with(&self, other: &ValueRange) -> bool {
386        fn no_overlap(upper: &Bound<OrderedF64>, lower: &Bound<OrderedF64>) -> bool {
387            match (upper, lower) {
388                (Bound::Unbounded, _) | (_, Bound::Unbounded) => false,
389                // u], [l
390                (Bound::Included(u), Bound::Included(l)) => u < l,
391                // u], (l
392                (Bound::Included(u), Bound::Excluded(l))
393                // u), [l
394                | (Bound::Excluded(u), Bound::Included(l))
395                // u), (l
396                | (Bound::Excluded(u), Bound::Excluded(l)) => u <= l,
397            }
398        }
399
400        if no_overlap(&self.upper, &other.lower) || no_overlap(&other.upper, &self.lower) {
401            return false;
402        }
403        true
404    }
405}
406
#[cfg(test)]
mod tests {
    use datatypes::value::Value;
    use partition::expr::{col, Operand, PartitionExpr, RestrictedOp};
    use store_api::storage::RegionId;

    use super::*;

    /// Builds a partition for region `region_number` of table 1.
    fn create_test_partition_info(region_number: u32, expr: Option<PartitionExpr>) -> PartitionInfo {
        PartitionInfo {
            id: RegionId::new(1, region_number),
            partition_expr: expr,
        }
    }

    /// Partition expression `user_id >= lower AND user_id < upper`.
    fn user_id_range(lower: i64, upper: i64) -> PartitionExpr {
        col("user_id")
            .gt_eq(Value::Int64(lower))
            .and(col("user_id").lt(Value::Int64(upper)))
    }

    /// Regions `1..=count`, where region `n` covers
    /// `user_id >= (n - 1) * 100 AND user_id < n * 100`.
    fn user_id_partitions(count: u32) -> Vec<PartitionInfo> {
        (1..=count)
            .map(|region_number| {
                let upper = i64::from(region_number) * 100;
                create_test_partition_info(region_number, Some(user_id_range(upper - 100, upper)))
            })
            .collect()
    }

    /// Query expression `user_id = value`.
    fn user_id_eq(value: i64) -> PartitionExpr {
        col("user_id").eq(Value::Int64(value))
    }

    /// Query expression `user_id = first OR user_id = rest[0] OR ...`,
    /// nested to the left: `((a OR b) OR c)`.
    fn user_id_eq_any(first: i64, rest: &[i64]) -> PartitionExpr {
        rest.iter().fold(user_id_eq(first), |acc, &value| {
            PartitionExpr::new(
                Operand::Expr(acc),
                RestrictedOp::Or,
                Operand::Expr(user_id_eq(value)),
            )
        })
    }

    /// Runs the pruner with `user_id` declared as an int64 column.
    fn prune_by_user_id(
        query_exprs: &[PartitionExpr],
        partitions: &[PartitionInfo],
    ) -> Vec<RegionId> {
        let mut column_datatypes = HashMap::default();
        column_datatypes.insert("user_id".to_string(), ConcreteDataType::int64_datatype());
        ConstraintPruner::prune_regions(query_exprs, partitions, column_datatypes).unwrap()
    }

    #[test]
    fn test_constraint_pruning_equality() {
        let partitions = user_id_partitions(3);

        // Query: user_id = 150 (should only match Region 2)
        let pruned = prune_by_user_id(&[user_id_eq(150)], &partitions);

        // Should include Region 2, and potentially others due to conservative approach
        assert!(pruned.contains(&RegionId::new(1, 2)));
    }

    #[test]
    fn test_constraint_pruning_in_list() {
        let partitions = user_id_partitions(3);

        // Query: user_id IN (50, 150, 250) - should match all regions
        let pruned = prune_by_user_id(&[user_id_eq_any(50, &[150, 250])], &partitions);

        // Should include regions that can satisfy any of the values
        assert!(!pruned.is_empty());
    }

    #[test]
    fn test_constraint_pruning_range() {
        let partitions = user_id_partitions(3);

        // Query: user_id >= 150 (should include regions that can satisfy this constraint)
        let query_exprs = vec![col("user_id").gt_eq(Value::Int64(150))];
        let pruned = prune_by_user_id(&query_exprs, &partitions);

        // With constraint-based approach:
        // Region 1: [0, 100) - user_id >= 150 is not satisfiable
        // Region 2: [100, 200) - user_id >= 150 is satisfiable in range [150, 200)
        // Region 3: [200, 300) - user_id >= 150 is satisfiable (all values >= 200 satisfy user_id >= 150)
        // Conservative approach may include more regions, but should at least include regions 2 and 3
        assert!(pruned.len() >= 2);
        assert!(pruned.contains(&RegionId::new(1, 2))); // Region 2 should be included
        assert!(pruned.contains(&RegionId::new(1, 3))); // Region 3 should be included
    }

    #[test]
    fn test_prune_regions_no_constraints() {
        let partitions = vec![
            create_test_partition_info(1, None),
            create_test_partition_info(2, None),
        ];

        let constraints: Vec<PartitionExpr> = Vec::new();
        let column_datatypes = HashMap::default();
        let pruned =
            ConstraintPruner::prune_regions(&constraints, &partitions, column_datatypes).unwrap();

        // No constraints should return all regions
        assert_eq!(pruned.len(), 2);
    }

    #[test]
    fn test_prune_regions_with_simple_equality() {
        let partitions = user_id_partitions(3);

        // Query: user_id = 150 (should only match Region 2 which contains values [100, 200))
        let pruned = prune_by_user_id(&[user_id_eq(150)], &partitions);

        // user_id = 150 should match Region 2 ([100, 200)) and potentially others due to conservative approach
        assert!(pruned.contains(&RegionId::new(1, 2)));
    }

    #[test]
    fn test_prune_regions_with_or_constraint() {
        let partitions = user_id_partitions(3);

        // Query: user_id = 50 OR user_id = 150 OR user_id = 250 - should match all 3 regions
        let pruned = prune_by_user_id(&[user_id_eq_any(50, &[150, 250])], &partitions);

        // Should match all 3 regions: 50 matches Region 1, 150 matches Region 2, 250 matches Region 3
        assert_eq!(pruned.len(), 3);
        assert!(pruned.contains(&RegionId::new(1, 1)));
        assert!(pruned.contains(&RegionId::new(1, 2)));
        assert!(pruned.contains(&RegionId::new(1, 3)));
    }

    #[test]
    fn test_constraint_pruning_no_match() {
        let partitions = user_id_partitions(2);

        // Query: user_id = 300 (should match no regions)
        let pruned = prune_by_user_id(&[user_id_eq(300)], &partitions);

        // Should match no regions since 300 is outside both partition ranges
        assert_eq!(pruned.len(), 0);
    }

    #[test]
    fn test_constraint_pruning_partial_match() {
        let partitions = user_id_partitions(2);

        // Query: user_id >= 50 (should match both regions partially)
        let query_exprs = vec![col("user_id").gt_eq(Value::Int64(50))];
        let pruned = prune_by_user_id(&query_exprs, &partitions);

        // Region 1: [0,100) intersects with [50,∞) -> includes [50,100)
        // Region 2: [100,200) is fully contained in [50,∞)
        assert_eq!(pruned.len(), 2);
        assert!(pruned.contains(&RegionId::new(1, 1)));
        assert!(pruned.contains(&RegionId::new(1, 2)));
    }
}
755}