1use datatypes::prelude::ConcreteDataType;
16use datatypes::value::Value;
17use partition::expr::{Operand, PartitionExpr, RestrictedOp};
18use rand::Rng;
19use snafu::ensure;
20
21use crate::error::{self, Result};
22use crate::ir::{Ident, generate_random_value};
23
24pub struct SimplePartitions {
40 pub column_name: Ident,
42 pub bounds: Vec<Value>,
50}
51
52impl SimplePartitions {
53 pub fn new(column_name: Ident, bounds: Vec<Value>) -> Self {
54 Self {
55 column_name,
56 bounds,
57 }
58 }
59
60 pub fn generate(&self) -> Result<Vec<PartitionExpr>> {
64 ensure!(
65 !self.bounds.is_empty(),
66 error::UnexpectedSnafu {
67 violated: "partition bounds must not be empty".to_string(),
68 }
69 );
70 let mut sorted = self.bounds.clone();
71 sorted.sort();
72 let mut exprs = Vec::with_capacity(sorted.len() + 1);
73 let first_bound = sorted[0].clone();
74 let column_name = self.column_name.value.clone();
75 exprs.push(PartitionExpr::new(
76 Operand::Column(column_name.clone()),
77 RestrictedOp::Lt,
78 Operand::Value(first_bound),
79 ));
80 for bound_idx in 1..sorted.len() {
81 exprs.push(PartitionExpr::new(
82 Operand::Expr(PartitionExpr::new(
83 Operand::Column(column_name.clone()),
84 RestrictedOp::GtEq,
85 Operand::Value(sorted[bound_idx - 1].clone()),
86 )),
87 RestrictedOp::And,
88 Operand::Expr(PartitionExpr::new(
89 Operand::Column(column_name.clone()),
90 RestrictedOp::Lt,
91 Operand::Value(sorted[bound_idx].clone()),
92 )),
93 ));
94 }
95 let last_bound = sorted.last().unwrap().clone();
96 exprs.push(PartitionExpr::new(
97 Operand::Column(column_name.clone()),
98 RestrictedOp::GtEq,
99 Operand::Value(last_bound),
100 ));
101
102 Ok(exprs)
103 }
104
105 pub fn from_exprs(column_name: Ident, exprs: &[PartitionExpr]) -> Result<Self> {
124 ensure!(
125 !exprs.is_empty(),
126 error::UnexpectedSnafu {
127 violated: "partition exprs must not be empty".to_string(),
128 }
129 );
130 let mut bounds = Vec::new();
131 for expr in exprs {
132 if let Some(bound) = extract_upper_bound(&column_name.value, expr)? {
133 bounds.push(bound);
134 }
135 }
136 Ok(Self::new(column_name, bounds))
137 }
138
139 pub fn insert_bound(&mut self, bound: Value) -> Result<usize> {
141 ensure!(
142 !self.bounds.contains(&bound),
143 error::UnexpectedSnafu {
144 violated: format!("duplicate bound: {bound}"),
145 }
146 );
147 self.bounds.push(bound.clone());
148 self.bounds.sort();
149
150 let insert_pos = self.bounds.binary_search(&bound).unwrap();
151 Ok(insert_pos)
152 }
153
154 pub fn remove_bound(&mut self, idx: usize) -> Result<Value> {
156 ensure!(
157 idx < self.bounds.len(),
158 error::UnexpectedSnafu {
159 violated: format!("index out of bounds: {idx}"),
160 }
161 );
162 Ok(self.bounds.remove(idx))
163 }
164}
165
166fn extract_upper_bound(column: &str, expr: &PartitionExpr) -> Result<Option<Value>> {
167 match expr.op {
168 RestrictedOp::Lt | RestrictedOp::LtEq => {
169 let value = extract_column_value(column, expr)?;
170 Ok(Some(value))
171 }
172 RestrictedOp::Gt | RestrictedOp::GtEq => Ok(None),
173 RestrictedOp::And => {
174 let left = extract_expr_operand(expr.lhs.as_ref())?;
175 let right = extract_expr_operand(expr.rhs.as_ref())?;
176 let (left_op, left_value) = extract_bound_operand(column, left)?;
177 let (right_op, right_value) = extract_bound_operand(column, right)?;
178 let upper = match (is_upper_op(&left_op), is_upper_op(&right_op)) {
179 (true, false) => Some(left_value),
180 (false, true) => Some(right_value),
181 _ => None,
182 };
183 Ok(upper)
184 }
185 _ => error::UnexpectedSnafu {
186 violated: format!("unsupported partition op: {:?}", expr.op),
187 }
188 .fail(),
189 }
190}
191
192fn extract_expr_operand(operand: &Operand) -> Result<&PartitionExpr> {
193 match operand {
194 Operand::Expr(expr) => Ok(expr),
195 _ => error::UnexpectedSnafu {
196 violated: "expected partition expr operand".to_string(),
197 }
198 .fail(),
199 }
200}
201
202fn extract_bound_operand(column: &str, expr: &PartitionExpr) -> Result<(RestrictedOp, Value)> {
203 let lhs = expr.lhs.as_ref();
204 let rhs = expr.rhs.as_ref();
205 match (lhs, rhs) {
206 (Operand::Column(col), Operand::Value(value)) if col == column => {
207 Ok((expr.op.clone(), value.clone()))
208 }
209 _ => error::UnexpectedSnafu {
210 violated: format!("unexpected partition expr for column: {column}"),
211 }
212 .fail(),
213 }
214}
215
216fn extract_column_value(column: &str, expr: &PartitionExpr) -> Result<Value> {
217 let (op, value) = extract_bound_operand(column, expr)?;
218 ensure!(
219 is_upper_op(&op),
220 error::UnexpectedSnafu {
221 violated: format!("expected upper bound op, got: {:?}", op),
222 }
223 );
224 Ok(value)
225}
226
227fn is_upper_op(op: &RestrictedOp) -> bool {
228 matches!(op, RestrictedOp::Lt | RestrictedOp::LtEq)
229}
230
231pub fn generate_unique_bound<R: Rng + 'static>(
233 rng: &mut R,
234 datatype: &ConcreteDataType,
235 bounds: &[Value],
236) -> Result<Value> {
237 for _ in 0..16 {
238 let candidate = generate_random_value(rng, datatype, None);
239 if !bounds.contains(&candidate) {
240 return Ok(candidate);
241 }
242 }
243 error::UnexpectedSnafu {
244 violated: "unable to generate unique partition bound".to_string(),
245 }
246 .fail()
247}
248
249#[cfg(test)]
250mod tests {
251 use super::*;
252
253 #[test]
254 fn test_simple_partitions() {
255 let partitions =
256 SimplePartitions::new(Ident::new("age"), vec![Value::from(10), Value::from(20)]);
257 let exprs = partitions.generate().unwrap();
258 assert_eq!(exprs.len(), 3);
259 assert_eq!(exprs[0].to_string(), "age < 10");
260 assert_eq!(exprs[1].to_string(), "age >= 10 AND age < 20");
261 assert_eq!(exprs[2].to_string(), "age >= 20");
262 }
263
264 #[test]
265 fn test_simple_partitions_from_exprs() {
266 let partitions =
267 SimplePartitions::new(Ident::new("age"), vec![Value::from(10), Value::from(20)]);
268 let exprs = partitions.generate().unwrap();
269 let partitions = SimplePartitions::from_exprs(Ident::new("age"), &exprs).unwrap();
270 assert_eq!(partitions.bounds, vec![Value::from(10), Value::from(20)]);
271 }
272}