1use datatypes::prelude::ConcreteDataType;
16use datatypes::value::Value;
17use partition::expr::{Operand, PartitionExpr, RestrictedOp};
18use rand::Rng;
19use snafu::ensure;
20
21use crate::context::TableContext;
22use crate::error::{self, Result};
23use crate::ir::{Ident, generate_random_value, string_value};
24
/// Range partitioning of a single column, described by its split points.
///
/// For distinct bounds `b1 < ... < bn`, [`SimplePartitions::generate`] produces
/// the expressions `col < b1`, `col >= b1 AND col < b2`, ..., `col >= bn`.
#[derive(Clone)]
pub struct SimplePartitions {
    /// Column the partition expressions are written against.
    pub column_name: Ident,
    /// Split points between adjacent partitions.
    ///
    /// `generate` sorts a copy of these before building expressions, so they
    /// do not have to be stored in order.
    pub bounds: Vec<Value>,
}
53
54impl SimplePartitions {
55 pub fn new(column_name: Ident, bounds: Vec<Value>) -> Self {
56 Self {
57 column_name,
58 bounds,
59 }
60 }
61
62 pub fn generate(&self) -> Result<Vec<PartitionExpr>> {
66 ensure!(
67 !self.bounds.is_empty(),
68 error::UnexpectedSnafu {
69 violated: "partition bounds must not be empty".to_string(),
70 }
71 );
72 let mut sorted = self.bounds.clone();
73 sorted.sort();
74 let mut exprs = Vec::with_capacity(sorted.len() + 1);
75 let first_bound = sorted[0].clone();
76 let column_name = self.column_name.value.clone();
77 exprs.push(PartitionExpr::new(
78 Operand::Column(column_name.clone()),
79 RestrictedOp::Lt,
80 Operand::Value(first_bound),
81 ));
82 for bound_idx in 1..sorted.len() {
83 exprs.push(PartitionExpr::new(
84 Operand::Expr(PartitionExpr::new(
85 Operand::Column(column_name.clone()),
86 RestrictedOp::GtEq,
87 Operand::Value(sorted[bound_idx - 1].clone()),
88 )),
89 RestrictedOp::And,
90 Operand::Expr(PartitionExpr::new(
91 Operand::Column(column_name.clone()),
92 RestrictedOp::Lt,
93 Operand::Value(sorted[bound_idx].clone()),
94 )),
95 ));
96 }
97 let last_bound = sorted.last().unwrap().clone();
98 exprs.push(PartitionExpr::new(
99 Operand::Column(column_name.clone()),
100 RestrictedOp::GtEq,
101 Operand::Value(last_bound),
102 ));
103
104 Ok(exprs)
105 }
106
107 pub fn from_exprs(column_name: Ident, exprs: &[PartitionExpr]) -> Result<Self> {
126 ensure!(
127 !exprs.is_empty(),
128 error::UnexpectedSnafu {
129 violated: "partition exprs must not be empty".to_string(),
130 }
131 );
132 let mut bounds = Vec::new();
133 for expr in exprs {
134 if let Some(bound) = extract_upper_bound(&column_name.value, expr)? {
135 bounds.push(bound);
136 }
137 }
138 Ok(Self::new(column_name, bounds))
139 }
140
141 pub fn from_table_ctx(table_ctx: &TableContext) -> Result<Self> {
143 let partition_def = table_ctx
144 .partition
145 .as_ref()
146 .expect("expected partition def");
147 Self::from_exprs(partition_def.columns[0].clone(), &partition_def.exprs)
148 }
149
150 pub fn insert_bound(&mut self, bound: Value) -> Result<usize> {
152 ensure!(
153 !self.bounds.contains(&bound),
154 error::UnexpectedSnafu {
155 violated: format!("duplicate bound: {bound}"),
156 }
157 );
158 self.bounds.push(bound.clone());
159 self.bounds.sort();
160
161 let insert_pos = self.bounds.binary_search(&bound).unwrap();
162 Ok(insert_pos)
163 }
164
165 pub fn remove_bound(&mut self, idx: usize) -> Result<Value> {
167 ensure!(
168 idx < self.bounds.len(),
169 error::UnexpectedSnafu {
170 violated: format!("index out of bounds: {idx}"),
171 }
172 );
173 Ok(self.bounds.remove(idx))
174 }
175}
176
177fn extract_upper_bound(column: &str, expr: &PartitionExpr) -> Result<Option<Value>> {
178 match expr.op {
179 RestrictedOp::Lt | RestrictedOp::LtEq => {
180 let value = extract_column_value(column, expr)?;
181 Ok(Some(value))
182 }
183 RestrictedOp::Gt | RestrictedOp::GtEq => Ok(None),
184 RestrictedOp::And => {
185 let left = extract_expr_operand(expr.lhs.as_ref())?;
186 let right = extract_expr_operand(expr.rhs.as_ref())?;
187 let (left_op, left_value) = extract_bound_operand(column, left)?;
188 let (right_op, right_value) = extract_bound_operand(column, right)?;
189 let upper = match (is_upper_op(&left_op), is_upper_op(&right_op)) {
190 (true, false) => Some(left_value),
191 (false, true) => Some(right_value),
192 _ => None,
193 };
194 Ok(upper)
195 }
196 _ => error::UnexpectedSnafu {
197 violated: format!("unsupported partition op: {:?}", expr.op),
198 }
199 .fail(),
200 }
201}
202
203fn extract_expr_operand(operand: &Operand) -> Result<&PartitionExpr> {
204 match operand {
205 Operand::Expr(expr) => Ok(expr),
206 _ => error::UnexpectedSnafu {
207 violated: "expected partition expr operand".to_string(),
208 }
209 .fail(),
210 }
211}
212
213fn extract_bound_operand(column: &str, expr: &PartitionExpr) -> Result<(RestrictedOp, Value)> {
214 let lhs = expr.lhs.as_ref();
215 let rhs = expr.rhs.as_ref();
216 match (lhs, rhs) {
217 (Operand::Column(col), Operand::Value(value)) if col == column => {
218 Ok((expr.op.clone(), value.clone()))
219 }
220 _ => error::UnexpectedSnafu {
221 violated: format!("unexpected partition expr for column: {column}"),
222 }
223 .fail(),
224 }
225}
226
227fn extract_column_value(column: &str, expr: &PartitionExpr) -> Result<Value> {
228 let (op, value) = extract_bound_operand(column, expr)?;
229 ensure!(
230 is_upper_op(&op),
231 error::UnexpectedSnafu {
232 violated: format!("expected upper bound op, got: {:?}", op),
233 }
234 );
235 Ok(value)
236}
237
238fn is_upper_op(op: &RestrictedOp) -> bool {
239 matches!(op, RestrictedOp::Lt | RestrictedOp::LtEq)
240}
241
242pub fn generate_unique_bound<R: Rng + 'static>(
244 rng: &mut R,
245 datatype: &ConcreteDataType,
246 bounds: &[Value],
247) -> Result<Value> {
248 if matches!(datatype, ConcreteDataType::String(_)) {
249 return string_value::generate_unique_partition_bound(rng, bounds);
250 }
251
252 for _ in 0..16 {
253 let candidate = generate_random_value(rng, datatype, None);
254 if !bounds.contains(&candidate) {
255 return Ok(candidate);
256 }
257 }
258 error::UnexpectedSnafu {
259 violated: "unable to generate unique partition bound".to_string(),
260 }
261 .fail()
262}
263
#[cfg(test)]
mod tests {
    use super::*;

    /// Partitioning of `age` split at 10 and 20, shared by the tests below.
    fn age_partitions() -> SimplePartitions {
        let bounds = vec![Value::from(10), Value::from(20)];
        SimplePartitions::new(Ident::new("age"), bounds)
    }

    #[test]
    fn test_simple_partitions() {
        // Two bounds split the column into three ranges.
        let rendered: Vec<String> = age_partitions()
            .generate()
            .unwrap()
            .iter()
            .map(|expr| expr.to_string())
            .collect();
        assert_eq!(
            rendered,
            vec!["age < 10", "age >= 10 AND age < 20", "age >= 20"]
        );
    }

    #[test]
    fn test_simple_partitions_from_exprs() {
        // Generating expressions and parsing them back restores the bounds.
        let exprs = age_partitions().generate().unwrap();
        let restored = SimplePartitions::from_exprs(Ident::new("age"), &exprs).unwrap();
        let expected = vec![Value::from(10), Value::from(20)];
        assert_eq!(restored.bounds, expected);
    }
}