1use snafu::ensure;
16use sqlparser::ast::{BinaryOperator, Expr, Ident, Value};
17
18use crate::error::{InvalidPartitionNumberSnafu, Result};
19use crate::statements::create::Partitions;
20
/// Partition count used for trace tables when the caller does not request one.
const DEFAULT_PARTITION_NUM_FOR_TRACES: u32 = 16;

/// Largest partition count accepted for trace tables: `16^4 = 65536`, the size
/// of the value space of a four-hex-digit prefix.
const MAX_PARTITION_NUM_FOR_TRACES: u32 = 1 << 16;
26
/// Expands to the half-open string range `$col >= '$left_incl' AND $col < '$right_excl'`.
///
/// `$col` is cloned into both comparisons, so it must implement `Clone`.
/// `$left_incl` and `$right_excl` may be anything that implements `to_string()`.
/// Their values become single-quoted string literals.
macro_rules! between_string {
    ($col: expr, $left_incl: expr, $right_excl: expr) => {{
        // Inclusive lower bound: `$col >= '$left_incl'`.
        let lower_bound = Expr::BinaryOp {
            left: Box::new($col.clone()),
            op: BinaryOperator::GtEq,
            right: Box::new(Expr::Value(
                Value::SingleQuotedString($left_incl.to_string()).into(),
            )),
        };
        // Exclusive upper bound: `$col < '$right_excl'`.
        let upper_bound = Expr::BinaryOp {
            left: Box::new($col.clone()),
            op: BinaryOperator::Lt,
            right: Box::new(Expr::Value(
                Value::SingleQuotedString($right_excl.to_string()).into(),
            )),
        };
        Expr::BinaryOp {
            left: Box::new(lower_bound),
            op: BinaryOperator::And,
            right: Box::new(upper_bound),
        }
    }};
}
48
49pub fn partition_rule_for_hexstring(ident: &str, partition_num: Option<u32>) -> Result<Partitions> {
50 Ok(Partitions {
51 column_list: vec![Ident::new(ident)],
52 exprs: partition_rules_for_uuid(
53 partition_num.unwrap_or(DEFAULT_PARTITION_NUM_FOR_TRACES),
54 ident,
55 )?,
56 })
57}
58
59fn partition_rules_for_uuid(partition_num: u32, ident: &str) -> Result<Vec<Expr>> {
61 ensure!(
62 partition_num.is_power_of_two() && (2..=65536).contains(&partition_num),
63 InvalidPartitionNumberSnafu { partition_num }
64 );
65
66 let ident_expr = Expr::Identifier(Ident::new(ident).clone());
67
68 let (total_partitions, hex_length) = {
69 match partition_num {
70 2..=16 => (16, 1),
71 17..=256 => (256, 2),
72 257..=4096 => (4096, 3),
73 4097..=MAX_PARTITION_NUM_FOR_TRACES => (MAX_PARTITION_NUM_FOR_TRACES, 4),
74 _ => unreachable!(),
75 }
76 };
77
78 let partition_size = total_partitions / partition_num;
79 let remainder = total_partitions % partition_num;
80
81 let mut rules = Vec::new();
82 let mut current_boundary = 0;
83 for i in 0..partition_num {
84 let mut size = partition_size;
85 if i < remainder {
86 size += 1;
87 }
88 let start = current_boundary;
89 let end = current_boundary + size;
90
91 if i == 0 {
92 rules.push(Expr::BinaryOp {
94 left: Box::new(ident_expr.clone()),
95 op: BinaryOperator::Lt,
96 right: Box::new(Expr::Value(
97 Value::SingleQuotedString(format!("{:0hex_length$x}", end)).into(),
98 )),
99 });
100 } else if i == partition_num - 1 {
101 rules.push(Expr::BinaryOp {
103 left: Box::new(ident_expr.clone()),
104 op: BinaryOperator::GtEq,
105 right: Box::new(Expr::Value(
106 Value::SingleQuotedString(format!("{:0hex_length$x}", start)).into(),
107 )),
108 });
109 } else {
110 rules.push(between_string!(
112 ident_expr,
113 format!("{:0hex_length$x}", start),
114 format!("{:0hex_length$x}", end)
115 ));
116 }
117
118 current_boundary = end;
119 }
120
121 Ok(rules)
122}
123
#[cfg(test)]
mod tests {
    use sqlparser::ast::{Expr, ValueWithSpan};
    use sqlparser::dialect::GenericDialect;
    use sqlparser::parser::Parser;
    use uuid::Uuid;

    use super::*;

    /// Parses each SQL snippet into an [`Expr`] using the generic dialect.
    fn parse_exprs(sqls: &[&str]) -> Vec<Expr> {
        let dialect = GenericDialect {};
        sqls.iter()
            .map(|s| {
                let mut parser = Parser::new(&dialect).try_with_sql(s).unwrap();
                parser.parse_expr().unwrap()
            })
            .collect()
    }

    #[test]
    fn test_partition_rules_for_uuid() {
        let cases: [(u32, usize); 8] = [
            (2, 10_000),
            (4, 10_000),
            (8, 10_000),
            (16, 10_000),
            (32, 10_000),
            (64, 100_000),
            (128, 100_000),
            (256, 100_000),
        ];
        for (partitions, samples) in cases {
            assert!(
                check_distribution(partitions, samples),
                "uneven distribution for {partitions} partitions over {samples} uuids"
            );
        }
    }

    #[test]
    fn test_rules() {
        let expected = parse_exprs(&[
            "trace_id < '1'",
            "trace_id >= '1' AND trace_id < '2'",
            "trace_id >= '2' AND trace_id < '3'",
            "trace_id >= '3' AND trace_id < '4'",
            "trace_id >= '4' AND trace_id < '5'",
            "trace_id >= '5' AND trace_id < '6'",
            "trace_id >= '6' AND trace_id < '7'",
            "trace_id >= '7' AND trace_id < '8'",
            "trace_id >= '8' AND trace_id < '9'",
            "trace_id >= '9' AND trace_id < 'a'",
            "trace_id >= 'a' AND trace_id < 'b'",
            "trace_id >= 'b' AND trace_id < 'c'",
            "trace_id >= 'c' AND trace_id < 'd'",
            "trace_id >= 'd' AND trace_id < 'e'",
            "trace_id >= 'e' AND trace_id < 'f'",
            "trace_id >= 'f'",
        ]);
        assert_eq!(
            expected,
            partition_rule_for_hexstring("trace_id", None)
                .unwrap()
                .exprs
        );

        let expected = parse_exprs(&[
            "trace_id < '4'",
            "trace_id >= '4' AND trace_id < '8'",
            "trace_id >= '8' AND trace_id < 'c'",
            "trace_id >= 'c'",
        ]);
        assert_eq!(
            expected,
            partition_rule_for_hexstring("trace_id", Some(4))
                .unwrap()
                .exprs
        );
    }

    #[test]
    fn test_invalid_partition_num() {
        for partition_num in [0, 1, 3, 6, 12, 100, 4095, 65_535, 131_072, u32::MAX] {
            assert!(
                partition_rules_for_uuid(partition_num, "trace_id").is_err(),
                "partition_num {partition_num} should be rejected"
            );
            assert!(
                partition_rule_for_hexstring("trace_id", Some(partition_num)).is_err(),
                "partition_num {partition_num} should be rejected"
            );
        }
    }

    #[test]
    fn test_rules_cover_key_space_for_every_valid_partition_num() {
        for exp in 1..=16u32 {
            let partition_num = 1u32 << exp;
            let rules = partition_rules_for_uuid(partition_num, "trace_id").unwrap();
            assert_eq!(rules.len(), partition_num as usize);
            // `upper_bounds_from_rules` asserts that adjacent rules share a boundary.
            let upper_bounds = upper_bounds_from_rules(&rules);
            assert!(
                upper_bounds.windows(2).all(|w| w[0] < w[1]),
                "boundaries not strictly increasing for {partition_num} partitions"
            );
        }
    }

    /// Routes `test_uuid_num` random v4 UUIDs through the rules for
    /// `test_partition` partitions. Returns `true` when every partition's share
    /// is within 30% of the ideal share.
    fn check_distribution(test_partition: u32, test_uuid_num: usize) -> bool {
        let rules = partition_rules_for_uuid(test_partition, "test_trace_id").unwrap();
        let upper_bounds = upper_bounds_from_rules(&rules);

        let mut counts = vec![0usize; test_partition as usize];
        for _ in 0..test_uuid_num {
            let uuid = Uuid::new_v4().simple().to_string();
            let partition = allocate_partition_for_uuid(uuid.as_str(), &upper_bounds);
            counts[partition] += 1;
        }

        let expected_ratio = 100.0 / test_partition as f64;
        let tolerance = expected_ratio * 0.30;

        counts.into_iter().all(|count| {
            let ratio = (count as f64 / test_uuid_num as f64) * 100.0;
            (ratio - expected_ratio).abs() < tolerance
        })
    }

    /// Returns the exclusive upper bound of every rule except the last.
    /// Asserts that each rule's lower bound equals the previous rule's upper bound.
    fn upper_bounds_from_rules<'a>(rules: &'a [Expr]) -> Vec<&'a str> {
        let mut upper_bounds = Vec::with_capacity(rules.len().saturating_sub(1));
        let mut prev_upper: Option<&'a str> = None;

        for (idx, rule) in rules.iter().enumerate() {
            let (lower, upper) = extract_rule_bounds(rule);
            match idx {
                0 => {
                    assert!(lower.is_none());
                    assert!(upper.is_some());
                }
                idx if idx == rules.len() - 1 => {
                    assert_eq!(lower, prev_upper);
                    assert!(upper.is_none());
                }
                _ => {
                    assert_eq!(lower, prev_upper);
                    assert!(upper.is_some());
                }
            }

            if idx < rules.len() - 1 {
                upper_bounds.push(upper.unwrap());
            }

            prev_upper = upper;
        }

        upper_bounds
    }

    /// Extracts the `(inclusive lower, exclusive upper)` string bounds of a rule.
    /// Returns `None` for any side the rule leaves open.
    fn extract_rule_bounds(rule: &Expr) -> (Option<&str>, Option<&str>) {
        fn extract_single_quoted_string(expr: &Expr) -> Option<&str> {
            match expr {
                Expr::Value(ValueWithSpan {
                    value: Value::SingleQuotedString(value),
                    ..
                }) => Some(value.as_str()),
                _ => None,
            }
        }

        match rule {
            Expr::BinaryOp {
                op: BinaryOperator::Lt,
                right,
                ..
            } => (None, extract_single_quoted_string(right)),
            Expr::BinaryOp {
                op: BinaryOperator::GtEq,
                right,
                ..
            } => (extract_single_quoted_string(right), None),
            Expr::BinaryOp {
                op: BinaryOperator::And,
                left,
                right,
            } => {
                let lower = match left.as_ref() {
                    Expr::BinaryOp {
                        op: BinaryOperator::GtEq,
                        right,
                        ..
                    } => extract_single_quoted_string(right),
                    _ => None,
                };
                let upper = match right.as_ref() {
                    Expr::BinaryOp {
                        op: BinaryOperator::Lt,
                        right,
                        ..
                    } => extract_single_quoted_string(right),
                    _ => None,
                };
                (lower, upper)
            }
            _ => (None, None),
        }
    }

    /// Index of the partition containing `uuid`, given sorted exclusive upper bounds.
    fn allocate_partition_for_uuid(uuid: &str, upper_bounds: &[&str]) -> usize {
        upper_bounds.partition_point(|upper| uuid >= *upper)
    }

    #[test]
    fn test_extract_rule_bounds() {
        let rules = partition_rules_for_uuid(16, "trace_id").unwrap();
        let upper_bounds = upper_bounds_from_rules(&rules);
        assert_eq!(upper_bounds.len(), 15);
    }
}