tests_fuzz/ir/
partition_expr.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use datatypes::prelude::ConcreteDataType;
16use datatypes::value::Value;
17use partition::expr::{Operand, PartitionExpr, RestrictedOp};
18use rand::Rng;
19use snafu::ensure;
20
21use crate::error::{self, Result};
22use crate::ir::{Ident, generate_random_value};
23
/// A range-partitioning scheme over a single column, described by explicit boundary values.
///
/// With `n` bounds the column's value space is split into `n + 1` partitions: one open-ended
/// range below the smallest bound, one half-open range between each pair of adjacent bounds,
/// and one open-ended range starting at the largest bound.
///
/// # Example
///
/// Partitioning by the column `"age"` with bounds `[10, 20]` generates the following partitions:
/// - Partition 1: `age < 10`
/// - Partition 2: `age >= 10 AND age < 20`
/// - Partition 3: `age >= 20`
///
/// # Ordering of `bounds`
///
/// [`SimplePartitions::generate`] sorts a copy of `bounds` before building expressions, and
/// [`SimplePartitions::insert_bound`] leaves the stored vector sorted. However,
/// [`SimplePartitions::remove_bound`] indexes `bounds` in its stored order, so keep the vector
/// sorted if indices are meant to line up with the generated partitions.
///
/// # Fields
/// - `column_name`: The name of the column used for partitioning.
/// - `bounds`: The partition boundary values, expected in ascending order.
pub struct SimplePartitions {
    /// The column to partition by.
    pub column_name: Ident,
    /// The boundaries that define each partition range, expected in ascending order.
    ///
    /// With `k` bounds, the following partitions are created:
    /// - `< bound[0]`
    /// - `[bound[0], bound[1])`
    /// - ...
    /// - `>= bound[k-1]`
    pub bounds: Vec<Value>,
}
51
52impl SimplePartitions {
53    pub fn new(column_name: Ident, bounds: Vec<Value>) -> Self {
54        Self {
55            column_name,
56            bounds,
57        }
58    }
59
60    /// Generates partition expressions for the defined bounds on a single column.
61    ///
62    /// Returns a vector of `PartitionExpr` representing all resulting partitions.
63    pub fn generate(&self) -> Result<Vec<PartitionExpr>> {
64        ensure!(
65            !self.bounds.is_empty(),
66            error::UnexpectedSnafu {
67                violated: "partition bounds must not be empty".to_string(),
68            }
69        );
70        let mut sorted = self.bounds.clone();
71        sorted.sort();
72        let mut exprs = Vec::with_capacity(sorted.len() + 1);
73        let first_bound = sorted[0].clone();
74        let column_name = self.column_name.value.clone();
75        exprs.push(PartitionExpr::new(
76            Operand::Column(column_name.clone()),
77            RestrictedOp::Lt,
78            Operand::Value(first_bound),
79        ));
80        for bound_idx in 1..sorted.len() {
81            exprs.push(PartitionExpr::new(
82                Operand::Expr(PartitionExpr::new(
83                    Operand::Column(column_name.clone()),
84                    RestrictedOp::GtEq,
85                    Operand::Value(sorted[bound_idx - 1].clone()),
86                )),
87                RestrictedOp::And,
88                Operand::Expr(PartitionExpr::new(
89                    Operand::Column(column_name.clone()),
90                    RestrictedOp::Lt,
91                    Operand::Value(sorted[bound_idx].clone()),
92                )),
93            ));
94        }
95        let last_bound = sorted.last().unwrap().clone();
96        exprs.push(PartitionExpr::new(
97            Operand::Column(column_name.clone()),
98            RestrictedOp::GtEq,
99            Operand::Value(last_bound),
100        ));
101
102        Ok(exprs)
103    }
104
105    /// Reconstructs a `SimplePartitions` instance from a slice of `PartitionExpr`.
106    ///
107    /// This extracts the upper-bound partition values for a given column from a list of
108    /// partition expressions (typically produced by `generate`). Only expressions that
109    /// define an upper bound for the column are included. Must not be passed an empty slice.
110    ///
111    /// # Arguments
112    ///
113    /// * `column_name` - The name of the column to partition by.
114    /// * `exprs` - A list of partition expressions, each specifying a partition's bounding condition.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if `exprs` is empty or if any expression cannot be parsed for bounds.
119    ///
120    /// # Returns
121    ///
122    /// A [`SimplePartitions`] value, where the bounds vector contains each extracted upper bound.
123    pub fn from_exprs(column_name: Ident, exprs: &[PartitionExpr]) -> Result<Self> {
124        ensure!(
125            !exprs.is_empty(),
126            error::UnexpectedSnafu {
127                violated: "partition exprs must not be empty".to_string(),
128            }
129        );
130        let mut bounds = Vec::new();
131        for expr in exprs {
132            if let Some(bound) = extract_upper_bound(&column_name.value, expr)? {
133                bounds.push(bound);
134            }
135        }
136        Ok(Self::new(column_name, bounds))
137    }
138
139    /// Inserts a new bound into the partition bounds and returns the index of the new bound.
140    pub fn insert_bound(&mut self, bound: Value) -> Result<usize> {
141        ensure!(
142            !self.bounds.contains(&bound),
143            error::UnexpectedSnafu {
144                violated: format!("duplicate bound: {bound}"),
145            }
146        );
147        self.bounds.push(bound.clone());
148        self.bounds.sort();
149
150        let insert_pos = self.bounds.binary_search(&bound).unwrap();
151        Ok(insert_pos)
152    }
153
154    /// Removes a bound at the specified index and returns the removed bound.
155    pub fn remove_bound(&mut self, idx: usize) -> Result<Value> {
156        ensure!(
157            idx < self.bounds.len(),
158            error::UnexpectedSnafu {
159                violated: format!("index out of bounds: {idx}"),
160            }
161        );
162        Ok(self.bounds.remove(idx))
163    }
164}
165
166fn extract_upper_bound(column: &str, expr: &PartitionExpr) -> Result<Option<Value>> {
167    match expr.op {
168        RestrictedOp::Lt | RestrictedOp::LtEq => {
169            let value = extract_column_value(column, expr)?;
170            Ok(Some(value))
171        }
172        RestrictedOp::Gt | RestrictedOp::GtEq => Ok(None),
173        RestrictedOp::And => {
174            let left = extract_expr_operand(expr.lhs.as_ref())?;
175            let right = extract_expr_operand(expr.rhs.as_ref())?;
176            let (left_op, left_value) = extract_bound_operand(column, left)?;
177            let (right_op, right_value) = extract_bound_operand(column, right)?;
178            let upper = match (is_upper_op(&left_op), is_upper_op(&right_op)) {
179                (true, false) => Some(left_value),
180                (false, true) => Some(right_value),
181                _ => None,
182            };
183            Ok(upper)
184        }
185        _ => error::UnexpectedSnafu {
186            violated: format!("unsupported partition op: {:?}", expr.op),
187        }
188        .fail(),
189    }
190}
191
192fn extract_expr_operand(operand: &Operand) -> Result<&PartitionExpr> {
193    match operand {
194        Operand::Expr(expr) => Ok(expr),
195        _ => error::UnexpectedSnafu {
196            violated: "expected partition expr operand".to_string(),
197        }
198        .fail(),
199    }
200}
201
202fn extract_bound_operand(column: &str, expr: &PartitionExpr) -> Result<(RestrictedOp, Value)> {
203    let lhs = expr.lhs.as_ref();
204    let rhs = expr.rhs.as_ref();
205    match (lhs, rhs) {
206        (Operand::Column(col), Operand::Value(value)) if col == column => {
207            Ok((expr.op.clone(), value.clone()))
208        }
209        _ => error::UnexpectedSnafu {
210            violated: format!("unexpected partition expr for column: {column}"),
211        }
212        .fail(),
213    }
214}
215
216fn extract_column_value(column: &str, expr: &PartitionExpr) -> Result<Value> {
217    let (op, value) = extract_bound_operand(column, expr)?;
218    ensure!(
219        is_upper_op(&op),
220        error::UnexpectedSnafu {
221            violated: format!("expected upper bound op, got: {:?}", op),
222        }
223    );
224    Ok(value)
225}
226
227fn is_upper_op(op: &RestrictedOp) -> bool {
228    matches!(op, RestrictedOp::Lt | RestrictedOp::LtEq)
229}
230
231/// Generates a unique partition bound that is not in the given bounds.
232pub fn generate_unique_bound<R: Rng + 'static>(
233    rng: &mut R,
234    datatype: &ConcreteDataType,
235    bounds: &[Value],
236) -> Result<Value> {
237    for _ in 0..16 {
238        let candidate = generate_random_value(rng, datatype, None);
239        if !bounds.contains(&candidate) {
240            return Ok(candidate);
241        }
242    }
243    error::UnexpectedSnafu {
244        violated: "unable to generate unique partition bound".to_string(),
245    }
246    .fail()
247}
248
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `age` partitioning with bounds `[10, 20]` shared by the tests below.
    fn age_partitions() -> SimplePartitions {
        SimplePartitions::new(Ident::new("age"), vec![Value::from(10), Value::from(20)])
    }

    #[test]
    fn test_simple_partitions() {
        let partitions = age_partitions();
        let exprs = partitions.generate().unwrap();
        assert_eq!(exprs.len(), 3);
        assert_eq!(exprs[0].to_string(), "age < 10");
        assert_eq!(exprs[1].to_string(), "age >= 10 AND age < 20");
        assert_eq!(exprs[2].to_string(), "age >= 20");
    }

    #[test]
    fn test_simple_partitions_unsorted_bounds() {
        // `generate` sorts the bounds itself, so the input order must not matter.
        let partitions =
            SimplePartitions::new(Ident::new("age"), vec![Value::from(20), Value::from(10)]);
        let exprs = partitions.generate().unwrap();
        assert_eq!(exprs.len(), 3);
        assert_eq!(exprs[0].to_string(), "age < 10");
        assert_eq!(exprs[1].to_string(), "age >= 10 AND age < 20");
        assert_eq!(exprs[2].to_string(), "age >= 20");
    }

    #[test]
    fn test_simple_partitions_empty_bounds() {
        let partitions = SimplePartitions::new(Ident::new("age"), vec![]);
        assert!(partitions.generate().is_err());
    }

    #[test]
    fn test_simple_partitions_from_exprs() {
        let partitions = age_partitions();
        let exprs = partitions.generate().unwrap();
        let partitions = SimplePartitions::from_exprs(Ident::new("age"), &exprs).unwrap();
        assert_eq!(partitions.bounds, vec![Value::from(10), Value::from(20)]);
    }

    #[test]
    fn test_simple_partitions_from_empty_exprs() {
        assert!(SimplePartitions::from_exprs(Ident::new("age"), &[]).is_err());
    }

    #[test]
    fn test_insert_bound() {
        let mut partitions = age_partitions();

        assert_eq!(partitions.insert_bound(Value::from(15)).unwrap(), 1);
        assert_eq!(partitions.insert_bound(Value::from(5)).unwrap(), 0);
        assert_eq!(partitions.insert_bound(Value::from(30)).unwrap(), 4);
        assert_eq!(
            partitions.bounds,
            vec![
                Value::from(5),
                Value::from(10),
                Value::from(15),
                Value::from(20),
                Value::from(30),
            ]
        );

        // A duplicate is rejected and leaves the bounds as they were.
        assert!(partitions.insert_bound(Value::from(15)).is_err());
        assert_eq!(partitions.bounds.len(), 5);
    }

    #[test]
    fn test_remove_bound() {
        let mut partitions = age_partitions();

        assert!(partitions.remove_bound(2).is_err());
        assert_eq!(partitions.remove_bound(0).unwrap(), Value::from(10));
        assert_eq!(partitions.bounds, vec![Value::from(20)]);
    }
}